Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::component::{
87    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
88    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93use wasmtime_environ::packed_option::ReservedValue;
94use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// Intrinsics which may block return this sentinel in place of a completion
/// code when the operation remains in progress.
const BLOCKED: u32 = 0xffff_ffff;
115
/// Corresponds to `CallState` in the upstream spec.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with the optional `waitable` handle into
    /// the single 32-bit value the canonical ABI requires.
    ///
    /// The status occupies the low 4 bits; the waitable handle, when present,
    /// occupies the upper 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A waitable handle must be absent exactly when the call has returned.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let handle = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must fit in the upper 28 bits.
        assert!(handle < (1 << 28));
        (self as u32) | (handle << 4)
    }
}
139
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event (lowered as `EVENT_NONE`).
    None,
    /// A subtask's state changed to `status` (lowered as `EVENT_SUBTASK`).
    Subtask {
        status: Status,
    },
    /// Result of a `stream.read` operation (lowered as `EVENT_STREAM_READ`).
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably identifies a still-pending handle in the
        // given stream table -- confirm against the code consuming this field.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a `stream.write` operation (lowered as `EVENT_STREAM_WRITE`).
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Result of a `future.read` operation (lowered as `EVENT_FUTURE_READ`).
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Result of a `future.write` operation (lowered as `EVENT_FUTURE_WRITE`).
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// The task was cancelled (lowered as `EVENT_CANCELLED`).
    Cancelled,
}
166
167impl Event {
168    /// Lower this event to core Wasm integers for delivery to the guest.
169    ///
170    /// Note that the waitable handle, if any, is assumed to be lowered
171    /// separately.
172    fn parts(self) -> (u32, u32) {
173        const EVENT_NONE: u32 = 0;
174        const EVENT_SUBTASK: u32 = 1;
175        const EVENT_STREAM_READ: u32 = 2;
176        const EVENT_STREAM_WRITE: u32 = 3;
177        const EVENT_FUTURE_READ: u32 = 4;
178        const EVENT_FUTURE_WRITE: u32 = 5;
179        const EVENT_CANCELLED: u32 = 6;
180        match self {
181            Event::None => (EVENT_NONE, 0),
182            Event::Cancelled => (EVENT_CANCELLED, 0),
183            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188        }
189    }
190}
191
/// Corresponds to `CallbackCode` in the spec.
mod callback_code {
    /// Spec `CallbackCode.EXIT`.
    pub const EXIT: u32 = 0;
    /// Spec `CallbackCode.YIELD`.
    pub const YIELD: u32 = 1;
    /// Spec `CallbackCode.WAIT`.
    pub const WAIT: u32 = 2;
}
198
/// A flag indicating that the callee is an async-lowered export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
// Re-expressed as `u32` here since the `wasmtime_environ` definition uses a
// different integer type.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    // Exclusive handle to the store for the duration of this access.
    store: StoreContextMut<'a, T>,
    // Projection from the store's `T` data to the `D::Data` view handed out
    // by `Access::get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216    D: HasData + ?Sized,
217    T: 'static,
218{
219    /// Creates a new [`Access`] from its component parts.
220    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221        Self { store, get_data }
222    }
223
224    /// Get mutable access to the store data.
225    pub fn data_mut(&mut self) -> &mut T {
226        self.store.data_mut()
227    }
228
229    /// Get mutable access to the store data.
230    pub fn get(&mut self) -> D::Data<'_> {
231        (self.get_data)(self.data_mut())
232    }
233
234    /// Spawn a background task.
235    ///
236    /// See [`Accessor::spawn`] for details.
237    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238    where
239        T: 'static,
240    {
241        let accessor = Accessor {
242            get_data: self.get_data,
243            token: StoreToken::new(self.store.as_context_mut()),
244        };
245        self.store
246            .as_context_mut()
247            .spawn_with_accessor(accessor, task)
248    }
249
250    /// Returns the getter this accessor is using to project from `T` into
251    /// `D::Data`.
252    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253        self.get_data
254    }
255}
256
/// Forwards shared store access to the wrapped [`StoreContextMut`].
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
268
/// Forwards exclusive store access to the wrapped [`StoreContextMut`].
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
278
/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. The problem this is solving, however, is that it does not
/// literally store these values. The basic problem is that when a concurrent
/// host future is being polled it has access to `&mut T` (and the whole
/// `Store`) but when it's not being polled it does not have access to these
/// values. This reflects how the store is only ever polling one future at a
/// time so the store is effectively being passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
/// this type additionally doesn't even have a lifetime parameter. This is
/// instead a representation of proof of the ability to acquire these while a
/// future is being polled. Wasmtime will, when it polls a host future,
/// configure ambient state such that the `Accessor` that a future closes over
/// will work and be able to access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
///   alleviates some otherwise required bounds to be written down.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they will panic and have unintended results. If you run into this though
/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    // Proof-of-store token used to recover a `StoreContextMut<T>` from TLS in
    // `Accessor::with`.
    token: StoreToken<T>,
    // Projection from `&mut T` to the `D::Data` view handed out by
    // `Access::get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
345
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to (i.e. the store's
    /// user-supplied data type).
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
373
374impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375    type Data = T::Data;
376    type AccessorData = T::AccessorData;
377
378    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379        T::as_accessor(self)
380    }
381}
382
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    /// An `Accessor` is trivially its own accessor.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
391
392// Note that it is intentional at this time that `Accessor` does not actually
393// store `&mut T` or anything similar. This distinctly enables the `Accessor`
394// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
395// that matter). This is used to ergonomically simplify bindings where the
396// majority of the time `Accessor` is closed over in a future which then needs
397// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
398// you already have to write `T: 'static`...) it helps to avoid this.
399//
400// Note as well that `Accessor` doesn't actually store its data at all. Instead
401// it's more of a "proof" of what can be accessed from TLS. API design around
402// `Accessor` and functions like `Linker::func_wrap_concurrent` are
403// intentionally made to ensure that `Accessor` is ideally only used in the
404// context that TLS variables are actually set. For example host functions are
405// given `&Accessor`, not `Accessor`, and this prevents them from persisting
406// the value outside of a future. Within the future the TLS variables are all
407// guaranteed to be set while the future is being polled.
408//
409// Finally though this is not an ironclad guarantee, but nor does it need to be.
410// The TLS APIs are designed to panic or otherwise model usage where they're
411// called recursively or similar. It's hoped that code cannot be constructed to
412// actually hit this at runtime but this is not a safety requirement at this
413// time.
414const _: () = {
415    const fn assert<T: Send + Sync>() {}
416    assert::<Accessor<UnsafeCell<u32>>>();
417};
418
impl<T> Accessor<T> {
    /// Creates a new `Accessor` for the store identified by `token`, using
    /// the identity projection (i.e. `D = HasSelf<T>`, so the getter simply
    /// returns the store's data itself).
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
435
impl<T, D> Accessor<T, D>
where
    D: HasData + ?Sized,
{
    /// Run the specified closure, passing it mutable access to the store.
    ///
    /// This function is one of the main building blocks of the [`Accessor`]
    /// type. This yields synchronous, blocking, access to the store via an
    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
    /// providing the ability to access `D` via [`Access::get`]. Note that the
    /// `fun` here is given only temporary access to the store and `T`/`D`
    /// meaning that the return value `R` here is not allowed to capture borrows
    /// into the two. If access is needed to data within `T` or `D` outside of
    /// this closure then it must be `clone`d out, for example.
    ///
    /// # Panics
    ///
    /// This function will panic if it is called recursively with any other
    /// accessor already in scope. For example if `with` is called within `fun`,
    /// then this function will panic. It is up to the embedder to ensure that
    /// this does not happen.
    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
        // Recover the store from TLS for the duration of `fun`.
        tls::get(|vmstore| {
            fun(Access {
                store: self.token.as_context_mut(vmstore),
                get_data: self.get_data,
            })
        })
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }

    /// Changes this accessor to access `D2` instead of the current type
    /// parameter `D`.
    ///
    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
    ///
    /// # Panics
    ///
    /// When using this API the returned value is disconnected from `&self` and
    /// the lifetime binding the `self` argument. An `Accessor` only works
    /// within the context of the closure or async closure that it was
    /// originally given to, however. This means that due to the fact that the
    /// returned value has no lifetime connection it's possible to use the
    /// accessor outside of `&self`, the original accessor, and panic.
    ///
    /// The returned value should only be used within the scope of the original
    /// `Accessor` that `self` refers to.
    pub fn with_getter<D2: HasData>(
        &self,
        get_data: fn(&mut T) -> D2::Data<'_>,
    ) -> Accessor<T, D2> {
        Accessor {
            token: self.token,
            get_data,
        }
    }

    /// Spawn a background task which will receive an `&Accessor<T, D>` and
    /// run concurrently with any other tasks in progress for the current
    /// store.
    ///
    /// This is particularly useful for host functions which return a `stream`
    /// or `future` such that the code to write to the write end of that
    /// `stream` or `future` must run after the function returns.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    ///
    /// # Panics
    ///
    /// Panics if called within a closure provided to the [`Accessor::with`]
    /// function. This can only be called outside an active invocation of
    /// [`Accessor::with`].
    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = self.clone_for_spawn();
        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
    }

    // Private duplication used only for `spawn`; `Accessor` intentionally
    // does not implement `Clone` publicly (see the type-level docs).
    fn clone_for_spawn(&self) -> Self {
        Self {
            token: self.token,
            get_data: self.get_data,
        }
    }

    /// Polls to see if this store contains any "interesting" tasks still within
    /// it.
    ///
    /// Returns `Poll::Ready(())` if there are no more interesting tasks, and
    /// otherwise returns `Poll::Pending`. If pending is returned then whenever
    /// the last remaining "interesting" task has exited the provided context's
    /// waker will be notified. Note that only the waker passed to the last call
    /// to `poll_no_interesting_tasks` will be notified, so this is only
    /// appropriate to use one-at-a-time.
    ///
    /// The component model specification, as of this current date, does not
    /// have a distinction between "interesting" tasks and not. The current
    /// intention is that in a future revision of the component model this will
    /// be distinguished at the component ABI level where tasks will be able to
    /// flag themselves as "interesting" optionally. Additionally extra work can
    /// be opted-in to being "interesting".
    ///
    /// For now what this means is that all component model tasks within this
    /// store are considered interesting. This specifically includes the entire
    /// duration of a task, so even all of the time after a task has returned
    /// but before it has exited. This means that this function is, today,
    /// effectively a proxy for "are there any more tasks still running in this
    /// store". This can be used by embedders to determine whether there's any
    /// more work going on, even in the background, for a particular guest.
    /// Hosts can use this as a signal that the guest wants to stay alive a
    /// little longer, even after a task has returned.
    ///
    /// In the future this predicate won't include all tasks in this store. Some
    /// tasks will be able to flag themselves as not interesting, meaning that
    /// when this returns ready it'd be possible that there are still tasks
    /// remaining in the store.
    ///
    /// Note that at this time spawned threads within a task are always
    /// considered uninteresting. If this function returns ready, then spawned
    /// threads may still be in the store.
    pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
        self.with(|mut access| {
            let store = access.as_context_mut().0;
            let state = store.concurrent_state_mut();
            if state.interesting_tasks == 0 {
                Poll::Ready(())
            } else {
                // Stash the waker to be notified when the count reaches zero;
                // only the most recently stored waker is kept.
                state.interesting_tasks_empty_waker = Some(cx.waker().clone());
                Poll::Pending
            }
        })
    }
}
576
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
595
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// The caller's raw core-Wasm parameters.
        params: Vec<ValRaw>,
        /// Whether the call produces a result.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// The caller's raw core-Wasm parameters.
        params: Vec<ValRaw>,
        /// The number of core-Wasm results the caller expects.
        result_count: u32,
    },
}
610
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`, with the suspended fiber
    /// stored here for later resumption.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    Callback(Instance),
}
619
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The thread doing the waiting.
        thread: QualifiedThreadId,
        // NOTE(review): presumably suppresses a check that this thread is
        // allowed to block -- confirm where suspensions are processed.
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        /// The thread that is yielding.
        thread: QualifiedThreadId,
        // See `Waiting::skip_may_block_check`.
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        /// The thread that suspended itself.
        thread: QualifiedThreadId,
        // See `Waiting::skip_may_block_check`.
        skip_may_block_check: bool,
    },
}
645
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Like `StartImplicit`, except the closure performs the call to
    /// completion itself and never produces a follow-up `GuestCall`.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
667
668impl fmt::Debug for GuestCallKind {
669    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
670        match self {
671            Self::DeliverEvent { instance, set } => f
672                .debug_struct("DeliverEvent")
673                .field("instance", instance)
674                .field("set", set)
675                .finish(),
676            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
677            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
678        }
679    }
680}
681
/// The target of a suspension intrinsic.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` if this is the `None` target.
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// Returns `true` if this names a target (either handle-carrying variant).
    fn is_some(&self) -> bool {
        !self.is_none()
    }
}
698
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The thread on which the call will run.
    thread: QualifiedThreadId,
    /// What kind of call this is and how to run it.
    kind: GuestCallKind,
}
705
706impl GuestCall {
707    /// Returns whether or not the call is ready to run.
708    ///
709    /// A call will not be ready to run if either:
710    ///
711    /// - the (sub-)component instance to be called has already been entered and
712    /// cannot be reentered until an in-progress call completes
713    ///
714    /// - the call is for a not-yet started task and the (sub-)component
715    /// instance to be called has backpressure enabled
716    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
717        let instance = store
718            .concurrent_state_mut()
719            .get_mut(self.thread.task)?
720            .instance;
721        let state = store.instance_state(instance).concurrent_state();
722
723        let ready = match &self.kind {
724            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
725            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
726            GuestCallKind::StartExplicit(_) => true,
727        };
728        log::trace!(
729            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
730            state.do_not_enter,
731            state.backpressure
732        );
733        Ok(ready)
734    }
735}
736
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// An arbitrary host closure to run with access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
742
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume, identified by its (sub-)component instance and
    /// thread id.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
757
758impl fmt::Debug for WorkItem {
759    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
760        match self {
761            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
762            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
763            Self::ResumeThread(instance, thread) => f
764                .debug_tuple("ResumeThread")
765                .field(instance)
766                .field(thread)
767                .finish(),
768            Self::GuestCall(instance, call) => f
769                .debug_tuple("GuestCall")
770                .field(instance)
771                .field(call)
772                .finish(),
773            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
774        }
775    }
776}
777
/// Whether a suspension intrinsic was cancelled or completed.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The wait was cancelled before the awaited condition occurred.
    Cancelled,
    /// The awaited condition occurred.
    Completed,
}
784
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.  Note that the wrapper retrieves the store via `tls::get`
    // since it may run later, from the event loop, without `store` in scope.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            // Signal completion so any waiter on this task's waitable set is
            // woken with a `Returned` status.
            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It completed immediately; check the result and delete the task.
        Poll::Ready(result) => result?,

        // It did not complete immediately; add it to
        // `ConcurrentState::futures` so it will be polled via the event loop;
        // then use `GuestThread::sync_call_set` to wait for the task to
        // complete, suspending the current fiber until it does so.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            // Block this fiber until the event above is delivered to `set`.
            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Remove the `task` from the `sync_call_set` to ensure that when
            // this function returns and the task is deleted that there are no
            // more lingering references to this host task.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Retrieve and return the result, leaving `CalleeDone` in place of the
    // finished state.  By this point the task must be `CalleeFinished` and its
    // boxed result must downcast to `R`; anything else is a runtime bug.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
869
/// Execute the specified guest call.
///
/// Delivering an event via a callback may itself produce a follow-up call
/// (returned by `handle_callback_code`), so this loops until no further call
/// is produced.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // Pop the pending event for this task; a missing event here
                // indicates a scheduling bug.
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // Handle 0 is used when the event has no associated waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Temporarily make this guest thread the store's current
                // thread for the duration of the callback.
                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Take the callback out of the task while it runs so the task
                // is not borrowed during the call; it's restored below.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // The returned callback code may schedule another call for
                // this thread, continuing the loop.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                // NOTE(review): this trace fires after `handle_callback_code`
                // even though the thread was restored just above — consider
                // moving it up if precise trace ordering matters.
                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            // An implicit start may itself produce a follow-up call.
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            // An explicit start never produces a follow-up call.
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
941
impl<T> Store<T> {
    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): `StoreContextMut::run_concurrent` performs this same
        // check; presumably it is duplicated here intentionally — confirm.
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Convenience wrapper for
    /// [`StoreContextMut::assert_concurrent_state_empty`].
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Convenience wrapper for
    /// [`StoreContextMut::concurrent_state_table_size`].
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.as_context_mut().concurrent_state_table_size()
    }

    /// Convenience wrapper for [`StoreContextMut::spawn`].
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
973
impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.current_thread.is_none());
        assert!(state.futures_mut().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }

    /// Helper function to perform tests over the size of the concurrent state
    /// table which can be useful for detecting leaks.
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.0
            .concurrent_state_mut()
            .table
            .get_mut()
            .iter_mut()
            .count()
    }

    /// Spawn a background task to run as part of this instance's event loop.
    ///
    /// The task will receive an `&Accessor<U>` and run concurrently with
    /// any other tasks in progress for the instance.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this instance is run.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }

    /// Internal implementation of `spawn` functions where a `store` is
    /// available along with an `Accessor`.
    fn spawn_with_accessor<D>(
        self,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        // Create an "abortable future" here where internally the future will
        // hook calls to poll and possibly spawn more background tasks on each
        // iteration.
        //
        // A cancelled task resolves to `None`, which we map to `Ok(())` so
        // cancellation is not reported as an error by the event loop.
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }

    /// Run the specified closure `fun` to completion as part of this store's
    /// event loop.
    ///
    /// This will run `fun` as part of this store's event loop until it
    /// yields a result.  `fun` is provided an [`Accessor`], which provides
    /// controlled access to the store and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// This function will unconditionally return an error if
    /// [`Config::concurrency_support`] is disabled.
    ///
    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
    ///
    /// # Store-blocking behavior
    ///
    /// At this time there are certain situations in which the `Future` returned
    /// by the `AsyncFnOnce` passed to this function will not be polled for an
    /// extended period of time, despite one or more `Waker::wake` events having
    /// occurred for the task to which it belongs.  This can manifest as the
    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
    /// the `Store` being held by e.g. a blocking host function, preventing the
    /// `Future` from being polled. A canonical example of this is when the
    /// `fun` provided to this function attempts to set a timeout for an
    /// invocation of a wasm function. In this situation the async closure is
    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
    /// to elapse. At this time this setup will not always work and the timeout
    /// may not reliably fire.
    ///
    /// This function will not block the current thread and as such is always
    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress for the closure to make progress. This is an
    /// artifact of Wasmtime's historical implementation of `async` functions
    /// and is the topic of [#11869] and [#11870]. In the timeout example from
    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
    /// progress for a readiness notification of (b) to get delivered.
    ///
    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which timeouts for example
    /// are based on. Fixing this requires some relatively major refactoring
    /// work within Wasmtime itself. This is a known pitfall otherwise and one
    /// that is intended to be fixed one day. In the meantime it's recommended
    /// to apply timeouts or such to the entire `run_concurrent` call itself
    /// rather than internally.
    ///
    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   wasmtime::{
    /// #     error::{Result},
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.do_run_concurrent(fun, false).await
    }

    /// Like [`Self::run_concurrent`], except the event loop returns an error
    /// (see `Trap::AsyncDeadlock` in `poll_until`) rather than waiting when it
    /// becomes idle with the closure's future still pending.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }

    /// Shared implementation of [`Self::run_concurrent`] and
    /// [`Self::run_concurrent_trap_on_idle`]: wrap `fun`'s future so it is
    /// both polled and dropped with the store's TLS context set, then drive it
    /// via `poll_until`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Guard which guarantees the closure's future is dropped with the
        // store's TLS context set (mirroring how it is polled in
        // `poll_until`), presumably so store-dependent destructors can reach
        // the store via `tls::get`.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }

    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which restores `ConcurrentState::futures` to the store when
        // dropped, even if polling below exits early with an error.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            // Result of one pass of the `poll_fn` below.
            enum PollResult<R> {
                // The future passed to `poll_until` completed with this value.
                Complete(R),
                // There are work items to process (`ready` may be empty,
                // meaning "re-run the outer loop and poll again").
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, collect the next batch of work items to process, if
                // any.  This will be either all of the high-priority work
                // items, or if there are none, a single low-priority work item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                // Finally, if we have nothing else to do right now, determine what to do
                // based on whether there are any pending futures in
                // `ConcurrentState::futures`.
                return match next {
                    Poll::Ready(true) => {
                        // In this case, one of the futures in
                        // `ConcurrentState::futures` completed
                        // successfully, so we return now and continue
                        // the outer loop in case there is another one
                        // ready to complete.
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    Poll::Ready(false) => {
                        // Poll the future we were passed one last time
                        // in case one of `ConcurrentState::futures` had
                        // the side effect of unblocking it.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            // In this case, there are no more pending
                            // futures in `ConcurrentState::futures`,
                            // there are no remaining work items, _and_
                            // the future we were passed as an argument
                            // still hasn't completed.
                            if trap_on_idle {
                                // `trap_on_idle` is true, so we exit
                                // immediately.
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                // `trap_on_idle` is false, so we assume
                                // that future will wake up and give us
                                // more work to do when it's ready to.
                                Poll::Pending
                            }
                        }
                    }
                    // There is at least one pending future in
                    // `ConcurrentState::futures` and we have nothing
                    // else to do but wait for now, so we return
                    // `Pending`.
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                PollResult::Complete(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard which disposes of any unhandled work items if we
                    // exit early (e.g. `handle_work_item` returns an error):
                    // fibers are disposed and futures are dropped with the
                    // store's TLS context set.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // If we're about to run a low-priority task, first yield to
                    // the executor.  This ensures that it won't be starved of
                    // the ability to e.g. update the readiness of sockets,
                    // etc. which the guest may be using `thread.yield` along
                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
                    //
                    // This works for e.g. `thread.yield` and callbacks which
                    // return `CALLBACK_CODE_YIELD` because we queue a low
                    // priority item to resume the task (i.e. resume the thread
                    // or call the callback, respectively) just prior to
                    // suspending it.  Indeed, as of this writing those are the
                    // _only_ situations we queue low-priority tasks.
                    // Therefore, we interpret the guest's request to yield as
                    // meaning "yield to other guest tasks _and_/_or_ host
                    // operations such as updating socket readiness", the latter
                    // being the async runtime's responsibility.
                    //
                    // In the future, if this ends up causing measurable
                    // performance issues, this could be optimized such that we
                    // only yield periodically (e.g. for batches of low priority
                    // items) and not for each and every individual item.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }

    /// Handle the specified work item, possibly resuming a fiber if applicable.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            // Hand the future off to `ConcurrentState::futures`, where it
            // will be polled by the event loop.
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures_mut()?
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            // Resume a thread which must currently be in the `Ready` state;
            // mark it `Running` before handing its fiber to the executor.
            WorkItem::ResumeThread(_, thread) => {
                if let GuestThreadState::Ready(fiber) = mem::replace(
                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
                    GuestThreadState::Running,
                ) {
                    self.0.resume_fiber(fiber).await?;
                } else {
                    bail_bug!("cannot resume non-pending thread {thread:?}");
                }
            }
            WorkItem::GuestCall(_, call) => {
                if call.is_ready(self.0)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // The call can't start yet (see `GuestCall::is_ready`).
                    // Notify the caller once (and only once) that the task is
                    // starting, then park the call in the target instance's
                    // `pending` map until it becomes ready.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }

    /// Execute the specified guest call on a worker fiber.
    ///
    /// The worker fiber is created lazily and reused: after handling an item
    /// it suspends with `SuspendReason::NeedWork` and is parked back in
    /// `ConcurrentState::worker` until the next item arrives via
    /// `ConcurrentState::worker_item`.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            fiber::make_fiber(self.0, move |store| {
                loop {
                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
                        bail_bug!("worker_item not present when resuming fiber")
                    };
                    match item {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        // Stash the item where the worker fiber's loop will pick it up, then
        // resume the fiber.
        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }

    /// Wrap the specified host function in a future which will call it, passing
    /// it an `&Accessor<T>`.
    ///
    /// See the `Accessor` documentation for details.
    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
    where
        T: 'static,
        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
            + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
    {
        let token = StoreToken::new(self);
        async move {
            let mut accessor = Accessor::new(token);
            closure(&mut accessor).await
        }
    }
}
1530
1531impl StoreOpaque {
    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
    /// guest-to-guest call or a sync host-to-guest call.
    ///
    /// This task will only be used for the purpose of handling calls to
    /// intrinsic functions; both parameter lowering and result lifting are
    /// assumed to be taken care of elsewhere.  Accordingly, the lower/lift
    /// closures installed below simply report a bug if they're ever invoked.
    ///
    /// When concurrency support is disabled, this degenerates to
    /// `enter_call_not_concurrent`.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        if !self.concurrency_support() {
            return self.enter_call_not_concurrent();
        }

        // Determine the instance of the current guest thread (if any) so it
        // can be sanity-checked against the claimed guest caller below.
        let state = self.concurrent_state_mut();
        let thread = state.current_thread;
        let instance = if let Some(thread) = thread.guest() {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        if guest_caller.is_some() {
            debug_assert_eq!(instance, guest_caller);
        }
        // Create the new task.  Lower/lift closures are never expected to run
        // for a sync call, so they report a bug if invoked; the caller is
        // recorded as either the current guest thread or the host.
        let guest_thread = GuestTask::new(
            state,
            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
            LiftResult {
                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            if let Some(thread) = thread.guest() {
                Caller::Guest { thread: *thread }
            } else {
                Caller::Host {
                    tx: None,
                    host_future_present: false,
                    caller: thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;

        // Register the new thread with the callee instance's table, then make
        // it the store's current thread.
        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread.thread,
            self,
            callee.index,
        )?;
        self.set_thread(guest_thread)?;

        Ok(())
    }
1591
    /// Pop a `GuestTask` previously pushed using `enter_guest_sync_call`.
    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(self.exit_call_not_concurrent());
        }
        // Clear the current thread and recover the guest thread that was
        // pushed on entry; anything else is a runtime bug.
        let thread = match self.set_thread(CurrentThread::None)?.guest() {
            Some(t) => *t,
            None => bail_bug!("expected task when exiting"),
        };
        let task = self.concurrent_state_mut().get_mut(thread.task)?;
        let instance = task.instance;
        // The caller to restore may be either a guest thread or the host.
        let caller = match &task.caller {
            &Caller::Guest { thread } => thread.into(),
            &Caller::Host { caller, .. } => caller,
        };
        // Mark the task as finished: no result lifting occurs for a sync
        // call, and the task has exited.
        task.lift_result = None;
        task.exited = true;
        self.set_thread(caller)?;

        log::trace!("exit sync call {instance:?}");
        // Tear down the thread (and, per `CleanupTask::Yes`, the task itself
        // if it's ready to be deleted).
        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;

        Ok(())
    }
1616
    /// Similar to `enter_guest_sync_call` except for when the guest makes a
    /// transition to the host.
    ///
    /// Returns `Some(id)` for the newly-created `HostTask` when concurrency
    /// support is enabled, or `None` otherwise (in which case the
    /// non-concurrent entry bookkeeping is performed instead).
    ///
    /// FIXME: this is called for all guest->host transitions and performs some
    /// relatively expensive table manipulations. This would ideally be
    /// optimized to avoid the full allocation of a `HostTask` in at least some
    /// situations.
    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
        if !self.concurrency_support() {
            self.enter_call_not_concurrent()?;
            return Ok(None);
        }
        // The caller must be a guest thread; record it in the new `HostTask`
        // so it can be restored later (see `host_task_reenter_caller`).
        let state = self.concurrent_state_mut();
        let caller = state.current_guest_thread()?;
        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
        log::trace!("new host task {task:?}");
        // The new host task becomes the store's current thread.
        self.set_thread(task)?;
        Ok(Some(task))
    }
1636
1637    /// Invoked before lowering the results of a host task to the guest.
1638    ///
1639    /// This is used to update the current thread annotations within the store
1640    /// to ensure that it reflects the guest task, not the host task, since
1641    /// lowering may execute guest code.
1642    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1643        if !self.concurrency_support() {
1644            return Ok(());
1645        }
1646        let task = self.concurrent_state_mut().current_host_thread()?;
1647        let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1648        self.set_thread(caller)?;
1649        Ok(())
1650    }
1651
1652    /// Dual of `host_task_create` and signifies that the host has finished and
1653    /// will be cleaned up.
1654    ///
1655    /// Note that this isn't invoked when the host is invoked asynchronously and
1656    /// the host isn't complete yet. In that situation the host task persists
1657    /// and will be cleaned up separately in `subtask_drop`
1658    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1659        match task {
1660            Some(task) => {
1661                log::trace!("delete host task {task:?}");
1662                self.concurrent_state_mut().delete(task)?;
1663            }
1664            None => {
1665                self.exit_call_not_concurrent();
1666            }
1667        }
1668        Ok(())
1669    }
1670
    /// Determine whether the specified instance may be entered from the host.
    ///
    /// We return `true` here only if all of the following hold:
    ///
    /// - The top-level instance is not already on the current task's call stack.
    /// - The instance is not in need of a post-return function call.
    /// - `self` has not been poisoned due to a trap.
    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
        // A trapped store may never be re-entered.
        if self.trapped() {
            return Ok(false);
        }
        // Without concurrency support there is no async call stack to check.
        if !self.concurrency_support() {
            return Ok(true);
        }
        // Walk the chain of callers starting from the current thread, looking
        // for a guest task already executing within the same top-level
        // instance.
        let state = self.concurrent_state_mut();
        let mut cur = state.current_thread;
        loop {
            match cur {
                // Reached the bottom of the call stack without finding a
                // conflicting activation.
                CurrentThread::None => break Ok(true),
                CurrentThread::Guest(thread) => {
                    let task = state.get_mut(thread.task)?;

                    // Note that we only compare top-level instance IDs here.
                    // The idea is that the host is not allowed to recursively
                    // enter a top-level instance even if the specific leaf
                    // instance is not on the stack. This is the behavior
                    // defined in the spec, and it allows us to elide runtime
                    // checks in guest-to-guest adapters.
                    if task.instance.instance == instance.instance {
                        break Ok(false);
                    }
                    cur = match task.caller {
                        Caller::Host { caller, .. } => caller,
                        Caller::Guest { thread } => thread.into(),
                    };
                }
                // Host tasks simply forward to their recorded caller.
                CurrentThread::Host(id) => {
                    cur = state.get_mut(id)?.caller.into();
                }
            }
        }
    }
1713
1714    /// Helper function to retrieve the `InstanceState` for the
1715    /// specified instance.
1716    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1717        self.component_instance_mut(instance.instance)
1718            .instance_state(instance.index)
1719    }
1720
    /// Configure the currently running `thread`.
    ///
    /// This will save off any state necessary for the previous thread, if
    /// applicable, and then it'll additionally update state for `thread` if
    /// needed too.
    ///
    /// Returns the previously-current thread so callers can restore it later.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // First thing to do after swapping threads is updating the context
        // slots for this thread within the store. This restores the behavior of
        // `context.{get,set}`. This involves taking the old state out of the
        // store, saving it in the thread that's being swapped from, and doing
        // the inverse for the new thread. When debug assertions are enabled
        // this also leaves behind sentinel values to try to uncover bugs where
        // this may be forgotten.
        if let Some(old_thread) = old_thread.guest() {
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        if cfg!(debug_assertions) {
            // Sentinel: any thread observing `u32::MAX` slots forgot to go
            // through `set_thread` first.
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        if let Some(thread) = thread.guest() {
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        // Each time we switch threads, we conservatively set `task_may_block`
        // to `false` for the component instance we're switching away from (if
        // any), meaning it will be `false` for any new thread created for that
        // instance unless explicitly set otherwise.
        //
        // Additionally if we're switching to a new thread, set its component
        // instance's `task_may_block` according to where it left off.
        let state = self.concurrent_state_mut();
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1776
1777    /// Set the global variable representing whether the current task may block
1778    /// prior to entering Wasm code.
1779    fn set_task_may_block(&mut self) -> Result<()> {
1780        let state = self.concurrent_state_mut();
1781        let guest_thread = state.current_guest_thread()?;
1782        let instance = state.get_mut(guest_thread.task)?.instance.instance;
1783        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1784        self.component_instance_mut(instance)
1785            .set_task_may_block(may_block);
1786        Ok(())
1787    }
1788
1789    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1790        if !self.concurrency_support() {
1791            return Ok(());
1792        }
1793        let state = self.concurrent_state_mut();
1794        let task = state.current_guest_thread()?.task;
1795        let instance = state.get_mut(task)?.instance.instance;
1796        let task_may_block = self.component_instance(instance).get_task_may_block();
1797
1798        if task_may_block {
1799            Ok(())
1800        } else {
1801            Err(Trap::CannotBlockSyncTask.into())
1802        }
1803    }
1804
1805    /// Record that we're about to enter a (sub-)component instance which does
1806    /// not support more than one concurrent, stackful activation, meaning it
1807    /// cannot be entered again until the next call returns.
1808    fn enter_instance(&mut self, instance: RuntimeInstance) {
1809        log::trace!("enter {instance:?}");
1810        self.instance_state(instance)
1811            .concurrent_state()
1812            .do_not_enter = true;
1813    }
1814
1815    /// Record that we've exited a (sub-)component instance previously entered
1816    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1817    /// See the documentation for the latter for details.
1818    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1819        log::trace!("exit {instance:?}");
1820        self.instance_state(instance)
1821            .concurrent_state()
1822            .do_not_enter = false;
1823        self.partition_pending(instance)
1824    }
1825
    /// Iterate over `InstanceState::pending`, moving any ready items into the
    /// "high priority" work item queue.
    ///
    /// See `GuestCall::is_ready` for details.
    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
        // Take the entire pending map so `self` may be borrowed mutably while
        // checking readiness; not-yet-ready calls are reinserted below.
        for (thread, kind) in
            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
        {
            let call = GuestCall { thread, kind };
            if call.is_ready(self)? {
                // Ready: schedule it ahead of other work.
                self.concurrent_state_mut()
                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
            } else {
                // Still blocked: put it back in the pending map for a later
                // call to this function.
                self.instance_state(instance)
                    .concurrent_state()
                    .pending
                    .insert(call.thread, call.kind);
            }
        }

        Ok(())
    }
1848
1849    /// Implements the `backpressure.{inc,dec}` intrinsics.
1850    pub(crate) fn backpressure_modify(
1851        &mut self,
1852        caller_instance: RuntimeInstance,
1853        modify: impl FnOnce(u16) -> Option<u16>,
1854    ) -> Result<()> {
1855        let state = self.instance_state(caller_instance).concurrent_state();
1856        let old = state.backpressure;
1857        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1858        state.backpressure = new;
1859
1860        if old > 0 && new == 0 {
1861            // Backpressure was previously enabled and is now disabled; move any
1862            // newly-eligible guest calls to the "high priority" queue.
1863            self.partition_pending(caller_instance)?;
1864        }
1865
1866        Ok(())
1867    }
1868
    /// Resume the specified fiber, giving it exclusive access to the specified
    /// store.
    ///
    /// After the fiber suspends (or exits), the previously-current thread is
    /// restored and the fiber is re-filed according to the `SuspendReason` it
    /// left behind: cached as the worker, queued for resumption, parked as
    /// suspended, or registered as waiting on a waitable set.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        // Remember which thread was current so it can be restored once the
        // fiber yields the store back to us.
        let old_thread = self.concurrent_state_mut().current_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // Run the fiber; `None` means it ran to completion and has exited.
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        self.set_thread(old_thread)?;

        let state = self.concurrent_state_mut();

        if let Some(ot) = old_thread.guest() {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            // See the `SuspendReason` documentation for what each case means.
            let reason = match state.suspend_reason.take() {
                Some(r) => r,
                None => bail_bug!("suspend reason missing when resuming fiber"),
            };
            match reason {
                SuspendReason::NeedWork => {
                    // Cache the idle worker fiber for reuse, unless a worker
                    // is already cached, in which case this one is disposed.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding { thread, .. } => {
                    // Yielded voluntarily: mark ready and queue it at low
                    // priority so other work gets a chance first.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
                    let instance = state.get_mut(thread.task)?.instance.index;
                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    // Parked until something explicitly resumes it.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread, .. } => {
                    // Register the fiber with the waitable set so an event
                    // delivery can wake it.
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1923
    /// Suspend the current fiber, storing the reason in
    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
    /// it should be resumed.
    ///
    /// See the `SuspendReason` documentation for details.
    ///
    /// Returns once the fiber has been resumed, after restoring the guest
    /// thread (if any) that was current when the suspension began.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // If we're yielding or waiting on behalf of a guest thread, we'll need to
        // pop the call context which manages resource borrows before suspending
        // and then push it again once we've resumed.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Remember the current thread only when suspending on behalf of a
        // guest thread; worker fibers (`NeedWork`) have nothing to restore.
        let old_guest_thread = if task.is_some() {
            self.concurrent_state_mut().current_thread
        } else {
            CurrentThread::None
        };

        // We should not have reached here unless either there's no current
        // task, or the current task is permitted to block.  In addition, we
        // special-case `thread.switch-to` and waiting for a subtask to go from
        // `starting` to `started`, both of which we consider non-blocking
        // operations despite requiring a suspend.
        debug_assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Yielding {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .guest()
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .transpose()?
                .unwrap_or(true)
        );

        // Publish the suspend reason for `resume_fiber` to consume; the slot
        // must be empty, otherwise two suspensions have raced.
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        // Actually suspend, releasing the store to whoever resumed us.
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Resumed: restore the guest thread we suspended on behalf of.
        if task.is_some() {
            self.set_thread(old_guest_thread)?;
        }

        Ok(())
    }
1985
    /// Suspend the current guest thread until `waitable` delivers an event.
    ///
    /// The waitable is temporarily joined to the current thread's
    /// `sync_call_set`, the fiber is suspended with `SuspendReason::Waiting`,
    /// and once resumed the waitable is re-joined to whatever set it belonged
    /// to before (if any).
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.current_guest_thread()?;
        // Remember the waitable's original set so membership can be restored
        // after the wait completes.
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.thread)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        // Resumed: move the waitable back to its original set.
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
2000
    /// Cleans up the data structures backing the `guest_thread` specified,
    /// removing it from the internal tables of `runtime_instance` as well.
    ///
    /// This function is used whenever a guest thread has fully exited and
    /// completed. This'll clean up the associated `GuestThread` structure and
    /// related resources it contains.
    ///
    /// Other functionality that this implements is:
    ///
    /// * This will perform conditional cleanup of the `GuestTask` that owns
    ///   this thread if `cleanup_task` is `CleanupTask::Yes`.
    /// * If there are no more threads in the `GuestTask` that this thread is
    ///   associated with, and if the task hasn't produced a result (e.g. it's not
    ///   returned or cancelled), then a trap will be raised that a result
    ///   wasn't ever produced.
    /// * If this task is finished, meaning the top-level thread exited and
    ///   additionally it's been returned or cancelled, then this will handle
    ///   management of the store's "active interesting tasks" counter.
    ///
    /// Effectively this is intended to be a "narrow waist" through which many
    /// destruction operations are funneled through.
    fn cleanup_thread(
        &mut self,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeInstance,
        cleanup_task: CleanupTask,
    ) -> Result<()> {
        let state = self.concurrent_state_mut();
        let thread_data = state.get_mut(guest_thread.thread)?;
        let sync_call_set = thread_data.sync_call_set;
        // If the thread was exposed to the guest via a handle, remove that
        // handle from the instance's thread table.
        if let Some(guest_id) = thread_data.instance_rep {
            self.instance_state(runtime_instance)
                .thread_handle_table()
                .guest_thread_remove(guest_id)?;
        }
        // Re-borrow: `instance_state` above required `&mut self`.
        let state = self.concurrent_state_mut();

        // Clean up any pending subtasks in the sync_call_set
        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }

        // Delete the thread and its set, then detach it from its owning task.
        state.delete(guest_thread.thread)?;
        state.delete(sync_call_set)?;
        let task = state.get_mut(guest_thread.task)?;
        task.threads.remove(&guest_thread.thread);

        // Last thread gone without the task ever producing a result: trap.
        if task.threads.is_empty() && !task.returned_or_cancelled() {
            bail!(Trap::NoAsyncResult);
        }
        let ready_to_delete = task.ready_to_delete();

        // Maintain the store's "interesting tasks" counter, waking anyone
        // waiting for it to reach zero; the `decremented_*` flag guarantees
        // the decrement happens at most once per task.
        if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
            task.decremented_interesting_task_count = true;

            debug_assert!(state.interesting_tasks > 0);
            state.interesting_tasks -= 1;
            if state.interesting_tasks == 0
                && let Some(waker) = state.interesting_tasks_empty_waker.take()
            {
                waker.wake();
            }
        }

        // Optionally delete the owning task itself when the caller asked for
        // it and the task reports itself ready.
        match cleanup_task {
            CleanupTask::Yes => {
                if ready_to_delete {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
            CleanupTask::No => {}
        }

        Ok(())
    }
2081
    /// Performs cancellation of the `guest_task` specified with the
    /// precondition that the task hasn't lowered its parameters.
    ///
    /// In this situation the task hasn't ever been started meaning it hasn't
    /// actually run any wasm code yet. This requires cleaning up metadata such
    /// as thread information attached to the task.
    ///
    /// The main two entrypoints for this function are:
    ///
    /// * Task cancellation via `subtask.cancel`, the intrinsic.
    /// * Dropping a host `call_async` future which needs to cancel the task
    ///   because it cannot reference its parameters any more.
    fn cancel_guest_subtask_without_lowered_parameters(
        &mut self,
        caller_instance: RuntimeInstance,
        guest_task: TableId<GuestTask>,
    ) -> Result<()> {
        let concurrent_state = self.concurrent_state_mut();
        let task = concurrent_state.get_mut(guest_task)?;
        // Precondition enforced: this path is only valid pre-lowering.
        assert!(!task.already_lowered_parameters());
        // The task is in a `starting` state, meaning it hasn't run at
        // all yet.  Here we update its fields to indicate that it is
        // ready to delete immediately once `subtask.drop` is called.
        task.lower_params = None;
        task.lift_result = None;
        task.exited = true;
        let instance = task.instance;

        // Clean up the thread within this task as it's now never going
        // to run.  A never-started task is expected to have exactly one
        // thread (asserted below).
        assert_eq!(1, task.threads.len());
        let thread = *task.threads.iter().next().unwrap();
        self.cleanup_thread(
            QualifiedThreadId {
                task: guest_task,
                thread,
            },
            caller_instance,
            CleanupTask::No,
        )?;

        // Not yet started; cancel and remove from pending
        let pending = &mut self.instance_state(instance).concurrent_state().pending;
        let pending_count = pending.len();
        pending.retain(|thread, _| thread.task != guest_task);
        // If there were no pending threads for this task, we're in an error state
        if pending.len() == pending_count {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
        Ok(())
    }
2133}
2134
/// Whether `StoreOpaque::cleanup_thread` should also delete the owning
/// `GuestTask` once it reports itself ready for deletion.
enum CleanupTask {
    /// Delete the owning task if `ready_to_delete` reports true.
    Yes,
    /// Leave the owning task in place; the caller handles its cleanup.
    No,
}
2139
2140impl Instance {
    /// Get the next pending event for the specified task and (optional)
    /// waitable set, along with the waitable handle if applicable.
    ///
    /// Delivery order: a task-level event stored in `GuestTask::event` takes
    /// precedence and is delivered without a waitable/handle pair; otherwise
    /// the first ready waitable in `set` (if any) is popped and its event
    /// delivered.  Returns `Ok(None)` when there is nothing to deliver.
    ///
    /// A task-level `Event::Cancelled` is only delivered when `cancellable`
    /// is true; otherwise it is left in place for a later, cancellable wait.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        // Task-level events take priority over waitable events.
        let event = &mut state.get_mut(guest_task)?.event;
        if let Some(ev) = event
            && (cancellable || !matches!(ev, Event::Cancelled))
        {
            log::trace!("deliver event {ev:?} to {guest_task:?}");
            let ev = *ev;
            *event = None;
            return Ok(Some((ev, None)));
        }

        // Without a waitable set there's nothing further to poll.
        let set = match set {
            Some(set) => set,
            None => return Ok(None),
        };
        let waitable = match state.get_mut(set)?.ready.pop_first() {
            Some(v) => v,
            None => return Ok(None),
        };

        // Both the guest-visible handle and the pending event must be present
        // for a waitable that made it into the ready set; anything else is a
        // runtime bug.
        let common = waitable.common(state)?;
        let handle = match common.handle {
            Some(h) => h,
            None => bail_bug!("handle not set when delivering event"),
        };
        let event = match common.event.take() {
            Some(e) => e,
            None => bail_bug!("event not set when delivering event"),
        };

        log::trace!(
            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
        );

        // Give the waitable a chance to update its own state now that the
        // event is being delivered.
        waitable.on_delivery(store, self, event)?;

        Ok(Some((event, Some((waitable, handle)))))
    }
2189
2190    /// Handle the `CallbackCode` returned from an async-lifted export or its
2191    /// callback.
2192    ///
2193    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2194    /// using `handle_guest_call`.
2195    fn handle_callback_code(
2196        self,
2197        store: &mut StoreOpaque,
2198        guest_thread: QualifiedThreadId,
2199        runtime_instance: RuntimeComponentInstanceIndex,
2200        code: u32,
2201    ) -> Result<Option<GuestCall>> {
2202        let (code, set) = unpack_callback_code(code);
2203
2204        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2205
2206        let state = store.concurrent_state_mut();
2207
2208        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2209            let set = store
2210                .instance_state(self.runtime_instance(runtime_instance))
2211                .handle_table()
2212                .waitable_set_rep(handle)?;
2213
2214            Ok(TableId::<WaitableSet>::new(set))
2215        };
2216
2217        Ok(match code {
2218            callback_code::EXIT => {
2219                log::trace!("implicit thread {guest_thread:?} completed");
2220                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2221                task.exited = true;
2222                task.callback = None;
2223                store.cleanup_thread(
2224                    guest_thread,
2225                    self.runtime_instance(runtime_instance),
2226                    CleanupTask::Yes,
2227                )?;
2228                None
2229            }
2230            callback_code::YIELD => {
2231                let task = state.get_mut(guest_thread.task)?;
2232                // If an `Event::Cancelled` is pending, we'll deliver that;
2233                // otherwise, we'll deliver `Event::None`.  Note that
2234                // `GuestTask::event` is only ever set to one of those two
2235                // `Event` variants.
2236                if let Some(event) = task.event {
2237                    assert!(matches!(event, Event::None | Event::Cancelled));
2238                } else {
2239                    task.event = Some(Event::None);
2240                }
2241                let call = GuestCall {
2242                    thread: guest_thread,
2243                    kind: GuestCallKind::DeliverEvent {
2244                        instance: self,
2245                        set: None,
2246                    },
2247                };
2248                if state.may_block(guest_thread.task)? {
2249                    // Push this thread onto the "low priority" queue so it runs
2250                    // after any other threads have had a chance to run.
2251                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2252                    None
2253                } else {
2254                    // Yielding in a non-blocking context is defined as a no-op
2255                    // according to the spec, so we must run this thread
2256                    // immediately without allowing any others to run.
2257                    Some(call)
2258                }
2259            }
2260            callback_code::WAIT => {
2261                // The task may only return `WAIT` if it was created for a call
2262                // to an async export).  Otherwise, we'll trap.
2263                state.check_blocking_for(guest_thread.task)?;
2264
2265                let set = get_set(store, set)?;
2266                let state = store.concurrent_state_mut();
2267
2268                if state.get_mut(guest_thread.task)?.event.is_some()
2269                    || !state.get_mut(set)?.ready.is_empty()
2270                {
2271                    // An event is immediately available; deliver it ASAP.
2272                    state.push_high_priority(WorkItem::GuestCall(
2273                        runtime_instance,
2274                        GuestCall {
2275                            thread: guest_thread,
2276                            kind: GuestCallKind::DeliverEvent {
2277                                instance: self,
2278                                set: Some(set),
2279                            },
2280                        },
2281                    ));
2282                } else {
2283                    // No event is immediately available.
2284                    //
2285                    // We're waiting, so register to be woken up when an event
2286                    // is published for this waitable set.
2287                    //
2288                    // Here we also set `GuestTask::wake_on_cancel` which allows
2289                    // `subtask.cancel` to interrupt the wait.
2290                    let old = state
2291                        .get_mut(guest_thread.thread)?
2292                        .wake_on_cancel
2293                        .replace(set);
2294                    if !old.is_none() {
2295                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2296                    }
2297                    let old = state
2298                        .get_mut(set)?
2299                        .waiting
2300                        .insert(guest_thread, WaitMode::Callback(self));
2301                    if !old.is_none() {
2302                        bail_bug!("set's waiting set already had this thread registered");
2303                    }
2304                }
2305                None
2306            }
2307            _ => bail!(Trap::UnsupportedCallbackCode),
2308        })
2309    }
2310
    /// Add the specified guest call to the "high priority" work item queue, to
    /// be started as soon as backpressure and/or reentrance rules allow.
    ///
    /// SAFETY: The raw pointer arguments must be valid references to guest
    /// functions (with the appropriate signatures) when the closures queued by
    /// this function are called.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Return a closure which will call the specified function in the scope
        /// of the specified task.
        ///
        /// This will use `GuestTask::lower_params` to lower the parameters, but
        /// will not lift the result; instead, it returns a
        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
        /// any, may be lifted.  Note that an async-lifted export will have
        /// returned its result using the `task.return` intrinsic (or not
        /// returned a result at all, in the case of `task.cancel`), in which
        /// case the "result" of this call will either be a callback code or
        /// nothing.
        ///
        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
        /// the returned closure is called.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                // Scratch space shared between parameters and (flat) results;
                // sized for the larger of the two below.
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                // Mark the thread as running before lowering parameters or
                // entering the guest.
                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // `lower_params` is single-use; it must still be present here.
                let lower = match task.lower_params.take() {
                    Some(l) => l,
                    None => bail_bug!("lower_params missing"),
                };

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                // SAFETY: Per the contract documented in `make_call's`
                // documentation, `callee` must be a valid pointer.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }

                Ok(storage)
            }
        }

        // SAFETY: Per the contract described in this function documentation,
        // the `callee` pointer which `call` closes over must be valid when
        // called by the closure we queue below.
        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
            )
        };

        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        // Build the work-item closure.  There are two shapes: a "stackless"
        // call to an async-lifted export with a callback (which returns a
        // callback code to be interpreted), and a sync or stackful-async call
        // (which runs to completion on its own fiber).
        let fun = if callback.is_some() {
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.enter_instance(callee_instance);

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                // Restore the caller as the current thread now that the
                // stackless call has returned its callback code.
                store.set_thread(old_thread)?;
                let state = store.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // SAFETY: `wasmparser` will have validated that the callback
                // function returns a `i32` result.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                let flags = self.id().get(store).instance_flags(callee_instance.index);

                // Unless this is a callback-less (i.e. stackful)
                // async-lifted export, we need to record that the instance
                // cannot be entered until the call returns.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                if !async_ {
                    // This is a sync-lifted export, so now is when we lift the
                    // result, optionally call the post-return function, if any,
                    // and finally notify any current or future waiters that the
                    // subtask has returned.

                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut();
                        if !state.get_mut(guest_thread.task)?.result.is_none() {
                            bail_bug!("task has already produced a result");
                        }

                        // Like `lower_params`, `lift_result` is single-use.
                        match state.get_mut(guest_thread.task)?.lift_result.take() {
                            Some(lift) => lift,
                            None => bail_bug!("lift_result field is missing"),
                        }
                    };

                    // SAFETY: `result_count` represents the number of core Wasm
                    // results returned, per `wasmparser`.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        // SAFETY: `result_count` represents the number of
                        // core Wasm results returned, per `wasmparser`.
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    // SAFETY: `post_return`, if present, is a valid guest
                    // function pointer per `queue_call`'s safety contract.
                    unsafe {
                        call_post_return(
                            token.as_context_mut(store),
                            post_return.map(|v| v.as_non_null()),
                            post_return_arg,
                            flags,
                        )?;
                    }

                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
                }

                store.set_thread(old_thread)?;

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .exited = true;

                // This is a callback-less call, so the implicit thread has now completed
                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
                Ok(None)
            })
        };

        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(
                callee_instance.index,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartImplicit(fun),
                },
            ));

        Ok(())
    }
2551
    /// Prepare (but do not start) a guest->guest call.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
    /// are synthesized Wasm functions which move the parameters from the caller
    /// to the callee and the result from the callee to the caller,
    /// respectively.  The adapter will call `Self::start_call` immediately
    /// after calling this function.
    ///
    /// SAFETY: All the pointer arguments must be valid pointers to guest
    /// entities (and with the expected signatures for the function references
    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        memory: *mut VMMemoryDefinition,
        string_encoding: StringEncoding,
        caller_info: CallerInfo,
    ) -> Result<()> {
        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
            // A task may only call an async-typed function via a sync lower if
            // it was created by a call to an async export.  Otherwise, we'll
            // trap.
            store.0.check_blocking()?;
        }

        // How the callee's result reaches the caller: via a return pointer
        // into linear memory (`Heap`) or via flat core values (`Stack`).
        enum ResultInfo {
            Heap { results: u32 },
            Stack { result_count: u32 },
        }

        let result_info = match &caller_info {
            // An async caller with a result passes a return pointer as its
            // final parameter.
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("retptr missing"),
                },
            },
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            // A sync caller with more results than fit flat also uses a
            // return pointer as its final parameter.
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("arg ptr missing"),
                },
            },
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        // Captured by the `lift` closure below so it knows whether to stash a
        // sync result for the caller.
        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(start);
        let return_ = SendSyncPtr::new(return_);
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let old_thread = state.current_guest_thread()?;

        debug_assert_eq!(
            state.get_mut(old_thread.task)?.instance,
            self.runtime_instance(caller_instance)
        );

        let guest_thread = GuestTask::new(
            state,
            // The `lower_params` closure: forwards the caller's flat
            // parameters through the synthesized `start` adapter into `dst`.
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes count parameters and returns
                // `dst.len()` results.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                dst.copy_from_slice(&src[..dst.len()]);
                // Publish `Status::Started` so any waiters on this subtask see
                // that the call has begun.
                let state = store.0.concurrent_state_mut();
                Waitable::Guest(state.current_guest_thread()?.task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                lift: Box::new(move |store, src| {
                    // SAFETY: See comment in closure passed as `lower_params`
                    // parameter above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    // For heap results, append the return pointer so `return_`
                    // knows where to write them.
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }

                    // Execute the `return_` hook, generated by Wasmtime's FACT
                    // compiler, in the context of the old thread. The old
                    // thread, this thread's caller, may have `realloc`
                    // callbacks invoked for example and those need the correct
                    // context set for the current thread.
                    let prev = store.0.set_thread(old_thread)?;

                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }

                    // Restore the previous current thread after the
                    // lifting/lowering has returned.
                    store.0.set_thread(prev)?;

                    let state = store.0.concurrent_state_mut();
                    let thread = state.current_guest_thread()?;
                    if sync_caller {
                        // A sync caller consumes the (flat) result via
                        // `GuestTask::sync_result` rather than an event payload.
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding,
            },
            Caller::Guest { thread: old_thread },
            None,
            self.runtime_instance(callee_instance),
            callee_async,
        )?;

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        store.0.set_thread(guest_thread)?;
        log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");

        Ok(())
    }
2754
2755    /// Call the specified callback function for an async-lifted export.
2756    ///
2757    /// SAFETY: `function` must be a valid reference to a guest function of the
2758    /// correct signature for a callback.
2759    unsafe fn call_callback<T>(
2760        self,
2761        mut store: StoreContextMut<T>,
2762        function: SendSyncPtr<VMFuncRef>,
2763        event: Event,
2764        handle: u32,
2765    ) -> Result<u32> {
2766        let (ordinal, result) = event.parts();
2767        let params = &mut [
2768            ValRaw::u32(ordinal),
2769            ValRaw::u32(handle),
2770            ValRaw::u32(result),
2771        ];
2772        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2773        // `wasmtime-cranelift`-generated fused adapter code or
2774        // `component::Options`.  Per `wasmparser` callback signature
2775        // validation, we know it takes three parameters and returns one.
2776        unsafe {
2777            crate::Func::call_unchecked_raw(
2778                &mut store,
2779                function.as_non_null(),
2780                params.as_mut_slice().into(),
2781            )?;
2782        }
2783        Ok(params[0].get_u32())
2784    }
2785
2786    /// Start a guest->guest call previously prepared using
2787    /// `Self::prepare_call`.
2788    ///
2789    /// This is called from fused adapter code generated in
2790    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2791    /// this function immediately after calling `Self::prepare_call`.
2792    ///
2793    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2794    /// functions with the appropriate signatures for the current guest task.
2795    /// If this is a call to an async-lowered import, the actual call may be
2796    /// deferred and run after this function returns, in which case the pointer
2797    /// arguments must also be valid when the call happens.
2798    unsafe fn start_call<T: 'static>(
2799        self,
2800        mut store: StoreContextMut<T>,
2801        callback: *mut VMFuncRef,
2802        post_return: *mut VMFuncRef,
2803        callee: NonNull<VMFuncRef>,
2804        param_count: u32,
2805        result_count: u32,
2806        flags: u32,
2807        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2808    ) -> Result<u32> {
2809        let token = StoreToken::new(store.as_context_mut());
2810        let async_caller = storage.is_none();
2811        let state = store.0.concurrent_state_mut();
2812        let guest_thread = state.current_guest_thread()?;
2813        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2814        let callee = SendSyncPtr::new(callee);
2815        let param_count = usize::try_from(param_count)?;
2816        assert!(param_count <= MAX_FLAT_PARAMS);
2817        let result_count = usize::try_from(result_count)?;
2818        assert!(result_count <= MAX_FLAT_RESULTS);
2819
2820        let task = state.get_mut(guest_thread.task)?;
2821        if let Some(callback) = NonNull::new(callback) {
2822            // We're calling an async-lifted export with a callback, so store
2823            // the callback and related context as part of the task so we can
2824            // call it later when needed.
2825            let callback = SendSyncPtr::new(callback);
2826            task.callback = Some(Box::new(move |store, event, handle| {
2827                let store = token.as_context_mut(store);
2828                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2829            }));
2830        }
2831
2832        let Caller::Guest { thread: caller } = &task.caller else {
2833            // As of this writing, `start_call` is only used for guest->guest
2834            // calls.
2835            bail_bug!("start_call unexpectedly invoked for host->guest call");
2836        };
2837        let caller = *caller;
2838        let caller_instance = state.get_mut(caller.task)?.instance;
2839
2840        // Queue the call as a "high priority" work item.
2841        unsafe {
2842            self.queue_call(
2843                store.as_context_mut(),
2844                guest_thread,
2845                callee,
2846                param_count,
2847                result_count,
2848                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2849                NonNull::new(callback).map(SendSyncPtr::new),
2850                NonNull::new(post_return).map(SendSyncPtr::new),
2851            )?;
2852        }
2853
2854        let state = store.0.concurrent_state_mut();
2855
2856        // Use the caller's `GuestThread::sync_call_set` to register interest in
2857        // the subtask...
2858        let guest_waitable = Waitable::Guest(guest_thread.task);
2859        let old_set = guest_waitable.common(state)?.set;
2860        let set = state.get_mut(caller.thread)?.sync_call_set;
2861        guest_waitable.join(state, Some(set))?;
2862
2863        store.0.set_thread(CurrentThread::None)?;
2864
2865        // ... and suspend this fiber temporarily while we wait for it to start.
2866        //
2867        // Note that we _could_ call the callee directly using the current fiber
2868        // rather than suspend this one, but that would make reasoning about the
2869        // event loop more complicated and is probably only worth doing if
2870        // there's a measurable performance benefit.  In addition, it would mean
2871        // blocking the caller if the callee calls a blocking sync-lowered
2872        // import, and as of this writing the spec says we must not do that.
2873        //
2874        // Alternatively, the fused adapter code could be modified to call the
2875        // callee directly without calling a host-provided intrinsic at all (in
2876        // which case it would need to do its own, inline backpressure checks,
2877        // etc.).  Again, we'd want to see a measurable performance benefit
2878        // before committing to such an optimization.  And again, we'd need to
2879        // update the spec to allow that.
2880        let (status, waitable) = loop {
2881            store.0.suspend(SuspendReason::Waiting {
2882                set,
2883                thread: caller,
2884                // Normally, `StoreOpaque::suspend` would assert it's being
2885                // called from a context where blocking is allowed.  However, if
2886                // `async_caller` is `true`, we'll only "block" long enough for
2887                // the callee to start, i.e. we won't repeat this loop, so we
2888                // tell `suspend` it's okay even if we're not allowed to block.
2889                // Alternatively, if the callee is not an async function, then
2890                // we know it won't block anyway.
2891                skip_may_block_check: async_caller || !callee_async,
2892            })?;
2893
2894            let state = store.0.concurrent_state_mut();
2895
2896            log::trace!("taking event for {:?}", guest_thread.task);
2897            let event = guest_waitable.take_event(state)?;
2898            let Some(Event::Subtask { status }) = event else {
2899                bail_bug!("subtasks should only get subtask events, got {event:?}")
2900            };
2901
2902            log::trace!("status {status:?} for {:?}", guest_thread.task);
2903
2904            if status == Status::Returned {
2905                // It returned, so we can stop waiting.
2906                break (status, None);
2907            } else if async_caller {
2908                // It hasn't returned yet, but the caller is calling via an
2909                // async-lowered import, so we generate a handle for the task
2910                // waitable and return the status.
2911                let handle = store
2912                    .0
2913                    .instance_state(caller_instance)
2914                    .handle_table()
2915                    .subtask_insert_guest(guest_thread.task.rep())?;
2916                store
2917                    .0
2918                    .concurrent_state_mut()
2919                    .get_mut(guest_thread.task)?
2920                    .common
2921                    .handle = Some(handle);
2922                break (status, Some(handle));
2923            } else {
2924                // The callee hasn't returned yet, and the caller is calling via
2925                // a sync-lowered import, so we loop and keep waiting until the
2926                // callee returns.
2927            }
2928        };
2929
2930        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2931
2932        // Reset the current thread to point to the caller as it resumes control.
2933        store.0.set_thread(caller)?;
2934        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2935        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2936
2937        if let Some(storage) = storage {
2938            // The caller used a sync-lowered import to call an async-lifted
2939            // export, in which case the result, if any, has been stashed in
2940            // `GuestTask::sync_result`.
2941            let state = store.0.concurrent_state_mut();
2942            let task = state.get_mut(guest_thread.task)?;
2943            if let Some(result) = task.sync_result.take()? {
2944                if let Some(result) = result {
2945                    storage[0] = MaybeUninit::new(result);
2946                }
2947
2948                if task.exited && task.ready_to_delete() {
2949                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2950                }
2951            }
2952        }
2953
2954        Ok(status.pack(waitable))
2955    }
2956
    /// Poll the specified future once on behalf of a guest->host call using an
    /// async-lowered import.
    ///
    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
    /// `Pending`, add it to the set of futures to be polled as part of this
    /// instance's event loop until it completes, and then return
    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
    ///
    /// Whether the future returns `Ready` immediately or later, the `lower`
    /// function will be used to lower the result, if any, into the guest caller's
    /// stack and linear memory. The `lower` function is invoked with `None` if
    /// the future is cancelled.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        // The token lets the completion closure below recover a typed
        // `StoreContextMut<T>` from the type-erased `&mut dyn VMStore`.
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        // `task` identifies the host task created for this guest->host call.
        let task = state.current_host_thread()?;

        // Create an abortable future which hooks calls to poll and manages call
        // context state for the future.
        let (join_handle, future) = JoinHandle::run(future);
        {
            // Record the `JoinHandle` on the task, transitioning it from
            // "started" to "running"; any other prior state is a bug.
            let state = &mut state.get_mut(task)?.state;
            assert!(matches!(state, HostTaskState::CalleeStarted));
            *state = HostTaskState::CalleeRunning(join_handle);
        }

        let mut future = Box::pin(future);

        // Finally, poll the future.  We can use a dummy `Waker` here because
        // we'll add the future to `ConcurrentState::futures` and poll it
        // automatically from the event loop if it doesn't complete immediately
        // here.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        match poll {
            // It finished immediately; lower the result and delete the task.
            Poll::Ready(result) => {
                let result = result.transpose()?;
                lower(store.as_context_mut(), result)?;
                return Ok(None);
            }

            // Future isn't ready yet, so fall through.
            Poll::Pending => {}
        }

        // It hasn't finished yet; add the future to
        // `ConcurrentState::futures` so it will be polled by the event
        // loop and allocate a waitable handle to return to the guest.

        // Wrap the future in a closure responsible for lowering the result into
        // the guest's stack and memory, as well as notifying any waiters that
        // the task returned.
        let future = Box::pin(async move {
            // `None` here means the wrapped future didn't run to completion
            // (per this function's doc comment, that's the cancelled case).
            let result = match future.await {
                Some(result) => Some(result?),
                None => None,
            };
            let on_complete = move |store: &mut dyn VMStore| {
                // Restore the `current_thread` to be the host so `lower` knows
                // how to manipulate borrows and knows which scope of borrows
                // to check.
                let mut store = token.as_context_mut(store);
                let old = store.0.set_thread(task)?;

                let status = if result.is_some() {
                    Status::Returned
                } else {
                    Status::ReturnCancelled
                };

                lower(store.as_context_mut(), result)?;
                let state = store.0.concurrent_state_mut();
                match &mut state.get_mut(task)?.state {
                    // The task is already flagged as finished because it was
                    // cancelled. No need to transition further.
                    HostTaskState::CalleeDone { .. } => {}

                    // Otherwise transition this task to the done state.
                    other => *other = HostTaskState::CalleeDone { cancelled: false },
                }
                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

                store.0.set_thread(old)?;
                Ok(())
            };

            // Here we schedule a task to run on a worker fiber to do the
            // lowering since it may involve a call to the guest's realloc
            // function. This is necessary because calling the guest while
            // there are host embedder frames on the stack is unsound.
            tls::get(move |store| {
                store
                    .concurrent_state_mut()
                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                        on_complete,
                    ))));
                Ok(())
            })
        });

        // Make this task visible to the guest and then record what it
        // was made visible as.
        let state = store.0.concurrent_state_mut();
        state.push_future(future);
        let caller = state.get_mut(task)?.caller;
        let instance = state.get_mut(caller.task)?.instance;
        let handle = store
            .0
            .instance_state(instance)
            .handle_table()
            .subtask_insert_host(task.rep())?;
        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

        // Restore the currently running thread to this host task's
        // caller. Note that the host task isn't deallocated as it's
        // within the store and will get deallocated later.
        store.0.set_thread(caller)?;
        Ok(Some(handle))
    }
3087
3088    /// Implements the `task.return` intrinsic, lifting the result for the
3089    /// current guest task.
3090    pub(crate) fn task_return(
3091        self,
3092        store: &mut dyn VMStore,
3093        ty: TypeTupleIndex,
3094        options: OptionsIndex,
3095        storage: &[ValRaw],
3096    ) -> Result<()> {
3097        let state = store.concurrent_state_mut();
3098        let guest_thread = state.current_guest_thread()?;
3099        let lift = state
3100            .get_mut(guest_thread.task)?
3101            .lift_result
3102            .take()
3103            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3104        if !state.get_mut(guest_thread.task)?.result.is_none() {
3105            bail_bug!("task result unexpectedly already set");
3106        }
3107
3108        let CanonicalOptions {
3109            string_encoding,
3110            data_model,
3111            ..
3112        } = &self.id().get(store).component().env_component().options[options];
3113
3114        let invalid = ty != lift.ty
3115            || string_encoding != &lift.string_encoding
3116            || match data_model {
3117                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3118                    Some(memory) => {
3119                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3120                        let actual = self.id().get(store).runtime_memory(memory);
3121                        expected != actual.as_ptr()
3122                    }
3123                    // Memory not specified, meaning it didn't need to be
3124                    // specified per validation, so not invalid.
3125                    None => false,
3126                },
3127                // Always invalid as this isn't supported.
3128                CanonicalOptionsDataModel::Gc { .. } => true,
3129            };
3130
3131        if invalid {
3132            bail!(Trap::TaskReturnInvalid);
3133        }
3134
3135        log::trace!("task.return for {guest_thread:?}");
3136
3137        let result = (lift.lift)(store, storage)?;
3138        self.task_complete(store, guest_thread.task, result, Status::Returned)
3139    }
3140
3141    /// Implements the `task.cancel` intrinsic.
3142    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3143        let state = store.concurrent_state_mut();
3144        let guest_thread = state.current_guest_thread()?;
3145        let task = state.get_mut(guest_thread.task)?;
3146        if !task.cancel_sent {
3147            bail!(Trap::TaskCancelNotCancelled);
3148        }
3149        _ = task
3150            .lift_result
3151            .take()
3152            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3153
3154        if !task.result.is_none() {
3155            bail_bug!("task result should not bet set yet");
3156        }
3157
3158        log::trace!("task.cancel for {guest_thread:?}");
3159
3160        self.task_complete(
3161            store,
3162            guest_thread.task,
3163            Box::new(DummyResult),
3164            Status::ReturnCancelled,
3165        )
3166    }
3167
3168    /// Complete the specified guest task (i.e. indicate that it has either
3169    /// returned a (possibly empty) result or cancelled itself).
3170    ///
3171    /// This will return any resource borrows and notify any current or future
3172    /// waiters that the task has completed.
3173    fn task_complete(
3174        self,
3175        store: &mut StoreOpaque,
3176        guest_task: TableId<GuestTask>,
3177        result: Box<dyn Any + Send + Sync>,
3178        status: Status,
3179    ) -> Result<()> {
3180        store
3181            .component_resource_tables(Some(self))
3182            .validate_scope_exit()?;
3183
3184        let state = store.concurrent_state_mut();
3185        let task = state.get_mut(guest_task)?;
3186
3187        if let Caller::Host { tx, .. } = &mut task.caller {
3188            if let Some(tx) = tx.take() {
3189                _ = tx.send(result);
3190            }
3191        } else {
3192            task.result = Some(result);
3193            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3194        }
3195
3196        Ok(())
3197    }
3198
3199    /// Implements the `waitable-set.new` intrinsic.
3200    pub(crate) fn waitable_set_new(
3201        self,
3202        store: &mut StoreOpaque,
3203        caller_instance: RuntimeComponentInstanceIndex,
3204    ) -> Result<u32> {
3205        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3206        let handle = store
3207            .instance_state(self.runtime_instance(caller_instance))
3208            .handle_table()
3209            .waitable_set_insert(set.rep())?;
3210        log::trace!("new waitable set {set:?} (handle {handle})");
3211        Ok(handle)
3212    }
3213
3214    /// Implements the `waitable-set.drop` intrinsic.
3215    pub(crate) fn waitable_set_drop(
3216        self,
3217        store: &mut StoreOpaque,
3218        caller_instance: RuntimeComponentInstanceIndex,
3219        set: u32,
3220    ) -> Result<()> {
3221        let rep = store
3222            .instance_state(self.runtime_instance(caller_instance))
3223            .handle_table()
3224            .waitable_set_remove(set)?;
3225
3226        log::trace!("drop waitable set {rep} (handle {set})");
3227
3228        // Note that we're careful to check for waiters _before_ deleting the
3229        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3230        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3231        if !store
3232            .concurrent_state_mut()
3233            .get_mut(TableId::<WaitableSet>::new(rep))?
3234            .waiting
3235            .is_empty()
3236        {
3237            bail!(Trap::WaitableSetDropHasWaiters);
3238        }
3239
3240        store
3241            .concurrent_state_mut()
3242            .delete(TableId::<WaitableSet>::new(rep))?;
3243
3244        Ok(())
3245    }
3246
3247    /// Implements the `waitable.join` intrinsic.
3248    pub(crate) fn waitable_join(
3249        self,
3250        store: &mut StoreOpaque,
3251        caller_instance: RuntimeComponentInstanceIndex,
3252        waitable_handle: u32,
3253        set_handle: u32,
3254    ) -> Result<()> {
3255        let mut instance = self.id().get_mut(store);
3256        let waitable =
3257            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3258
3259        let set = if set_handle == 0 {
3260            None
3261        } else {
3262            let set = instance.instance_states().0[caller_instance]
3263                .handle_table()
3264                .waitable_set_rep(set_handle)?;
3265
3266            Some(TableId::<WaitableSet>::new(set))
3267        };
3268
3269        log::trace!(
3270            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3271        );
3272
3273        waitable.join(store.concurrent_state_mut(), set)
3274    }
3275
    /// Implements the `subtask.drop` intrinsic.
    ///
    /// Removes the subtask's handle from the calling instance's handle table
    /// and, when the underlying task is no longer needed, deletes its state.
    /// Traps with `SubtaskDropNotResolved` if the subtask has not yet
    /// resolved (returned or cancelled) or if it still has an undelivered
    /// event.
    pub(crate) fn subtask_drop(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        task_id: u32,
    ) -> Result<()> {
        // First detach the waitable from any waitable set it may have joined
        // (a set handle of zero means "no set"; see `waitable_join`).
        self.waitable_join(store, caller_instance, task_id, 0)?;

        // Remove the handle from the instance's table; this also tells us
        // whether the subtask is backed by a host task or a guest task.
        let (rep, is_host) = store
            .instance_state(self.runtime_instance(caller_instance))
            .handle_table()
            .subtask_remove(task_id)?;

        let concurrent_state = store.concurrent_state_mut();
        let (waitable, delete) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            match &task.state {
                // Still running, so it hasn't resolved yet and can't be
                // dropped.
                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
                HostTaskState::CalleeDone { .. } => {}
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid state for callee in `subtask.drop`")
                }
            }
            (Waitable::Host(id), true)
        } else {
            let id = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            // A pending `lift_result` means the guest callee has neither
            // returned nor cancelled yet.
            if task.lift_result.is_some() {
                bail!(Trap::SubtaskDropNotResolved);
            }
            (
                Waitable::Guest(id),
                concurrent_state.get_mut(id)?.ready_to_delete(),
            )
        };

        waitable.common(concurrent_state)?.handle = None;

        // If this subtask has an event that means that the terminal status of
        // this subtask wasn't yet received so it can't be dropped yet.
        if waitable.take_event(concurrent_state)?.is_some() {
            bail!(Trap::SubtaskDropNotResolved);
        }

        if delete {
            waitable.delete_from(concurrent_state)?;
        }

        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
        Ok(())
    }
3329
3330    /// Implements the `waitable-set.wait` intrinsic.
3331    pub(crate) fn waitable_set_wait(
3332        self,
3333        store: &mut StoreOpaque,
3334        options: OptionsIndex,
3335        set: u32,
3336        payload: u32,
3337    ) -> Result<u32> {
3338        if !self.options(store, options).async_ {
3339            // The caller may only call `waitable-set.wait` from an async task
3340            // (i.e. a task created via a call to an async export).
3341            // Otherwise, we'll trap.
3342            store.check_blocking()?;
3343        }
3344
3345        let &CanonicalOptions {
3346            cancellable,
3347            instance: caller_instance,
3348            ..
3349        } = &self.id().get(store).component().env_component().options[options];
3350        let rep = store
3351            .instance_state(self.runtime_instance(caller_instance))
3352            .handle_table()
3353            .waitable_set_rep(set)?;
3354
3355        self.waitable_check(
3356            store,
3357            cancellable,
3358            WaitableCheck::Wait,
3359            WaitableCheckParams {
3360                set: TableId::new(rep),
3361                options,
3362                payload,
3363            },
3364        )
3365    }
3366
3367    /// Implements the `waitable-set.poll` intrinsic.
3368    pub(crate) fn waitable_set_poll(
3369        self,
3370        store: &mut StoreOpaque,
3371        options: OptionsIndex,
3372        set: u32,
3373        payload: u32,
3374    ) -> Result<u32> {
3375        let &CanonicalOptions {
3376            cancellable,
3377            instance: caller_instance,
3378            ..
3379        } = &self.id().get(store).component().env_component().options[options];
3380        let rep = store
3381            .instance_state(self.runtime_instance(caller_instance))
3382            .handle_table()
3383            .waitable_set_rep(set)?;
3384
3385        self.waitable_check(
3386            store,
3387            cancellable,
3388            WaitableCheck::Poll,
3389            WaitableCheckParams {
3390                set: TableId::new(rep),
3391                options,
3392                payload,
3393            },
3394        )
3395    }
3396
3397    /// Implements the `thread.index` intrinsic.
3398    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3399        let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3400        match store
3401            .concurrent_state_mut()
3402            .get_mut(thread_id)?
3403            .instance_rep
3404        {
3405            Some(r) => Ok(r),
3406            None => bail_bug!("thread should have instance_rep by now"),
3407        }
3408    }
3409
    /// Implements the `thread.new-indirect` intrinsic.
    ///
    /// Looks up the start function in the specified runtime table, validates
    /// its signature is `(i32) -> ()`, and creates a new (not yet started)
    /// guest thread which will invoke that function with `context` when
    /// resumed.  Returns the guest-facing id of the new thread.
    ///
    /// Traps with `ThreadNewIndirectUninitialized` if the table entry is
    /// null and with `ThreadNewIndirectInvalidType` on a signature mismatch.
    pub(crate) fn thread_new_indirect<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        runtime_instance: RuntimeComponentInstanceIndex,
        _func_ty_idx: TypeFuncIndex, // currently unused
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        log::trace!("creating new thread");

        // The start function must have exactly the signature `(i32) -> ()`.
        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
        let (instance, registry) = self.id().get_mut_and_registry(store.0);
        let callee = instance
            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(Trap::ThreadNewIndirectInvalidType);
        }

        // The token lets the closure below recover a typed
        // `StoreContextMut<T>` from the type-erased `&mut dyn VMStore`.
        let token = StoreToken::new(store.as_context_mut());
        // This closure runs later, when the thread is actually started; it
        // calls the guest start function and then cleans up the thread.
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // Use call_unchecked rather than call or call_async, as we don't want to run the function
                // on a separate fiber if we're running in an async store.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                store.0.set_thread(old_thread)?;

                store.0.cleanup_thread(
                    guest_thread,
                    self.runtime_instance(runtime_instance),
                    CleanupTask::Yes,
                )?;
                log::trace!("explicit thread {guest_thread:?} completed");
                let state = store.0.concurrent_state_mut();
                // If the previous thread was a guest thread, mark it running
                // again now that control has returned to it.
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        let state = store.0.concurrent_state_mut();
        let current_thread = state.current_guest_thread()?;
        let parent_task = current_thread.task;

        // Register the new thread with its parent task so the task knows
        // which threads belong to it.
        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }
3475
    /// Schedule the specified guest thread to run, either starting it for the
    /// first time or resuming it.
    ///
    /// `high_priority` controls which end of the work queue the resulting
    /// work item is pushed to.  `allow_ready` controls whether a thread in
    /// the `Ready` state may be promoted; if it may not (or the thread is in
    /// any other non-resumable state), this traps with `CannotResumeThread`.
    pub(crate) fn resume_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
        allow_ready: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Optimistically mark the thread `Running`; arms which don't
        // actually resume it restore the previous state below.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                log::trace!("starting thread {guest_thread:?}");
                // Not started yet: queue a guest call that will invoke the
                // thread's start function.
                let guest_call = WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                            start_func(store, guest_thread)
                        })),
                    },
                );
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                log::trace!("resuming thread {thread_id:?} that was suspended");
                // Suspended: queue the fiber itself to be resumed.
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            GuestThreadState::Ready(fiber) if allow_ready => {
                log::trace!("resuming thread {thread_id:?} that was ready");
                // Already queued: restore the `Ready` state and promote the
                // existing work item rather than queueing a second one.
                thread.state = GuestThreadState::Ready(fiber);
                store
                    .concurrent_state_mut()
                    .promote_thread_work_item(guest_thread);
            }
            other => {
                // Not resumable; put the original state back before trapping.
                thread.state = other;
                bail!(Trap::CannotResumeThread);
            }
        }
        Ok(())
    }
3526
3527    fn add_guest_thread_to_instance_table(
3528        self,
3529        thread_id: TableId<GuestThread>,
3530        store: &mut StoreOpaque,
3531        runtime_instance: RuntimeComponentInstanceIndex,
3532    ) -> Result<u32> {
3533        let guest_id = store
3534            .instance_state(self.runtime_instance(runtime_instance))
3535            .thread_handle_table()
3536            .guest_thread_insert(thread_id.rep())?;
3537        store
3538            .concurrent_state_mut()
3539            .get_mut(thread_id)?
3540            .instance_rep = Some(guest_id);
3541        Ok(guest_id)
3542    }
3543
    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
    ///
    /// `yielding` distinguishes the `yield` flavors from the `suspend`
    /// flavors, and `to_thread` optionally names another thread to resume as
    /// part of this call.  If `cancellable` is set and a cancellation request
    /// is pending (either before or after suspending), this returns
    /// `WaitResult::Cancelled`; otherwise it returns
    /// `WaitResult::Completed`.
    pub(crate) fn suspension_intrinsic(
        self,
        store: &mut StoreOpaque,
        caller: RuntimeComponentInstanceIndex,
        cancellable: bool,
        yielding: bool,
        to_thread: SuspensionTarget,
    ) -> Result<WaitResult> {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
        if to_thread.is_none() {
            let state = store.concurrent_state_mut();
            if yielding {
                // This is a `thread.yield` call
                if !state.may_block(guest_thread.task)? {
                    // In a non-blocking context, a `thread.yield` may trigger
                    // other threads in the same component instance to run.
                    if !state.promote_instance_local_thread_work_item(caller) {
                        // No other threads are runnable, so just return
                        return Ok(WaitResult::Completed);
                    }
                }
            } else {
                // The caller may only call `thread.suspend` from an async task
                // (i.e. a task created via a call to an async export).
                // Otherwise, we'll trap.
                store.check_blocking()?;
            }
        }

        // There could be a pending cancellation from a previous uncancellable wait
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            return Ok(WaitResult::Cancelled);
        }

        // If a target thread was named, schedule it to run before we suspend
        // ourselves.  The `SomeSuspended` variant requires the target to be
        // suspended (`allow_ready: false`), while `Some` also accepts a
        // target in the `Ready` state.
        match to_thread {
            SuspensionTarget::SomeSuspended(thread) => {
                self.resume_thread(store, caller, thread, true, false)?
            }
            SuspensionTarget::Some(thread) => {
                self.resume_thread(store, caller, thread, true, true)?
            }
            SuspensionTarget::None => { /* nothing to do */ }
        }

        let reason = if yielding {
            SuspendReason::Yielding {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.yield-to-suspended` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        } else {
            SuspendReason::ExplicitlySuspending {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        };

        store.suspend(reason)?;

        // A cancellation request may have arrived while we were suspended.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            Ok(WaitResult::Cancelled)
        } else {
            Ok(WaitResult::Completed)
        }
    }
3616
3617    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3618    fn waitable_check(
3619        self,
3620        store: &mut StoreOpaque,
3621        cancellable: bool,
3622        check: WaitableCheck,
3623        params: WaitableCheckParams,
3624    ) -> Result<u32> {
3625        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3626
3627        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3628
3629        let state = store.concurrent_state_mut();
3630        let task = state.get_mut(guest_thread.task)?;
3631
3632        // If we're waiting, and there are no events immediately available,
3633        // suspend the fiber until that changes.
3634        match &check {
3635            WaitableCheck::Wait => {
3636                let set = params.set;
3637
3638                if (task.event.is_none()
3639                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3640                    && state.get_mut(set)?.ready.is_empty()
3641                {
3642                    if cancellable {
3643                        let old = state
3644                            .get_mut(guest_thread.thread)?
3645                            .wake_on_cancel
3646                            .replace(set);
3647                        if !old.is_none() {
3648                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3649                        }
3650                    }
3651
3652                    store.suspend(SuspendReason::Waiting {
3653                        set,
3654                        thread: guest_thread,
3655                        skip_may_block_check: false,
3656                    })?;
3657                }
3658            }
3659            WaitableCheck::Poll => {}
3660        }
3661
3662        log::trace!(
3663            "waitable check for {guest_thread:?}; set {:?}, part two",
3664            params.set
3665        );
3666
3667        // Deliver any pending events to the guest and return.
3668        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3669
3670        let (ordinal, handle, result) = match &check {
3671            WaitableCheck::Wait => {
3672                let (event, waitable) = match event {
3673                    Some(p) => p,
3674                    None => bail_bug!("event expected to be present"),
3675                };
3676                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3677                let (ordinal, result) = event.parts();
3678                (ordinal, handle, result)
3679            }
3680            WaitableCheck::Poll => {
3681                if let Some((event, waitable)) = event {
3682                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3683                    let (ordinal, result) = event.parts();
3684                    (ordinal, handle, result)
3685                } else {
3686                    log::trace!(
3687                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3688                        guest_thread.task,
3689                        params.set
3690                    );
3691                    let (ordinal, result) = Event::None.parts();
3692                    (ordinal, 0, result)
3693                }
3694            }
3695        };
3696        let memory = self.options_memory_mut(store, params.options);
3697        let ptr = func::validate_inbounds_dynamic(
3698            &CanonicalAbiInfo::POINTER_PAIR,
3699            memory,
3700            &ValRaw::u32(params.payload),
3701        )?;
3702        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3703        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3704        Ok(ordinal)
3705    }
3706
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// Requests cancellation of the subtask identified by `task_id` in the
    /// caller's handle table.  The subtask may be a host task or a guest task;
    /// in either case, if the subtask has not yet reached a terminal state,
    /// this either blocks until it does (`async_ == false`) or returns
    /// `BLOCKED` (`async_ == true`).  Returns the subtask's terminal status
    /// code on success.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.check_blocking()?;
        }

        // Resolve the guest-visible handle to the underlying host or guest
        // task representation.
        let (rep, is_host) = store
            .instance_state(self.runtime_instance(caller_instance))
            .handle_table()
            .subtask_rep(task_id)?;
        let waitable = if is_host {
            Waitable::Host(TableId::<HostTask>::new(rep))
        } else {
            Waitable::Guest(TableId::<GuestTask>::new(rep))
        };
        let concurrent_state = store.concurrent_state_mut();

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        // Whether we must block below waiting for the subtask to reach a
        // terminal state before inspecting its final event.
        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
                // If the callee is still running, signal an abort is requested.
                //
                // After cancelling this falls through to block waiting for the
                // host task to actually finish assuming that `async_` is false.
                // This blocking behavior resolves the race of `handle.abort()`
                // with the task actually getting cancelled or finishing.
                HostTaskState::CalleeRunning(handle) => {
                    handle.abort();
                    needs_block = true;
                }

                // Cancellation was already requested, so fail as the task can't
                // be cancelled twice.
                HostTaskState::CalleeDone { cancelled } => {
                    if cancelled {
                        bail!(Trap::SubtaskCancelAfterTerminal);
                    } else {
                        // The callee is already done so there's no need to
                        // block further for an event.
                        needs_block = false;
                    }
                }

                // These states should not be possible for a subtask that's
                // visible from the guest, so trap here.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid states for host callee")
                }
            }
        } else {
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // The subtask hasn't even started; cancel it outright without
                // delivering any event.
                store.cancel_guest_subtask_without_lowered_parameters(
                    self.runtime_instance(caller_instance),
                    guest_task,
                )?;
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // If any thread of the task is parked in a cancellable wait
                // (`wake_on_cancel` set), wake it immediately so it can observe
                // the cancellation, then yield so it gets a chance to run.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)?
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                            None => bail_bug!("thread not present in wake_on_cancel set"),
                        };
                        concurrent_state.push_high_priority(item);

                        let caller = concurrent_state.current_guest_thread()?;
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            // `subtask.cancel` is not allowed to be called in a
                            // sync context, so we cannot skip the may-block check.
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Guest tasks need to block if they have not yet returned or
                // cancelled, even as a result of the event delivery above.
                needs_block = !store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                needs_block = false;
            }
        };

        // If we need to block waiting on the terminal status of this subtask
        // then return immediately in `async` mode, or otherwise wait for the
        // event to get signaled through the store.
        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            // Wait for this waitable to get signaled with its terminal status
            // from the completion callback enqueued by `first_poll`. Once
            // that's done, fall through to the shared event-inspection logic
            // below.
            store.wait_for_event(waitable)?;

            // .. fall through to determine what event's in store for us.
        }

        // The subtask should now be in a terminal state; report its status, or
        // trap if it had already reached a terminal state before this call.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
    }
3861}
3862
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where
    /// one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI, only when at least one is async is this used.
    ///
    /// Note that `result_count` may also carry one of the `PREPARE_ASYNC_*`
    /// sentinel values to indicate an async caller (see the `StoreInner`
    /// implementation of this trait).
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4030
/// SAFETY: See trait docs.
///
/// Each method here is a thin adapter which delegates to the corresponding
/// `Instance` method, wrapping `self` in a `StoreContextMut` as needed.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: The `wasmtime_cranelift`-generated code that calls
        // this method will have ensured that `storage` is a valid
        // pointer containing at least `storage_len` items.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // Decode the sentinel-encoded result count: the two
                // `PREPARE_ASYNC_*` values indicate an async caller (without
                // and with a result, respectively); any other value is a sync
                // caller's literal result count.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
                    // this method will have ensured that `storage` is a valid
                    // pointer containing at least `storage_len` items.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        // A future is a single-item stream, hence the count of 1.
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        // A future is a single-item stream, hence the count of 1.
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        // Same as `stream_write` but with a `FlatAbi` describing the
        // memcpy-able payload layout for the fast path.
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        // Same as `stream_read` but with a `FlatAbi` describing the
        // memcpy-able payload layout for the fast path.
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
4340
/// Boxed, sendable future used to represent a pending host task's computation.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4342
/// Represents the state of a pending host task.
///
/// This is used to represent tasks when the guest calls into the host.
pub(crate) struct HostTask {
    /// See `WaitableCommon`
    common: WaitableCommon,

    /// Guest thread which called the host.
    caller: QualifiedThreadId,

    /// State of borrows/etc the host needs to track. Used when the guest passes
    /// borrows to the host, for example.
    call_context: CallContext,

    /// Current lifecycle state of this task; see `HostTaskState`.
    state: HostTaskState,
}
4358
/// Lifecycle state of a `HostTask`, from creation through completion or
/// cancellation.
enum HostTaskState {
    /// A host task has been created and it's considered "started".
    ///
    /// The host task has yet to enter `first_poll` or `poll_and_block` which
    /// is where this will get updated further.
    CalleeStarted,

    /// State used for tasks in `first_poll` meaning that the guest did an async
    /// lower of a host async function which is blocked. The specified handle is
    /// linked to the future in the main `FuturesUnordered` of a store which is
    /// used to cancel it if the guest requests cancellation.
    CalleeRunning(JoinHandle),

    /// Terminal state used for tasks in `poll_and_block` to store the result of
    /// their computation. Note that this state is not used for tasks in
    /// `first_poll`.
    CalleeFinished(LiftedResult),

    /// Terminal state for host tasks meaning that the task was cancelled or the
    /// result was taken.
    CalleeDone { cancelled: bool },
}
4381
4382impl HostTask {
4383    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4384        Self {
4385            common: WaitableCommon::default(),
4386            call_context: CallContext::default(),
4387            caller,
4388            state,
4389        }
4390    }
4391}
4392
// Human-readable type name used when logging/debugging table entries.
impl TableDebug for HostTask {
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4398
/// Boxed closure type used to invoke an async-lifted export's callback
/// function, given the store, an event, and the event's handle payload.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4400
/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// Represents the caller of the host function which called back into a
        /// guest. Note that this thread could belong to an entirely unrelated
        /// top-level component instance than the one the host called into.
        caller: CurrentThread,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the calling guest thread.
        thread: QualifiedThreadId,
    },
}
4421
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// Type-erased closure which lifts the result from guest memory/registers.
    lift: RawLift,
    /// Expected result tuple type, checked against the `task.return` callsite.
    ty: TypeTupleIndex,
    /// Linear memory used for lifting, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding in effect for lifting.
    string_encoding: StringEncoding,
}
4430
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task which owns `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4440
4441impl QualifiedThreadId {
4442    fn qualify(
4443        state: &mut ConcurrentState,
4444        thread: TableId<GuestThread>,
4445    ) -> Result<QualifiedThreadId> {
4446        Ok(QualifiedThreadId {
4447            task: state.get_mut(thread)?.parent_task,
4448            thread,
4449        })
4450    }
4451}
4452
4453impl fmt::Debug for QualifiedThreadId {
4454    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4455        f.debug_tuple("QualifiedThreadId")
4456            .field(&self.task.rep())
4457            .field(&self.thread.rep())
4458            .finish()
4459    }
4460}
4461
/// Execution state of a `GuestThread`.
enum GuestThreadState {
    /// Implicitly-created thread (e.g. for a task's main invocation) which has
    /// not yet begun executing.
    NotStartedImplicit,
    /// Explicitly-created thread (e.g. via `thread.new-indirect`) which has not
    /// yet begun executing; holds the closure to run when it starts.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// Currently executing.
    Running,
    /// Suspended on a fiber, waiting to be made ready.
    Suspended(StoreFiber<'static>),
    /// Suspended on a fiber, but ready to be resumed.
    Ready(StoreFiber<'static>),
    /// Finished executing.
    Completed,
}
/// Represents a single guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option<u32>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId<WaitableSet>,
}
4489
4490impl GuestThread {
4491    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4492    /// handle.
4493    fn from_instance(
4494        state: Pin<&mut ComponentInstance>,
4495        caller_instance: RuntimeComponentInstanceIndex,
4496        guest_thread: u32,
4497    ) -> Result<TableId<Self>> {
4498        let rep = state.instance_states().0[caller_instance]
4499            .thread_handle_table()
4500            .guest_thread_rep(guest_thread)?;
4501        Ok(TableId::new(rep))
4502    }
4503
4504    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4505        let sync_call_set = state.push(WaitableSet::default())?;
4506        Ok(Self {
4507            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4508            parent_task,
4509            wake_on_cancel: None,
4510            state: GuestThreadState::NotStartedImplicit,
4511            instance_rep: None,
4512            sync_call_set,
4513        })
4514    }
4515
4516    fn new_explicit(
4517        state: &mut ConcurrentState,
4518        parent_task: TableId<GuestTask>,
4519        start_func: Box<
4520            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4521        >,
4522    ) -> Result<Self> {
4523        let sync_call_set = state.push(WaitableSet::default())?;
4524        Ok(Self {
4525            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4526            parent_task,
4527            wake_on_cancel: None,
4528            state: GuestThreadState::NotStartedExplicit(start_func),
4529            instance_rep: None,
4530            sync_call_set,
4531        })
4532    }
4533}
4534
impl TableDebug for GuestThread {
    /// Type name used when formatting `TableId<GuestThread>` for debug/log output.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4540
/// A guest task's lowered synchronous result; see `GuestTask::sync_result`.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result has been produced but not yet taken by the caller.
    Produced(Option<ValRaw>),
    /// The result has already been taken; taking it again is a bug.
    Taken,
}
4546
4547impl SyncResult {
4548    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4549        Ok(match mem::replace(self, SyncResult::Taken) {
4550            SyncResult::NotProduced => None,
4551            SyncResult::Produced(val) => Some(val),
4552            SyncResult::Taken => {
4553                bail_bug!("attempted to take a synchronous result that was already taken")
4554            }
4555        })
4556    }
4557}
4558
/// The state of the host-side future (if any) associated with a guest task;
/// see `GuestTask::host_future_state`.
#[derive(Debug)]
enum HostFutureState {
    /// No host future is tracked for this task (guest caller, or a host
    /// caller which did not retain a future; see `GuestTask::new`).
    NotApplicable,
    /// A host future exists and has not yet been dropped; the task cannot be
    /// deleted in this state (see `GuestTask::ready_to_delete`).
    Live,
    /// The host future has been dropped.
    Dropped,
}
4565
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// Borrow state for this task.
    ///
    /// Keeps track of `borrow<T>` received to this task to ensure that
    /// everything is dropped by the time it exits.
    call_context: CallContext,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,

    /// Whether the increment of `ConcurrentState::interesting_tasks` made when
    /// this task was created has since been paired with a decrement.
    ///
    /// Checked via `debug_assert!` when the task is deleted; see
    /// `Waitable::delete_from`.
    decremented_interesting_task_count: bool,
}
4619
impl GuestTask {
    /// Returns whether this task's parameters have already been lowered.
    fn already_lowered_parameters(&self) -> bool {
        // We reset `lower_params` after we lower the parameters
        self.lower_params.is_none()
    }

    /// Returns whether this task has already returned a result or been
    /// cancelled.
    fn returned_or_cancelled(&self) -> bool {
        // We reset `lift_result` after we return or exit
        self.lift_result.is_none()
    }

    /// Returns whether this task may now be removed from the table.
    ///
    /// All of the following must hold: every thread belonging to the task has
    /// completed; no lowered sync result is still waiting to be handed to the
    /// caller; no `Returned`/`ReturnCancelled` completion event is still
    /// pending delivery; and no host-side future for the task is still live.
    fn ready_to_delete(&self) -> bool {
        let threads_completed = self.threads.is_empty();
        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
        let pending_completion_event = matches!(
            self.common.event,
            Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled
            })
        );
        let ready = threads_completed
            && !has_sync_result
            && !pending_completion_event
            && !matches!(self.host_future_state, HostFutureState::Live);
        log::trace!(
            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
            threads_completed,
            has_sync_result,
            pending_completion_event,
            self.host_future_state
        );
        ready
    }

    /// Create a new guest task -- plus its implicit initial thread -- and
    /// insert both into `state`'s table.
    ///
    /// `lower_params` and `lift_result` are the type-erased closures used to
    /// pass parameters to, and retrieve the result from, the callee, and
    /// `callback` is provided for callback-using async-lifted exports.  Also
    /// increments `ConcurrentState::interesting_tasks`, which must be paired
    /// with a decrement before the task is deleted.
    ///
    /// Returns the id of the new task qualified with its implicit thread.
    fn new(
        state: &mut ConcurrentState,
        lower_params: RawLower,
        lift_result: LiftResult,
        caller: Caller,
        callback: Option<CallbackFn>,
        instance: RuntimeInstance,
        async_function: bool,
    ) -> Result<QualifiedThreadId> {
        // Only a host caller which retained a future for this call needs the
        // task kept alive until that future is dropped.
        let host_future_state = match &caller {
            Caller::Guest { .. } => HostFutureState::NotApplicable,
            Caller::Host {
                host_future_present,
                ..
            } => {
                if *host_future_present {
                    HostFutureState::Live
                } else {
                    HostFutureState::NotApplicable
                }
            }
        };
        let task = state.push(Self {
            common: WaitableCommon::default(),
            lower_params: Some(lower_params),
            lift_result: Some(lift_result),
            result: None,
            callback,
            caller,
            call_context: CallContext::default(),
            sync_result: SyncResult::NotProduced,
            cancel_sent: false,
            starting_sent: false,
            instance,
            event: None,
            exited: false,
            threads: HashSet::new(),
            host_future_state,
            async_function,
            decremented_interesting_task_count: false,
        })?;
        // Every task starts with exactly one implicit thread.
        let new_thread = GuestThread::new_implicit(state, task)?;
        let thread = state.push(new_thread)?;
        state.get_mut(task)?.threads.insert(thread);
        state.interesting_tasks += 1;
        Ok(QualifiedThreadId { task, thread })
    }
}
4702
impl TableDebug for GuestTask {
    /// Type name used when formatting `TableId<GuestTask>` for debug/log output.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4708
/// Represents state common to all kinds of waitables.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    ///
    /// Set or cleared via `Waitable::set_event` and consumed via
    /// `Waitable::take_event`.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    ///
    /// Updated via `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4719
/// Represents a Component Model Async `waitable`.
///
/// `Ord` is derived so that `Waitable`s can be stored in the `BTreeSet` used
/// by `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4730
4731impl Waitable {
4732    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4733    /// handle.
4734    fn from_instance(
4735        state: Pin<&mut ComponentInstance>,
4736        caller_instance: RuntimeComponentInstanceIndex,
4737        waitable: u32,
4738    ) -> Result<Self> {
4739        use crate::runtime::vm::component::Waitable;
4740
4741        let (waitable, kind) = state.instance_states().0[caller_instance]
4742            .handle_table()
4743            .waitable_rep(waitable)?;
4744
4745        Ok(match kind {
4746            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4747            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4748            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4749        })
4750    }
4751
4752    /// Retrieve the host-visible identifier for this `Waitable`.
4753    fn rep(&self) -> u32 {
4754        match self {
4755            Self::Host(id) => id.rep(),
4756            Self::Guest(id) => id.rep(),
4757            Self::Transmit(id) => id.rep(),
4758        }
4759    }
4760
4761    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4762    /// remove it from any set it may currently belong to (when `set` is
4763    /// `None`).
4764    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4765        log::trace!("waitable {self:?} join set {set:?}",);
4766
4767        let old = mem::replace(&mut self.common(state)?.set, set);
4768
4769        if let Some(old) = old {
4770            match *self {
4771                Waitable::Host(id) => state.remove_child(id, old),
4772                Waitable::Guest(id) => state.remove_child(id, old),
4773                Waitable::Transmit(id) => state.remove_child(id, old),
4774            }?;
4775
4776            state.get_mut(old)?.ready.remove(self);
4777        }
4778
4779        if let Some(set) = set {
4780            match *self {
4781                Waitable::Host(id) => state.add_child(id, set),
4782                Waitable::Guest(id) => state.add_child(id, set),
4783                Waitable::Transmit(id) => state.add_child(id, set),
4784            }?;
4785
4786            if self.common(state)?.event.is_some() {
4787                self.mark_ready(state)?;
4788            }
4789        }
4790
4791        Ok(())
4792    }
4793
4794    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4795    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4796        Ok(match self {
4797            Self::Host(id) => &mut state.get_mut(*id)?.common,
4798            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4799            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4800        })
4801    }
4802
4803    /// Set or clear the pending event for this waitable and either deliver it
4804    /// to the first waiter, if any, or mark it as ready to be delivered to the
4805    /// next waiter that arrives.
4806    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4807        log::trace!("set event for {self:?}: {event:?}");
4808        self.common(state)?.event = event;
4809        self.mark_ready(state)
4810    }
4811
4812    /// Take the pending event from this waitable, leaving `None` in its place.
4813    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4814        let common = self.common(state)?;
4815        let event = common.event.take();
4816        if let Some(set) = self.common(state)?.set {
4817            state.get_mut(set)?.ready.remove(self);
4818        }
4819
4820        Ok(event)
4821    }
4822
4823    /// Deliver the current event for this waitable to the first waiter, if any,
4824    /// or else mark it as ready to be delivered to the next waiter that
4825    /// arrives.
4826    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4827        if let Some(set) = self.common(state)?.set {
4828            state.get_mut(set)?.ready.insert(*self);
4829            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4830                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4831                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4832
4833                let item = match mode {
4834                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4835                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4836                        state.get_mut(thread.task)?.instance.index,
4837                        GuestCall {
4838                            thread,
4839                            kind: GuestCallKind::DeliverEvent {
4840                                instance,
4841                                set: Some(set),
4842                            },
4843                        },
4844                    ),
4845                };
4846                state.push_high_priority(item);
4847            }
4848        }
4849        Ok(())
4850    }
4851
4852    /// Remove this waitable from the instance's rep table.
4853    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4854        match self {
4855            Self::Host(task) => {
4856                log::trace!("delete host task {task:?}");
4857                state.delete(*task)?;
4858            }
4859            Self::Guest(task) => {
4860                log::trace!("delete guest task {task:?}");
4861                let task = state.delete(*task)?;
4862
4863                // When a guest task is created it increments the
4864                // `ConcurrentState::interesting_tasks` counter, and that needs
4865                // to be paired with a decrement. There are a few situations in
4866                // which the decrement needs to happen which don't all funnel
4867                // through here, so in lieu of that at least try to catch issues
4868                // where we forgot to do a decrement.
4869                debug_assert!(task.decremented_interesting_task_count);
4870            }
4871            Self::Transmit(task) => {
4872                state.delete(*task)?;
4873            }
4874        }
4875
4876        Ok(())
4877    }
4878}
4879
4880impl fmt::Debug for Waitable {
4881    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4882        match self {
4883            Self::Host(id) => write!(f, "{id:?}"),
4884            Self::Guest(id) => write!(f, "{id:?}"),
4885            Self::Transmit(id) => write!(f, "{id:?}"),
4886        }
4887    }
4888}
4889
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ///
    /// Maintained by `Waitable::{mark_ready,take_event,join}`.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any, along
    /// with how each should be resumed when an event arrives (see `WaitMode`).
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4898
impl TableDebug for WaitableSet {
    /// Type name used when formatting `TableId<WaitableSet>` for debug/log output.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4904
/// Type-erased closure to lower the parameters for a guest task.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;

/// Type-erased closure to lift the result for a guest task.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;

/// Type-erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;

/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
struct DummyResult;
4922
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Backpressure counter for this instance; backpressure is considered
    /// enabled while this is greater than zero.
    backpressure: u16,
    /// Whether this instance can be entered
    do_not_enter: bool,
    /// Pending calls for this instance which cannot proceed until
    /// `Self::backpressure` is disabled (zero) and/or `Self::do_not_enter` is
    /// `false`.
    ///
    /// NOTE(review): the previous comment said calls require backpressure "to
    /// be `true`", which contradicts the field being a counter -- confirm the
    /// exact condition against the code which drains `pending`.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4934
impl ConcurrentInstanceState {
    /// Returns whether there are no pending (blocked) guest calls for this
    /// instance.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4940
/// Identifies the thread, if any, currently executing in the store's event
/// loop.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified by its task and thread ids.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// No thread is currently running.
    None,
}
4947
4948impl CurrentThread {
4949    fn guest(&self) -> Option<&QualifiedThreadId> {
4950        match self {
4951            Self::Guest(id) => Some(id),
4952            _ => None,
4953        }
4954    }
4955
4956    fn host(&self) -> Option<TableId<HostTask>> {
4957        match self {
4958            Self::Host(id) => Some(*id),
4959            _ => None,
4960        }
4961    }
4962
4963    fn is_none(&self) -> bool {
4964        matches!(self, Self::None)
4965    }
4966}
4967
impl From<QualifiedThreadId> for CurrentThread {
    /// Treat a qualified guest thread id as the current (guest) thread.
    fn from(id: QualifiedThreadId) -> Self {
        Self::Guest(id)
    }
}
4973
impl From<TableId<HostTask>> for CurrentThread {
    /// Treat a host task id as the current (host) thread.
    fn from(id: TableId<HostTask>) -> Self {
        Self::Host(id)
    }
}
4979
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running thread, if any.
    current_thread: CurrentThread,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `ComponentInstance::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    low_priority: VecDeque<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// The number of "interesting tasks" currently executing in the store.
    ///
    /// This tracks the concept of a component instance lifetime as defined in
    /// https://github.com/WebAssembly/component-model/pull/643. Specifically
    /// all tasks currently increment this counter which then gets decremented
    /// when they exit. In the future some tasks might not increment this
    /// counter, but for now all do.
    ///
    /// This is used to implement `Accessor::poll_no_interesting_tasks` to
    /// inform the embedder when all tasks have completed. This is then
    /// used in wasmtime-wasi-http, for example, to know when an instance is
    /// idle.
    interesting_tasks: usize,

    /// Single waker to notify when `interesting_tasks` reaches 0.
    ///
    /// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
    interesting_tasks_empty_waker: Option<Waker>,
}
5041
5042impl Default for ConcurrentState {
5043    fn default() -> Self {
5044        Self {
5045            current_thread: CurrentThread::None,
5046            table: AlwaysMut::new(ResourceTable::new()),
5047            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5048            high_priority: Vec::new(),
5049            low_priority: VecDeque::new(),
5050            suspend_reason: None,
5051            worker: None,
5052            worker_item: None,
5053            global_error_context_ref_counts: BTreeMap::new(),
5054            interesting_tasks: 0,
5055            interesting_tasks_empty_waker: None,
5056        }
5057    }
5058}
5059
impl ConcurrentState {
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`.  This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure that any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // Collect fibers parked in waitable sets as well as suspended or
        // ready-to-run guest threads, marking each such thread `Completed` so
        // it will never be resumed.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Drain both work queues, salvaging any fibers and not-yet-registered
        // futures they contain; other work item kinds are simply dropped.
        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }

        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }

    /// Insert `value` into this store's table, returning a typed id for it.
    fn push<V: Send + Sync + 'static>(
        &mut self,
        value: V,
    ) -> Result<TableId<V>, ResourceTableError> {
        self.table.get_mut().push(value).map(TableId::from)
    }

    /// Retrieve mutable access to the table entry with the specified id.
    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
        self.table.get_mut().get_mut(&Resource::from(id))
    }

    /// Record `child` as a child of `parent` in the table.
    pub fn add_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .add_child(Resource::from(child), Resource::from(parent))
    }

    /// Undo a corresponding `add_child` call.
    pub fn remove_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .remove_child(Resource::from(child), Resource::from(parent))
    }

    /// Remove the entry with the specified id from the table, returning it.
    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
        self.table.get_mut().delete(Resource::from(id))
    }

    /// Schedule `future` to be added to `ConcurrentState::futures`.
    fn push_future(&mut self, future: HostTaskFuture) {
        // Note that we can't directly push to `ConcurrentState::futures` here
        // since this may be called from a future that's being polled inside
        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
        // so it has exclusive access while polling it.  Therefore, we push a
        // work item to the "high priority" queue, which will actually push to
        // `ConcurrentState::futures` later.
        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
    }

    /// Add `item` to the "high priority" work queue.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }

    /// Add `item` to the "low priority" work queue.
    ///
    /// NOTE(review): this pushes to the *front* of the deque; the effective
    /// ordering of low-priority items depends on which end the event loop pops
    /// from -- confirm against `poll_until`.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push_front(item);
    }

    /// Add `item` to the queue selected by `high_priority`.
    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
        if high_priority {
            self.push_high_priority(item);
        } else {
            self.push_low_priority(item);
        }
    }

    /// Ensure that a queued `ResumeThread` or `GuestCall` work item belonging
    /// to `current_instance`, if any, is in the high-priority queue, returning
    /// whether such an item exists.
    fn promote_instance_local_thread_work_item(
        &mut self,
        current_instance: RuntimeComponentInstanceIndex,
    ) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
                *instance == current_instance
            }
            _ => false,
        })
    }

    /// Ensure that a queued `ResumeThread` or `GuestCall` work item for the
    /// specified thread, if any, is in the high-priority queue, returning
    /// whether such an item exists.
    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
                *t == thread
            }
            _ => false,
        })
    }

    /// Helper for the `promote_*` methods above: ensure that a work item
    /// matching `predicate`, if any, is present in the high-priority queue.
    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
    where
        F: FnMut(&WorkItem) -> bool,
    {
        // If there's a high-priority work item to resume the current guest thread,
        // we don't need to promote anything, but we return true to indicate that
        // work is pending for the current instance.
        if self.high_priority.iter().any(&mut predicate) {
            true
        }
        // Otherwise, look for a low-priority work item that matches the current
        // instance and promote it to high-priority.
        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
            let item = self.low_priority.remove(idx).unwrap();
            self.push_high_priority(item);
            true
        } else {
            false
        }
    }

    /// Returns whether there's a pending cancellation on the current guest thread,
    /// consuming the event if so.
    fn take_pending_cancellation(&mut self) -> Result<bool> {
        let thread = self.current_guest_thread()?;
        if let Some(event) = self.get_mut(thread.task)?.event.take() {
            // The only event stashed on a task itself (as opposed to a
            // waitable) which we expect here is a cancellation.
            assert!(matches!(event, Event::Cancelled));
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Trap with `Trap::CannotBlockSyncTask` if `task` is not permitted to
    /// block; see `may_block`.
    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
        if self.may_block(task)? {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }

    /// Returns whether `task` may block: it must either be a call to an
    /// async-lifted export or have already returned or been cancelled.
    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
        let task = self.get_mut(task)?;
        Ok(task.async_function || task.returned_or_cancelled())
    }

    /// Used by `ResourceTables` to acquire the current `CallContext` for the
    /// specified task.
    ///
    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
    /// below.
    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
        // Low bit distinguishes host vs. guest tasks; remaining bits are the rep.
        let (task, is_host) = (task >> 1, task & 1 == 1);
        if is_host {
            let task: TableId<HostTask> = TableId::new(task);
            Ok(&mut self.get_mut(task)?.call_context)
        } else {
            let task: TableId<GuestTask> = TableId::new(task);
            Ok(&mut self.get_mut(task)?.call_context)
        }
    }

    /// Used by `ResourceTables` to record the scope of a borrow to get undone
    /// in the future.
    pub fn current_call_context_scope_id(&self) -> Result<u32> {
        let (bits, is_host) = match self.current_thread {
            CurrentThread::Guest(id) => (id.task.rep(), false),
            CurrentThread::Host(id) => (id.rep(), true),
            CurrentThread::None => bail_bug!("current thread is not set"),
        };
        // Ensure the rep fits in 31 bits so the host/guest flag can occupy the
        // low bit without losing information.
        assert_eq!((bits << 1) >> 1, bits);
        Ok((bits << 1) | u32::from(is_host))
    }

    /// Returns the current guest thread, or an error if the current thread is
    /// not a guest thread.
    fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
        match self.current_thread.guest() {
            Some(id) => Ok(*id),
            None => bail_bug!("current thread is not a guest thread"),
        }
    }

    /// Returns the current host task, or an error if the current thread is
    /// not a host thread.
    fn current_host_thread(&self) -> Result<TableId<HostTask>> {
        match self.current_thread.host() {
            Some(id) => Ok(id),
            None => bail_bug!("current thread is not a host thread"),
        }
    }

    /// Retrieve mutable access to `Self::futures`, failing if it has been
    /// temporarily taken (see `push_future` for why that can happen).
    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
        match self.futures.get_mut().as_mut() {
            Some(f) => Ok(f),
            None => bail_bug!("futures field of concurrent state is currently taken"),
        }
    }

    /// Retrieve mutable access to the waitable table.
    pub(crate) fn table(&mut self) -> &mut ResourceTable {
        self.table.get_mut()
    }
}
5312
/// Provide a type hint to compiler about the shape of a parameter lower
/// closure.
///
/// Returns its argument unchanged; this exists purely so type inference can
/// pin down the closure's signature at the call site.
fn for_any_lower<
    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}
5322
/// Provide a type hint to compiler about the shape of a result lift closure.
///
/// Returns its argument unchanged; this exists purely so type inference can
/// pin down the closure's signature at the call site.
fn for_any_lift<
    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}
5331
5332/// Wrap the specified future in a `poll_fn` which asserts that the future is
5333/// only polled from the event loop of the specified `Store`.
5334///
5335/// See `StoreContextMut::run_concurrent` for details.
5336fn checked<F: Future + Send + 'static>(
5337    id: StoreId,
5338    fut: F,
5339) -> impl Future<Output = F::Output> + Send + 'static {
5340    async move {
5341        let mut fut = pin!(fut);
5342        future::poll_fn(move |cx| {
5343            let message = "\
5344                `Future`s which depend on asynchronous component tasks, streams, or \
5345                futures to complete may only be polled from the event loop of the \
5346                store to which they belong.  Please use \
5347                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5348            ";
5349            tls::try_get(|store| {
5350                let matched = match store {
5351                    tls::TryGet::Some(store) => store.id() == id,
5352                    tls::TryGet::Taken | tls::TryGet::None => false,
5353                };
5354
5355                if !matched {
5356                    panic!("{message}")
5357                }
5358            });
5359            fut.as_mut().poll(cx)
5360        })
5361        .await
5362    }
5363}
5364
5365/// Assert that `StoreContextMut::run_concurrent` has not been called from
5366/// within an store's event loop.
5367fn check_recursive_run() {
5368    tls::try_get(|store| {
5369        if !matches!(store, tls::TryGet::None) {
5370            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5371        }
5372    });
5373}
5374
/// Split a packed callback return code into its low four "code" bits and the
/// remaining high bits.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low = code & 0xF;
    let rest = code >> 4;
    (low, rest)
}
5378
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// Canonical ABI options index for the calling function.
    options: OptionsIndex,
    /// Payload value forwarded to `waitable_check`; presumably a guest
    /// address for the resulting event payload — confirm against the callee.
    payload: u32,
}
5387
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The call is on behalf of `waitable-set.wait`.
    Wait,
    /// The call is on behalf of `waitable-set.poll`.
    Poll,
}
5394
/// Represents a guest task called from the host, prepared using `prepare_call`.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The instance that this call is prepared for.
    runtime_instance: RuntimeInstance,
    /// Ties this prepared call to the result type `R` the caller expects;
    /// the actual result travels type-erased through `rx`.
    _phantom: PhantomData<R>,
}
5410
5411impl<R> PreparedCall<R> {
5412    /// Get a copy of the `TaskId` for this `PreparedCall`.
5413    pub(crate) fn task_id(&self) -> TaskId {
5414        TaskId {
5415            task: self.thread.task,
5416            runtime_instance: self.runtime_instance,
5417        }
5418    }
5419}
5420
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// Table id of the guest task backing this call.
    task: TableId<GuestTask>,
    /// The runtime instance the task belongs to.
    runtime_instance: RuntimeInstance,
}
5426
impl TaskId {
    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
    /// and can be resumed by other tasks for this component, so we mark the future as dropped
    /// and delete the task when all threads are done.
    pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
        let task = store.concurrent_state_mut().get_mut(self.task)?;
        let delete = if !task.already_lowered_parameters() {
            // Parameters never got lowered: cancel the subtask now; it is
            // safe (and required) to delete it eagerly below.
            store.cancel_guest_subtask_without_lowered_parameters(
                self.runtime_instance,
                self.task,
            )?;
            true
        } else {
            // The call is already in flight; just record that the host side
            // lost interest and only delete once no threads remain.
            task.host_future_state = HostFutureState::Dropped;
            task.ready_to_delete()
        };
        if delete {
            // Remove the task's waitable entry from the concurrent state.
            Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
        }
        Ok(())
    }
}
5451
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    // Gather the canonical ABI metadata for the function being called.
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    // `token` lets the boxed closures below recover a typed
    // `StoreContextMut<T>` from the `&mut dyn VMStore` they receive later.
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // Channel over which the lifted result will eventually be delivered to
    // the returned `PreparedCall`'s receiver.
    let (tx, rx) = oneshot::channel();

    let instance = handle.instance().runtime_instance(component_instance);
    let caller = state.current_thread;
    let thread = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // NOTE(review): this check runs after the `GuestTask` above has already
    // been created; presumably the task is reclaimed elsewhere if we bail
    // here -- confirm.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread,
        param_count,
        runtime_instance: instance,
        rx,
        _phantom: PhantomData,
    })
}
5537
5538/// Queue a call previously prepared using `prepare_call` to be run as part of
5539/// the associated `ComponentInstance`'s event loop.
5540///
5541/// The returned future will resolve to the result once it is available, but
5542/// must only be polled via the instance's event loop. See
5543/// `StoreContextMut::run_concurrent` for details.
5544pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5545    mut store: StoreContextMut<T>,
5546    prepared: PreparedCall<R>,
5547) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5548    let PreparedCall {
5549        handle,
5550        thread,
5551        param_count,
5552        rx,
5553        ..
5554    } = prepared;
5555
5556    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5557
5558    Ok(checked(
5559        store.0.id(),
5560        rx.map(move |result| match result {
5561            Ok(r) => match r.downcast() {
5562                Ok(r) => Ok(*r),
5563                Err(_) => bail_bug!("wrong type of value produced"),
5564            },
5565            Err(e) => Err(e.into()),
5566        }),
5567    ))
5568}
5569
/// Queue a call previously prepared using `prepare_call` to be run as part of
/// the associated `ComponentInstance`'s event loop.
///
/// This resolves the core function pointers (callee, optional callback,
/// optional post-return) for the export and hands them to
/// `ComponentInstance::queue_call`.
fn queue_call0<T: 'static>(
    store: StoreContextMut<T>,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });

    log::trace!("queueing call {guest_thread:?}");

    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
    // (with signatures appropriate for this call) and will remain valid as
    // long as this instance is valid.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            // NOTE(review): `1` appears to be a result count for the lowered
            // call -- confirm against `queue_call`'s parameter list.
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}