Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
21//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::component::{
87    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
88    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93use wasmtime_environ::packed_option::ReservedValue;
94use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
/// Value the Component Model spec reserves to signal that an async intrinsic
/// (e.g. `future.write`) has not completed yet.
const BLOCKED: u32 = u32::MAX;
115
/// Call status of a task; mirrors `CallState` in the upstream spec.
///
/// The explicit discriminants are the values emitted by [`Status::pack`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Encode this status, together with an optional `waitable` handle, as the
    /// single 32-bit value expected by the canonical ABI.
    ///
    /// The status occupies the low 4 bits and the waitable (or zero when
    /// absent) occupies the remaining upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics unless a waitable is supplied exactly when the status is *not*
    /// [`Status::Returned`], and panics if the waitable does not fit in 28
    /// bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // `Returned` is the only status which must come without a waitable.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or_default();
        // The handle has to leave room for the 4 status bits below it.
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
139
140/// Corresponds to `EventCode` in the Component Model spec, plus related payload
141/// data.
142#[derive(Clone, Copy, Debug)]
143enum Event {
144    None,
145    Subtask {
146        status: Status,
147    },
148    StreamRead {
149        code: ReturnCode,
150        pending: Option<(TypeStreamTableIndex, u32)>,
151    },
152    StreamWrite {
153        code: ReturnCode,
154        pending: Option<(TypeStreamTableIndex, u32)>,
155    },
156    FutureRead {
157        code: ReturnCode,
158        pending: Option<(TypeFutureTableIndex, u32)>,
159    },
160    FutureWrite {
161        code: ReturnCode,
162        pending: Option<(TypeFutureTableIndex, u32)>,
163    },
164    Cancelled,
165}
166
167impl Event {
168    /// Lower this event to core Wasm integers for delivery to the guest.
169    ///
170    /// Note that the waitable handle, if any, is assumed to be lowered
171    /// separately.
172    fn parts(self) -> (u32, u32) {
173        const EVENT_NONE: u32 = 0;
174        const EVENT_SUBTASK: u32 = 1;
175        const EVENT_STREAM_READ: u32 = 2;
176        const EVENT_STREAM_WRITE: u32 = 3;
177        const EVENT_FUTURE_READ: u32 = 4;
178        const EVENT_FUTURE_WRITE: u32 = 5;
179        const EVENT_CANCELLED: u32 = 6;
180        match self {
181            Event::None => (EVENT_NONE, 0),
182            Event::Cancelled => (EVENT_CANCELLED, 0),
183            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188        }
189    }
190}
191
/// Numeric values of the spec's `CallbackCode`.
// NOTE(review): presumably these are the codes a guest callback returns to
// tell the host what to do next -- confirm against the code consuming them.
mod callback_code {
    /// `CallbackCode` value 0.
    pub const EXIT: u32 = 0;
    /// `CallbackCode` value 1.
    pub const YIELD: u32 = 1;
    /// `CallbackCode` value 2.
    pub const WAIT: u32 = 2;
}
198
199/// A flag indicating that the callee is an async-lowered export.
200///
201/// This may be passed to the `async-start` intrinsic from a fused adapter.
202const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
204/// Provides access to either store data (via the `get` method) or the store
205/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
206/// instance to which the current host task belongs.
207///
208/// See [`Accessor::with`] for details.
209pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
210    store: StoreContextMut<'a, T>,
211    get_data: fn(&mut T) -> D::Data<'_>,
212}
213
214impl<'a, T, D> Access<'a, T, D>
215where
216    D: HasData + ?Sized,
217    T: 'static,
218{
219    /// Creates a new [`Access`] from its component parts.
220    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221        Self { store, get_data }
222    }
223
224    /// Get mutable access to the store data.
225    pub fn data_mut(&mut self) -> &mut T {
226        self.store.data_mut()
227    }
228
229    /// Get mutable access to the store data.
230    pub fn get(&mut self) -> D::Data<'_> {
231        (self.get_data)(self.data_mut())
232    }
233
234    /// Spawn a background task.
235    ///
236    /// See [`Accessor::spawn`] for details.
237    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238    where
239        T: 'static,
240    {
241        let accessor = Accessor {
242            get_data: self.get_data,
243            token: StoreToken::new(self.store.as_context_mut()),
244        };
245        self.store
246            .as_context_mut()
247            .spawn_with_accessor(accessor, task)
248    }
249
250    /// Returns the getter this accessor is using to project from `T` into
251    /// `D::Data`.
252    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253        self.get_data
254    }
255}
256
257impl<'a, T, D> AsContext for Access<'a, T, D>
258where
259    D: HasData + ?Sized,
260    T: 'static,
261{
262    type Data = T;
263
264    fn as_context(&self) -> StoreContext<'_, T> {
265        self.store.as_context()
266    }
267}
268
269impl<'a, T, D> AsContextMut for Access<'a, T, D>
270where
271    D: HasData + ?Sized,
272    T: 'static,
273{
274    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
275        self.store.as_context_mut()
276    }
277}
278
279/// Provides scoped mutable access to store data in the context of a concurrent
280/// host task future.
281///
282/// This allows multiple host task futures to execute concurrently and access
283/// the store between (but not across) `await` points.
284///
285/// # Rationale
286///
287/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
288/// `D::Data<'_>`. The problem this is solving, however, is that it does not
289/// literally store these values. The basic problem is that when a concurrent
290/// host future is being polled it has access to `&mut T` (and the whole
291/// `Store`) but when it's not being polled it does not have access to these
292/// values. This reflects how the store is only ever polling one future at a
293/// time so the store is effectively being passed between futures.
294///
295/// Rust's `Future` trait, however, has no means of passing a `Store`
296/// temporarily between futures. The [`Context`](std::task::Context) type does
297/// not have the ability to attach arbitrary information to it at this time.
298/// This type, [`Accessor`], is used to bridge this expressivity gap.
299///
300/// The [`Accessor`] type here represents the ability to acquire, temporarily in
301/// a synchronous manner, the current store. The [`Accessor::with`] function
302/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
303/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
304/// not take an `async` closure as its argument, instead it's a synchronous
305/// closure which must complete during one run of `Future::poll`. This reflects
306/// how the store is temporarily made available while a host future is being
307/// polled.
308///
309/// # Implementation
310///
311/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
312/// this type additionally doesn't even have a lifetime parameter. This is
313/// instead a representation of proof of the ability to acquire these while a
314/// future is being polled. Wasmtime will, when it polls a host future,
315/// configure ambient state such that the `Accessor` that a future closes over
316/// will work and be able to access the store.
317///
318/// This has a number of implications for users such as:
319///
320/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
321///   the lifetime of a single future.
322/// * A future is expected to, however, close over an `Accessor` and keep it
323///   alive probably for the duration of the entire future.
324/// * Different host futures will be given different `Accessor`s, and that's
325///   intentional.
326/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
327///   alleviates some otherwise required bounds to be written down.
328///
329/// # Using `Accessor` in `Drop`
330///
331/// The methods on `Accessor` are only expected to work in the context of
332/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
333/// host future can be dropped at any time throughout the system and Wasmtime
334/// store context is not necessarily available at that time. It's recommended to
335/// not use `Accessor` methods in anything connected to a `Drop` implementation
336/// as they may panic or have unintended results. If you run into this though
337/// feel free to file an issue on the Wasmtime repository.
338pub struct Accessor<T: 'static, D = HasSelf<T>>
339where
340    D: HasData + ?Sized,
341{
342    token: StoreToken<T>,
343    get_data: fn(&mut T) -> D::Data<'_>,
344}
345
346/// A helper trait to take any type of accessor-with-data in functions.
347///
348/// This trait is similar to [`AsContextMut`] except that it's used when
349/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
350/// [`Accessor`] is the main type used in concurrent settings and is passed to
351/// functions such as [`Func::call_concurrent`].
352///
353/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
354/// this trait. This effectively means that regardless of the `D` in
355/// `Accessor<T, D>` it can still be passed to a function which just needs a
356/// store accessor.
357///
358/// Acquiring an [`Accessor`] can be done through
359/// [`StoreContextMut::run_concurrent`] for example or in a host function
360/// through
361/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
362pub trait AsAccessor {
363    /// The `T` in `Store<T>` that this accessor refers to.
364    type Data: 'static;
365
366    /// The `D` in `Accessor<T, D>`, or the projection out of
367    /// `Self::Data`.
368    type AccessorData: HasData + ?Sized;
369
370    /// Returns the accessor that this is referring to.
371    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
372}
373
374impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375    type Data = T::Data;
376    type AccessorData = T::AccessorData;
377
378    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379        T::as_accessor(self)
380    }
381}
382
383impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
384    type Data = T;
385    type AccessorData = D;
386
387    fn as_accessor(&self) -> &Accessor<T, D> {
388        self
389    }
390}
391
392// Note that it is intentional at this time that `Accessor` does not actually
393// store `&mut T` or anything similar. This distinctly enables the `Accessor`
394// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
395// that matter). This is used to ergonomically simplify bindings where the
396// majority of the time `Accessor` is closed over in a future which then needs
397// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
398// you already have to write `T: 'static`...) it helps to avoid this.
399//
400// Note as well that `Accessor` doesn't actually store its data at all. Instead
401// it's more of a "proof" of what can be accessed from TLS. API design around
402// `Accessor` and functions like `Linker::func_wrap_concurrent` are
403// intentionally made to ensure that `Accessor` is ideally only used in the
404// context that TLS variables are actually set. For example host functions are
405// given `&Accessor`, not `Accessor`, and this prevents them from persisting
406// the value outside of a future. Within the future the TLS variables are all
407// guaranteed to be set while the future is being polled.
408//
409// Finally though this is not an ironclad guarantee, but nor does it need to be.
410// The TLS APIs are designed to panic or otherwise model usage where they're
411// called recursively or similar. It's hoped that code cannot be constructed to
412// actually hit this at runtime but this is not a safety requirement at this
413// time.
414const _: () = {
415    const fn assert<T: Send + Sync>() {}
416    assert::<Accessor<UnsafeCell<u32>>>();
417};
418
419impl<T> Accessor<T> {
420    /// Creates a new `Accessor` backed by the specified functions.
421    ///
422    /// - `get`: used to retrieve the store
423    ///
424    /// - `get_data`: used to "project" from the store's associated data to
425    /// another type (e.g. a field of that data or a wrapper around it).
426    ///
427    /// - `spawn`: used to queue spawned background tasks to be run later
428    pub(crate) fn new(token: StoreToken<T>) -> Self {
429        Self {
430            token,
431            get_data: |x| x,
432        }
433    }
434}
435
436impl<T, D> Accessor<T, D>
437where
438    D: HasData + ?Sized,
439{
440    /// Run the specified closure, passing it mutable access to the store.
441    ///
442    /// This function is one of the main building blocks of the [`Accessor`]
443    /// type. This yields synchronous, blocking, access to the store via an
444    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
445    /// providing the ability to access `D` via [`Access::get`]. Note that the
446    /// `fun` here is given only temporary access to the store and `T`/`D`
447    /// meaning that the return value `R` here is not allowed to capture borrows
448    /// into the two. If access is needed to data within `T` or `D` outside of
449    /// this closure then it must be `clone`d out, for example.
450    ///
451    /// # Panics
452    ///
453    /// This function will panic if it is call recursively with any other
454    /// accessor already in scope. For example if `with` is called within `fun`,
455    /// then this function will panic. It is up to the embedder to ensure that
456    /// this does not happen.
457    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458        tls::get(|vmstore| {
459            fun(Access {
460                store: self.token.as_context_mut(vmstore),
461                get_data: self.get_data,
462            })
463        })
464    }
465
466    /// Returns the getter this accessor is using to project from `T` into
467    /// `D::Data`.
468    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469        self.get_data
470    }
471
472    /// Changes this accessor to access `D2` instead of the current type
473    /// parameter `D`.
474    ///
475    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
476    ///
477    /// # Panics
478    ///
479    /// When using this API the returned value is disconnected from `&self` and
480    /// the lifetime binding the `self` argument. An `Accessor` only works
481    /// within the context of the closure or async closure that it was
482    /// originally given to, however. This means that due to the fact that the
483    /// returned value has no lifetime connection it's possible to use the
484    /// accessor outside of `&self`, the original accessor, and panic.
485    ///
486    /// The returned value should only be used within the scope of the original
487    /// `Accessor` that `self` refers to.
488    pub fn with_getter<D2: HasData>(
489        &self,
490        get_data: fn(&mut T) -> D2::Data<'_>,
491    ) -> Accessor<T, D2> {
492        Accessor {
493            token: self.token,
494            get_data,
495        }
496    }
497
498    /// Spawn a background task which will receive an `&Accessor<T, D>` and
499    /// run concurrently with any other tasks in progress for the current
500    /// store.
501    ///
502    /// This is particularly useful for host functions which return a `stream`
503    /// or `future` such that the code to write to the write end of that
504    /// `stream` or `future` must run after the function returns.
505    ///
506    /// The returned [`JoinHandle`] may be used to cancel the task.
507    ///
508    /// # Panics
509    ///
510    /// Panics if called within a closure provided to the [`Accessor::with`]
511    /// function. This can only be called outside an active invocation of
512    /// [`Accessor::with`].
513    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514    where
515        T: 'static,
516    {
517        let accessor = self.clone_for_spawn();
518        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519    }
520
521    fn clone_for_spawn(&self) -> Self {
522        Self {
523            token: self.token,
524            get_data: self.get_data,
525        }
526    }
527
528    /// Polls to see if this store contains any "interesting" tasks still within
529    /// it.
530    ///
531    /// Returns `Poll::Ready(())` if there are no more interesting tasks, and
532    /// otherwise returns `Poll::Pending`. If pending is returned then whenever
533    /// the last remaining "interesting" task has exited the provided context's
534    /// waker will be notified. Note that only the waker passed to the last call
535    /// to `poll_no_interesting_tasks` will be notified, so this is only
536    /// appropriate to use one-at-a-time.
537    ///
538    /// The component model specification, as of this current date, does not
539    /// have a distinction between "interesting" tasks and not. The current
540    /// intention is that in a future revision of the component model this will
541    /// be distinguished at the component ABI level where tasks will be able to
542    /// flag themselves as "interesting" optionally. Additionally extra work can
543    /// be opted-in to being "interesting".
544    ///
545    /// For now what this means is that all component model tasks within this
546    /// store are considered interesting. This specifically includes the entire
547    /// duration of a task, so even all of the time after a task has returned
548    /// but before it has exited. This means that this function is, today,
549    /// effectively a proxy for "are there any more tasks still running in this
550    /// store". This can be used by embedders to determine whether there's any
551    /// more work going on, even in the background, for a particular guest.
552    /// Hosts can use this as a signal that the guest wants to stay alive a
553    /// little longer, even after a task has returned.
554    ///
555    /// In the future this predicate won't include all tasks in this store. Some
556    /// tasks will be able to flag themselves as not interesting, meaning that
557    /// when this returns ready it'd be possible that there are still tasks
558    /// remaining in the store.
559    ///
560    /// Note that at this time spawned threads within a task are always
561    /// considered uninteresting. If this function returns ready, then spawned
562    /// threads may still be in the store.
563    pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
564        self.with(|mut access| {
565            let store = access.as_context_mut().0;
566            let state = store.concurrent_state_mut();
567            if state.interesting_tasks == 0 {
568                Poll::Ready(())
569            } else {
570                state.interesting_tasks_empty_waker = Some(cx.waker().clone());
571                Poll::Pending
572            }
573        })
574    }
575}
576
577/// Represents a task which may be provided to `Accessor::spawn`,
578/// `Accessor::forward`, or `StorecContextMut::spawn`.
579// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
580// option.
581//
582// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
583// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
584// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
585// Send + Sync + 'static` fails with a type mismatch error when we try to pass
586// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
587// best we can do for the time being.
588pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
589where
590    D: HasData + ?Sized,
591{
592    /// Run the task.
593    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
594}
595
596/// Represents parameter and result metadata for the caller side of a
597/// guest->guest call orchestrated by a fused adapter.
598enum CallerInfo {
599    /// Metadata for a call to an async-lowered import
600    Async {
601        params: Vec<ValRaw>,
602        has_result: bool,
603    },
604    /// Metadata for a call to an sync-lowered import
605    Sync {
606        params: Vec<ValRaw>,
607        result_count: u32,
608    },
609}
610
611/// Indicates how a guest task is waiting on a waitable set.
612enum WaitMode {
613    /// The guest task is waiting using `task.wait`
614    Fiber(StoreFiber<'static>),
615    /// The guest task is waiting via a callback declared as part of an
616    /// async-lifted export.
617    Callback(Instance),
618}
619
620/// Represents the reason a fiber is suspending itself.
621#[derive(Debug)]
622enum SuspendReason {
623    /// The fiber is waiting for an event to be delivered to the specified
624    /// waitable set or task.
625    Waiting {
626        set: TableId<WaitableSet>,
627        thread: QualifiedThreadId,
628        skip_may_block_check: bool,
629    },
630    /// The fiber has finished handling its most recent work item and is waiting
631    /// for another (or to be dropped if it is no longer needed).
632    NeedWork,
633    /// The fiber is yielding and should be resumed once other tasks have had a
634    /// chance to run.
635    Yielding {
636        thread: QualifiedThreadId,
637        cancellable: bool,
638        skip_may_block_check: bool,
639    },
640    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
641    ExplicitlySuspending {
642        thread: QualifiedThreadId,
643        skip_may_block_check: bool,
644    },
645}
646
647/// Represents a pending call into guest code for a given guest task.
648enum GuestCallKind {
649    /// Indicates there's an event to deliver to the task, possibly related to a
650    /// waitable set the task has been waiting on or polling.
651    DeliverEvent {
652        /// The instance to which the task belongs.
653        instance: Instance,
654        /// The waitable set the event belongs to, if any.
655        ///
656        /// If this is `None` the event will be waiting in the
657        /// `GuestTask::event` field for the task.
658        set: Option<TableId<WaitableSet>>,
659    },
660    /// Indicates that a new guest task call is pending and may be executed
661    /// using the specified closure.
662    ///
663    /// If the closure returns `Ok(Some(call))`, the `call` should be run
664    /// immediately using `handle_guest_call`.
665    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
666    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
667}
668
669impl fmt::Debug for GuestCallKind {
670    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
671        match self {
672            Self::DeliverEvent { instance, set } => f
673                .debug_struct("DeliverEvent")
674                .field("instance", instance)
675                .field("set", set)
676                .finish(),
677            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
678            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
679        }
680    }
681}
682
/// What a suspension intrinsic is targeting.
// NOTE(review): the `u32` payloads are presumably thread handles/indices and
// `SomeSuspended` presumably requires the target to be suspended -- confirm.
#[derive(Debug, Clone, Copy)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Whether there is no target, i.e. this is [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            Self::None => true,
            Self::Some(_) | Self::SomeSuspended(_) => false,
        }
    }

    /// Whether there is a target, i.e. this is any variant other than
    /// [`SuspensionTarget::None`].
    fn is_some(&self) -> bool {
        matches!(self, Self::Some(_) | Self::SomeSuspended(_))
    }
}
699
700/// Represents a pending call into guest code for a given guest thread.
701#[derive(Debug)]
702struct GuestCall {
703    thread: QualifiedThreadId,
704    kind: GuestCallKind,
705}
706
707impl GuestCall {
708    /// Returns whether or not the call is ready to run.
709    ///
710    /// A call will not be ready to run if either:
711    ///
712    /// - the (sub-)component instance to be called has already been entered and
713    /// cannot be reentered until an in-progress call completes
714    ///
715    /// - the call is for a not-yet started task and the (sub-)component
716    /// instance to be called has backpressure enabled
717    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
718        let instance = store
719            .concurrent_state_mut()
720            .get_mut(self.thread.task)?
721            .instance;
722        let state = store.instance_state(instance).concurrent_state();
723
724        let ready = match &self.kind {
725            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
726            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
727            GuestCallKind::StartExplicit(_) => true,
728        };
729        log::trace!(
730            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
731            state.do_not_enter,
732            state.backpressure
733        );
734        Ok(ready)
735    }
736}
737
738/// Job to be run on a worker fiber.
739enum WorkerItem {
740    GuestCall(GuestCall),
741    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
742}
743
744/// Represents a pending work item to be handled by the event loop for a given
745/// component instance.
746enum WorkItem {
747    /// A host task to be pushed to `ConcurrentState::futures`.
748    PushFuture(AlwaysMut<HostTaskFuture>),
749    /// A fiber to resume.
750    ResumeFiber(StoreFiber<'static>),
751    /// A thread to resume.
752    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
753    /// A pending call into guest code for a given guest task.
754    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
755    /// A job to run on a worker fiber.
756    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
757}
758
759impl fmt::Debug for WorkItem {
760    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
761        match self {
762            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
763            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
764            Self::ResumeThread(instance, thread) => f
765                .debug_tuple("ResumeThread")
766                .field(instance)
767                .field(thread)
768                .finish(),
769            Self::GuestCall(instance, call) => f
770                .debug_tuple("GuestCall")
771                .field(instance)
772                .field(call)
773                .finish(),
774            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
775        }
776    }
777}
778
/// Outcome of a suspension intrinsic: it was either cancelled or it ran to
/// completion.
///
/// The derived ordering follows declaration order, so `Cancelled` compares
/// less than `Completed`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
785
786/// Poll the specified future until it completes on behalf of a guest->host call
787/// using a sync-lowered import.
788///
789/// This is similar to `Instance::first_poll` except it's for sync-lowered
790/// imports, meaning we don't need to handle cancellation and we can block the
791/// caller until the task completes, at which point the caller can handle
792/// lowering the result to the guest's stack and linear memory.
793pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
794    store: &mut dyn VMStore,
795    future: impl Future<Output = Result<R>> + Send + 'static,
796) -> Result<R> {
797    let state = store.concurrent_state_mut();
798    let task = state.current_host_thread()?;
799
800    // Wrap the future in a closure which will take care of stashing the result
801    // in `GuestTask::result` and resuming this fiber when the host task
802    // completes.
803    let mut future = Box::pin(async move {
804        let result = future.await?;
805        tls::get(move |store| {
806            let state = store.concurrent_state_mut();
807            let host_state = &mut state.get_mut(task)?.state;
808            assert!(matches!(host_state, HostTaskState::CalleeStarted));
809            *host_state = HostTaskState::CalleeFinished(Box::new(result));
810
811            Waitable::Host(task).set_event(
812                state,
813                Some(Event::Subtask {
814                    status: Status::Returned,
815                }),
816            )?;
817
818            Ok(())
819        })
820    }) as HostTaskFuture;
821
822    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
823    // add the future to `ConcurrentState::futures` and poll it automatically
824    // from the event loop if it doesn't complete immediately here.
825    let poll = tls::set(store, || {
826        future
827            .as_mut()
828            .poll(&mut Context::from_waker(&Waker::noop()))
829    });
830
831    match poll {
832        // It completed immediately; check the result and delete the task.
833        Poll::Ready(result) => result?,
834
835        // It did not complete immediately; add it to
836        // `ConcurrentState::futures` so it will be polled via the event loop;
837        // then use `GuestThread::sync_call_set` to wait for the task to
838        // complete, suspending the current fiber until it does so.
839        Poll::Pending => {
840            let state = store.concurrent_state_mut();
841            state.push_future(future);
842
843            let caller = state.get_mut(task)?.caller;
844            let set = state.get_mut(caller.thread)?.sync_call_set;
845            Waitable::Host(task).join(state, Some(set))?;
846
847            store.suspend(SuspendReason::Waiting {
848                set,
849                thread: caller,
850                skip_may_block_check: false,
851            })?;
852
853            // Remove the `task` from the `sync_call_set` to ensure that when
854            // this function returns and the task is deleted that there are no
855            // more lingering references to this host task.
856            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
857        }
858    }
859
860    // Retrieve and return the result.
861    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
862    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
863        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
864            Ok(result) => *result,
865            Err(_) => bail_bug!("host task finished with wrong type of result"),
866        }),
867        _ => bail_bug!("unexpected host task state after completion"),
868    }
869}
870
871/// Execute the specified guest call.
872fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
873    let mut next = Some(call);
874    while let Some(call) = next.take() {
875        match call.kind {
876            GuestCallKind::DeliverEvent { instance, set } => {
877                let (event, waitable) =
878                    match instance.get_event(store, call.thread.task, set, true)? {
879                        Some(pair) => pair,
880                        None => bail_bug!("delivering non-present event"),
881                    };
882                let state = store.concurrent_state_mut();
883                let task = state.get_mut(call.thread.task)?;
884                let runtime_instance = task.instance;
885                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
886
887                log::trace!(
888                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
889                    call.thread,
890                );
891
892                let old_thread = store.set_thread(call.thread)?;
893                log::trace!(
894                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
895                    call.thread
896                );
897
898                store.enter_instance(runtime_instance);
899
900                let Some(callback) = store
901                    .concurrent_state_mut()
902                    .get_mut(call.thread.task)?
903                    .callback
904                    .take()
905                else {
906                    bail_bug!("guest task callback field not present")
907                };
908
909                let code = callback(store, event, handle)?;
910
911                store
912                    .concurrent_state_mut()
913                    .get_mut(call.thread.task)?
914                    .callback = Some(callback);
915
916                store.exit_instance(runtime_instance)?;
917
918                store.set_thread(old_thread)?;
919
920                next = instance.handle_callback_code(
921                    store,
922                    call.thread,
923                    runtime_instance.index,
924                    code,
925                )?;
926
927                log::trace!(
928                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
929                );
930            }
931            GuestCallKind::StartImplicit(fun) => {
932                next = fun(store)?;
933            }
934            GuestCallKind::StartExplicit(fun) => {
935                fun(store)?;
936            }
937        }
938    }
939
940    Ok(())
941}
942
943impl<T> Store<T> {
944    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
945    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
946    where
947        T: Send + 'static,
948    {
949        ensure!(
950            self.as_context().0.concurrency_support(),
951            "cannot use `run_concurrent` when Config::concurrency_support disabled",
952        );
953        self.as_context_mut().run_concurrent(fun).await
954    }
955
956    #[doc(hidden)]
957    pub fn assert_concurrent_state_empty(&mut self) {
958        self.as_context_mut().assert_concurrent_state_empty();
959    }
960
961    #[doc(hidden)]
962    pub fn concurrent_state_table_size(&mut self) -> usize {
963        self.as_context_mut().concurrent_state_table_size()
964    }
965
966    /// Convenience wrapper for [`StoreContextMut::spawn`].
967    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
968    where
969        T: 'static,
970    {
971        self.as_context_mut().spawn(task)
972    }
973}
974
975impl<T> StoreContextMut<'_, T> {
976    /// Assert that all the relevant tables and queues in the concurrent state
977    /// for this store are empty.
978    ///
979    /// This is for sanity checking in integration tests
980    /// (e.g. `component-async-tests`) that the relevant state has been cleared
981    /// after each test concludes.  This should help us catch leaks, e.g. guest
982    /// tasks which haven't been deleted despite having completed and having
983    /// been dropped by their supertasks.
984    ///
985    /// Only intended for use in Wasmtime's own testing.
986    #[doc(hidden)]
987    pub fn assert_concurrent_state_empty(self) {
988        let store = self.0;
989        store
990            .store_data_mut()
991            .components
992            .assert_instance_states_empty();
993        let state = store.concurrent_state_mut();
994        assert!(
995            state.table.get_mut().is_empty(),
996            "non-empty table: {:?}",
997            state.table.get_mut()
998        );
999        assert!(state.high_priority.is_empty());
1000        assert!(state.low_priority.is_empty());
1001        assert!(state.current_thread.is_none());
1002        assert!(state.futures_mut().unwrap().is_empty());
1003        assert!(state.global_error_context_ref_counts.is_empty());
1004    }
1005
1006    /// Helper function to perform tests over the size of the concurrent state
1007    /// table which can be useful for detecting leaks.
1008    ///
1009    /// Only intended for use in Wasmtime's own testing.
1010    #[doc(hidden)]
1011    pub fn concurrent_state_table_size(&mut self) -> usize {
1012        self.0
1013            .concurrent_state_mut()
1014            .table
1015            .get_mut()
1016            .iter_mut()
1017            .count()
1018    }
1019
1020    /// Spawn a background task to run as part of this instance's event loop.
1021    ///
1022    /// The task will receive an `&Accessor<U>` and run concurrently with
1023    /// any other tasks in progress for the instance.
1024    ///
1025    /// Note that the task will only make progress if and when the event loop
1026    /// for this instance is run.
1027    ///
1028    /// The returned [`JoinHandle`] may be used to cancel the task.
1029    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
1030    where
1031        T: 'static,
1032    {
1033        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1034        self.spawn_with_accessor(accessor, task)
1035    }
1036
1037    /// Internal implementation of `spawn` functions where a `store` is
1038    /// available along with an `Accessor`.
1039    fn spawn_with_accessor<D>(
1040        self,
1041        accessor: Accessor<T, D>,
1042        task: impl AccessorTask<T, D>,
1043    ) -> JoinHandle
1044    where
1045        T: 'static,
1046        D: HasData + ?Sized,
1047    {
1048        // Create an "abortable future" here where internally the future will
1049        // hook calls to poll and possibly spawn more background tasks on each
1050        // iteration.
1051        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1052        self.0
1053            .concurrent_state_mut()
1054            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1055        handle
1056    }
1057
1058    /// Run the specified closure `fun` to completion as part of this store's
1059    /// event loop.
1060    ///
1061    /// This will run `fun` as part of this store's event loop until it
1062    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1063    /// controlled access to the store and its data.
1064    ///
1065    /// This function can be used to invoke [`Func::call_concurrent`] for
1066    /// example within the async closure provided here.
1067    ///
1068    /// This function will unconditionally return an error if
1069    /// [`Config::concurrency_support`] is disabled.
1070    ///
1071    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1072    ///
1073    /// # Store-blocking behavior
1074    ///
1075    /// At this time there are certain situations in which the `Future` returned
1076    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1077    /// extended period of time, despite one or more `Waker::wake` events having
1078    /// occurred for the task to which it belongs.  This can manifest as the
1079    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1080    /// the `Store` being held by e.g. a blocking host function, preventing the
1081    /// `Future` from being polled. A canonical example of this is when the
1082    /// `fun` provided to this function attempts to set a timeout for an
1083    /// invocation of a wasm function. In this situation the async closure is
1084    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1085    /// to elapse. At this time this setup will not always work and the timeout
1086    /// may not reliably fire.
1087    ///
1088    /// This function will not block the current thread and as such is always
1089    /// suitable to run in an `async` context, but the current implementation of
1090    /// Wasmtime can lead to situations where a certain wasm computation is
1091    /// required to make progress the closure to make progress. This is an
1092    /// artifact of Wasmtime's historical implementation of `async` functions
1093    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1094    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1095    /// progress for a readiness notification of (b) to get delivered.
1096    ///
1097    /// This effectively means that it's not possible to reliably perform a
1098    /// "select" operation within the `fun` closure, which timeouts for example
1099    /// are based on. Fixing this requires some relatively major refactoring
1100    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1101    /// that is intended to be fixed one day. In the meantime it's recommended
1102    /// to apply timeouts or such to the entire `run_concurrent` call itself
1103    /// rather than internally.
1104    ///
1105    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1106    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1107    ///
1108    /// # Example
1109    ///
1110    /// ```
1111    /// # use {
1112    /// #   wasmtime::{
1113    /// #     error::{Result},
1114    /// #     component::{ Component, Linker, Resource, ResourceTable},
1115    /// #     Config, Engine, Store
1116    /// #   },
1117    /// # };
1118    /// #
1119    /// # struct MyResource(u32);
1120    /// # struct Ctx { table: ResourceTable }
1121    /// #
1122    /// # async fn foo() -> Result<()> {
1123    /// # let mut config = Config::new();
1124    /// # let engine = Engine::new(&config)?;
1125    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1126    /// # let mut linker = Linker::new(&engine);
1127    /// # let component = Component::new(&engine, "")?;
1128    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1129    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1130    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1131    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1132    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1133    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1134    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1135    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1136    ///    Ok(())
1137    /// }).await??;
1138    /// # Ok(())
1139    /// # }
1140    /// ```
1141    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1142    where
1143        T: Send + 'static,
1144    {
1145        ensure!(
1146            self.0.concurrency_support(),
1147            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1148        );
1149        self.do_run_concurrent(fun, false).await
1150    }
1151
1152    pub(super) async fn run_concurrent_trap_on_idle<R>(
1153        self,
1154        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1155    ) -> Result<R>
1156    where
1157        T: Send + 'static,
1158    {
1159        self.do_run_concurrent(fun, true).await
1160    }
1161
1162    async fn do_run_concurrent<R>(
1163        mut self,
1164        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1165        trap_on_idle: bool,
1166    ) -> Result<R>
1167    where
1168        T: Send + 'static,
1169    {
1170        debug_assert!(self.0.concurrency_support());
1171        check_recursive_run();
1172        let token = StoreToken::new(self.as_context_mut());
1173
1174        struct Dropper<'a, T: 'static, V> {
1175            store: StoreContextMut<'a, T>,
1176            value: ManuallyDrop<V>,
1177        }
1178
1179        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1180            fn drop(&mut self) {
1181                tls::set(self.store.0, || {
1182                    // SAFETY: Here we drop the value without moving it for the
1183                    // first and only time -- per the contract for `Drop::drop`,
1184                    // this code won't run again, and the `value` field will no
1185                    // longer be accessible.
1186                    unsafe { ManuallyDrop::drop(&mut self.value) }
1187                });
1188            }
1189        }
1190
1191        let accessor = &Accessor::new(token);
1192        let dropper = &mut Dropper {
1193            store: self,
1194            value: ManuallyDrop::new(fun(accessor)),
1195        };
1196        // SAFETY: We never move `dropper` nor its `value` field.
1197        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1198
1199        dropper
1200            .store
1201            .as_context_mut()
1202            .poll_until(future, trap_on_idle)
1203            .await
1204    }
1205
1206    /// Run this store's event loop.
1207    ///
1208    /// The returned future will resolve when the specified future completes or,
1209    /// if `trap_on_idle` is true, when the event loop can't make further
1210    /// progress.
1211    async fn poll_until<R>(
1212        mut self,
1213        mut future: Pin<&mut impl Future<Output = R>>,
1214        trap_on_idle: bool,
1215    ) -> Result<R>
1216    where
1217        T: Send + 'static,
1218    {
1219        struct Reset<'a, T: 'static> {
1220            store: StoreContextMut<'a, T>,
1221            futures: Option<FuturesUnordered<HostTaskFuture>>,
1222        }
1223
1224        impl<'a, T> Drop for Reset<'a, T> {
1225            fn drop(&mut self) {
1226                if let Some(futures) = self.futures.take() {
1227                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1228                }
1229            }
1230        }
1231
1232        loop {
1233            // Take `ConcurrentState::futures` out of the store so we can poll
1234            // it while also safely giving any of the futures inside access to
1235            // `self`.
1236            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1237            let mut reset = Reset {
1238                store: self.as_context_mut(),
1239                futures,
1240            };
1241            let mut next = match reset.futures.as_mut() {
1242                Some(f) => pin!(f.next()),
1243                None => bail_bug!("concurrent state missing futures field"),
1244            };
1245
1246            enum PollResult<R> {
1247                Complete(R),
1248                ProcessWork {
1249                    ready: Vec<WorkItem>,
1250                    low_priority: bool,
1251                },
1252            }
1253
1254            let result = future::poll_fn(|cx| {
1255                // First, poll the future we were passed as an argument and
1256                // return immediately if it's ready.
1257                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1258                    return Poll::Ready(Ok(PollResult::Complete(value)));
1259                }
1260
1261                // Next, poll `ConcurrentState::futures` (which includes any
1262                // pending host tasks and/or background tasks), returning
1263                // immediately if one of them fails.
1264                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1265                    Poll::Ready(Some(output)) => {
1266                        match output {
1267                            Err(e) => return Poll::Ready(Err(e)),
1268                            Ok(()) => {}
1269                        }
1270                        Poll::Ready(true)
1271                    }
1272                    Poll::Ready(None) => Poll::Ready(false),
1273                    Poll::Pending => Poll::Pending,
1274                };
1275
1276                // Next, collect the next batch of work items to process, if
1277                // any.  This will be either all of the high-priority work
1278                // items, or if there are none, a single low-priority work item.
1279                let state = reset.store.0.concurrent_state_mut();
1280                let mut ready = mem::take(&mut state.high_priority);
1281                let mut low_priority = false;
1282                if ready.is_empty() {
1283                    if let Some(item) = state.low_priority.pop_back() {
1284                        ready.push(item);
1285                        low_priority = true;
1286                    }
1287                }
1288                if !ready.is_empty() {
1289                    return Poll::Ready(Ok(PollResult::ProcessWork {
1290                        ready,
1291                        low_priority,
1292                    }));
1293                }
1294
1295                // Finally, if we have nothing else to do right now, determine what to do
1296                // based on whether there are any pending futures in
1297                // `ConcurrentState::futures`.
1298                return match next {
1299                    Poll::Ready(true) => {
1300                        // In this case, one of the futures in
1301                        // `ConcurrentState::futures` completed
1302                        // successfully, so we return now and continue
1303                        // the outer loop in case there is another one
1304                        // ready to complete.
1305                        Poll::Ready(Ok(PollResult::ProcessWork {
1306                            ready: Vec::new(),
1307                            low_priority: false,
1308                        }))
1309                    }
1310                    Poll::Ready(false) => {
1311                        // Poll the future we were passed one last time
1312                        // in case one of `ConcurrentState::futures` had
1313                        // the side effect of unblocking it.
1314                        if let Poll::Ready(value) =
1315                            tls::set(reset.store.0, || future.as_mut().poll(cx))
1316                        {
1317                            Poll::Ready(Ok(PollResult::Complete(value)))
1318                        } else {
1319                            // In this case, there are no more pending
1320                            // futures in `ConcurrentState::futures`,
1321                            // there are no remaining work items, _and_
1322                            // the future we were passed as an argument
1323                            // still hasn't completed.
1324                            if trap_on_idle {
1325                                // `trap_on_idle` is true, so we exit
1326                                // immediately.
1327                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1328                            } else {
1329                                // `trap_on_idle` is false, so we assume
1330                                // that future will wake up and give us
1331                                // more work to do when it's ready to.
1332                                Poll::Pending
1333                            }
1334                        }
1335                    }
1336                    // There is at least one pending future in
1337                    // `ConcurrentState::futures` and we have nothing
1338                    // else to do but wait for now, so we return
1339                    // `Pending`.
1340                    Poll::Pending => Poll::Pending,
1341                };
1342            })
1343            .await;
1344
1345            // Put the `ConcurrentState::futures` back into the store before we
1346            // return or handle any work items since one or more of those items
1347            // might append more futures.
1348            drop(reset);
1349
1350            match result? {
1351                // The future we were passed as an argument completed, so we
1352                // return the result.
1353                PollResult::Complete(value) => break Ok(value),
1354                // The future we were passed has not yet completed, so handle
1355                // any work items and then loop again.
1356                PollResult::ProcessWork {
1357                    ready,
1358                    low_priority,
1359                } => {
1360                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1361                        store: StoreContextMut<'a, T>,
1362                        ready: I,
1363                    }
1364
1365                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1366                        fn drop(&mut self) {
1367                            while let Some(item) = self.ready.next() {
1368                                match item {
1369                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1370                                    WorkItem::PushFuture(future) => {
1371                                        tls::set(self.store.0, move || drop(future))
1372                                    }
1373                                    _ => {}
1374                                }
1375                            }
1376                        }
1377                    }
1378
1379                    let mut dispose = Dispose {
1380                        store: self.as_context_mut(),
1381                        ready: ready.into_iter(),
1382                    };
1383
1384                    // If we're about to run a low-priority task, first yield to
1385                    // the executor.  This ensures that it won't be starved of
1386                    // the ability to e.g. update the readiness of sockets,
1387                    // etc. which the guest may be using `thread.yield` along
1388                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
1389                    //
1390                    // This works for e.g. `thread.yield` and callbacks which
1391                    // return `CALLBACK_CODE_YIELD` because we queue a low
1392                    // priority item to resume the task (i.e. resume the thread
1393                    // or call the callback, respectively) just prior to
1394                    // suspending it.  Indeed, as of this writing those are the
1395                    // _only_ situations we queue low-priority tasks.
1396                    // Therefore, we interpret the guest's request to yield as
1397                    // meaning "yield to other guest tasks _and_/_or_ host
1398                    // operations such as updating socket readiness", the latter
1399                    // being the async runtime's responsibility.
1400                    //
1401                    // In the future, if this ends up causing measurable
1402                    // performance issues, this could be optimized such that we
1403                    // only yield periodically (e.g. for batches of low priority
1404                    // items) and not for each and every idividual item.
1405                    if low_priority {
1406                        dispose.store.0.yield_now().await
1407                    }
1408
1409                    while let Some(item) = dispose.ready.next() {
1410                        dispose
1411                            .store
1412                            .as_context_mut()
1413                            .handle_work_item(item)
1414                            .await?;
1415                    }
1416                }
1417            }
1418        }
1419    }
1420
1421    /// Handle the specified work item, possibly resuming a fiber if applicable.
1422    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1423    where
1424        T: Send,
1425    {
1426        log::trace!("handle work item {item:?}");
1427        match item {
1428            WorkItem::PushFuture(future) => {
1429                self.0
1430                    .concurrent_state_mut()
1431                    .futures_mut()?
1432                    .push(future.into_inner());
1433            }
1434            WorkItem::ResumeFiber(fiber) => {
1435                self.0.resume_fiber(fiber).await?;
1436            }
1437            WorkItem::ResumeThread(_, thread) => {
1438                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1439                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1440                    GuestThreadState::Running,
1441                ) {
1442                    self.0.resume_fiber(fiber).await?;
1443                } else {
1444                    bail_bug!("cannot resume non-pending thread {thread:?}");
1445                }
1446            }
1447            WorkItem::GuestCall(_, call) => {
1448                if call.is_ready(self.0)? {
1449                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1450                } else {
1451                    let state = self.0.concurrent_state_mut();
1452                    let task = state.get_mut(call.thread.task)?;
1453                    if !task.starting_sent {
1454                        task.starting_sent = true;
1455                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1456                            Waitable::Guest(call.thread.task).set_event(
1457                                state,
1458                                Some(Event::Subtask {
1459                                    status: Status::Starting,
1460                                }),
1461                            )?;
1462                        }
1463                    }
1464
1465                    let instance = state.get_mut(call.thread.task)?.instance;
1466                    self.0
1467                        .instance_state(instance)
1468                        .concurrent_state()
1469                        .pending
1470                        .insert(call.thread, call.kind);
1471                }
1472            }
1473            WorkItem::WorkerFunction(fun) => {
1474                self.run_on_worker(WorkerItem::Function(fun)).await?;
1475            }
1476        }
1477
1478        Ok(())
1479    }
1480
1481    /// Execute the specified guest call on a worker fiber.
1482    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1483    where
1484        T: Send,
1485    {
1486        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1487            fiber
1488        } else {
1489            fiber::make_fiber(self.0, move |store| {
1490                loop {
1491                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1492                        bail_bug!("worker_item not present when resuming fiber")
1493                    };
1494                    match item {
1495                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1496                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1497                    }
1498
1499                    store.suspend(SuspendReason::NeedWork)?;
1500                }
1501            })?
1502        };
1503
1504        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1505        assert!(worker_item.is_none());
1506        *worker_item = Some(item);
1507
1508        self.0.resume_fiber(worker).await
1509    }
1510
1511    /// Wrap the specified host function in a future which will call it, passing
1512    /// it an `&Accessor<T>`.
1513    ///
1514    /// See the `Accessor` documentation for details.
1515    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1516    where
1517        T: 'static,
1518        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1519            + Send
1520            + Sync
1521            + 'static,
1522        R: Send + Sync + 'static,
1523    {
1524        let token = StoreToken::new(self);
1525        async move {
1526            let mut accessor = Accessor::new(token);
1527            closure(&mut accessor).await
1528        }
1529    }
1530}
1531
1532impl StoreOpaque {
1533    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1534    /// guest-to-guest call or a sync host-to-guest call.
1535    ///
1536    /// This task will only be used for the purpose of handling calls to
1537    /// intrinsic functions; both parameter lowering and result lifting are
1538    /// assumed to be taken care of elsewhere.
1539    pub(crate) fn enter_guest_sync_call(
1540        &mut self,
1541        guest_caller: Option<RuntimeInstance>,
1542        callee_async: bool,
1543        callee: RuntimeInstance,
1544    ) -> Result<()> {
1545        log::trace!("enter sync call {callee:?}");
1546        if !self.concurrency_support() {
1547            return self.enter_call_not_concurrent();
1548        }
1549
1550        let state = self.concurrent_state_mut();
1551        let thread = state.current_thread;
1552        let instance = if let Some(thread) = thread.guest() {
1553            Some(state.get_mut(thread.task)?.instance)
1554        } else {
1555            None
1556        };
1557        if guest_caller.is_some() {
1558            debug_assert_eq!(instance, guest_caller);
1559        }
1560        let guest_thread = GuestTask::new(
1561            state,
1562            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1563            LiftResult {
1564                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1565                ty: TypeTupleIndex::reserved_value(),
1566                memory: None,
1567                string_encoding: StringEncoding::Utf8,
1568            },
1569            if let Some(thread) = thread.guest() {
1570                Caller::Guest { thread: *thread }
1571            } else {
1572                Caller::Host {
1573                    tx: None,
1574                    host_future_present: false,
1575                    caller: thread,
1576                }
1577            },
1578            None,
1579            callee,
1580            callee_async,
1581        )?;
1582
1583        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1584            guest_thread.thread,
1585            self,
1586            callee.index,
1587        )?;
1588        self.set_thread(guest_thread)?;
1589
1590        Ok(())
1591    }
1592
1593    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1594    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1595        if !self.concurrency_support() {
1596            return Ok(self.exit_call_not_concurrent());
1597        }
1598        let thread = match self.set_thread(CurrentThread::None)?.guest() {
1599            Some(t) => *t,
1600            None => bail_bug!("expected task when exiting"),
1601        };
1602        let task = self.concurrent_state_mut().get_mut(thread.task)?;
1603        let instance = task.instance;
1604        let caller = match &task.caller {
1605            &Caller::Guest { thread } => thread.into(),
1606            &Caller::Host { caller, .. } => caller,
1607        };
1608        task.lift_result = None;
1609        task.exited = true;
1610        self.set_thread(caller)?;
1611
1612        log::trace!("exit sync call {instance:?}");
1613        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1614
1615        Ok(())
1616    }
1617
1618    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1619    /// transition to the host.
1620    ///
1621    /// FIXME: this is called for all guest->host transitions and performs some
1622    /// relatively expensive table manipulations. This would ideally be
1623    /// optimized to avoid the full allocation of a `HostTask` in at least some
1624    /// situations.
1625    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1626        if !self.concurrency_support() {
1627            self.enter_call_not_concurrent()?;
1628            return Ok(None);
1629        }
1630        let state = self.concurrent_state_mut();
1631        let caller = state.current_guest_thread()?;
1632        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1633        log::trace!("new host task {task:?}");
1634        self.set_thread(task)?;
1635        Ok(Some(task))
1636    }
1637
1638    /// Invoked before lowering the results of a host task to the guest.
1639    ///
1640    /// This is used to update the current thread annotations within the store
1641    /// to ensure that it reflects the guest task, not the host task, since
1642    /// lowering may execute guest code.
1643    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1644        if !self.concurrency_support() {
1645            return Ok(());
1646        }
1647        let task = self.concurrent_state_mut().current_host_thread()?;
1648        let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1649        self.set_thread(caller)?;
1650        Ok(())
1651    }
1652
1653    /// Dual of `host_task_create` and signifies that the host has finished and
1654    /// will be cleaned up.
1655    ///
1656    /// Note that this isn't invoked when the host is invoked asynchronously and
1657    /// the host isn't complete yet. In that situation the host task persists
1658    /// and will be cleaned up separately in `subtask_drop`
1659    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1660        match task {
1661            Some(task) => {
1662                log::trace!("delete host task {task:?}");
1663                self.concurrent_state_mut().delete(task)?;
1664            }
1665            None => {
1666                self.exit_call_not_concurrent();
1667            }
1668        }
1669        Ok(())
1670    }
1671
1672    /// Determine whether the specified instance may be entered from the host.
1673    ///
1674    /// We return `true` here only if all of the following hold:
1675    ///
1676    /// - The top-level instance is not already on the current task's call stack.
1677    /// - The instance is not in need of a post-return function call.
1678    /// - `self` has not been poisoned due to a trap.
1679    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1680        if self.trapped() {
1681            return Ok(false);
1682        }
1683        if !self.concurrency_support() {
1684            return Ok(true);
1685        }
1686        let state = self.concurrent_state_mut();
1687        let mut cur = state.current_thread;
1688        loop {
1689            match cur {
1690                CurrentThread::None => break Ok(true),
1691                CurrentThread::Guest(thread) => {
1692                    let task = state.get_mut(thread.task)?;
1693
1694                    // Note that we only compare top-level instance IDs here.
1695                    // The idea is that the host is not allowed to recursively
1696                    // enter a top-level instance even if the specific leaf
1697                    // instance is not on the stack. This the behavior defined
1698                    // in the spec, and it allows us to elide runtime checks in
1699                    // guest-to-guest adapters.
1700                    if task.instance.instance == instance.instance {
1701                        break Ok(false);
1702                    }
1703                    cur = match task.caller {
1704                        Caller::Host { caller, .. } => caller,
1705                        Caller::Guest { thread } => thread.into(),
1706                    };
1707                }
1708                CurrentThread::Host(id) => {
1709                    cur = state.get_mut(id)?.caller.into();
1710                }
1711            }
1712        }
1713    }
1714
1715    /// Helper function to retrieve the `InstanceState` for the
1716    /// specified instance.
1717    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1718        self.component_instance_mut(instance.instance)
1719            .instance_state(instance.index)
1720    }
1721
1722    /// Configure the currently running `thread`.
1723    ///
1724    /// This will save off any state necessary for the previous thread, if
1725    /// applicable, and then it'll additionally update state for `thread` if
1726    /// needed too.
1727    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1728        let thread = thread.into();
1729        let state = self.concurrent_state_mut();
1730        let old_thread = mem::replace(&mut state.current_thread, thread);
1731
1732        // First thing to do after swapping threads is updating the context
1733        // slots for this thread within the store. This restores the behavior of
1734        // `context.{get,set}`. This involves taking the old state out of the
1735        // store, saving it in the thread that's being swapped from, and doing
1736        // the inverse for the new thread. When debug assertions are enabled
1737        // this also leaves behind sentinel values to try to uncover bugs where
1738        // this may be forgotten.
1739        if let Some(old_thread) = old_thread.guest() {
1740            let old_context = self.vm_store_context().component_context;
1741            self.concurrent_state_mut()
1742                .get_mut(old_thread.thread)?
1743                .context = old_context;
1744        }
1745        if cfg!(debug_assertions) {
1746            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1747        }
1748        if let Some(thread) = thread.guest() {
1749            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
1750            let context = thread.context;
1751            if cfg!(debug_assertions) {
1752                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1753            }
1754            self.vm_store_context_mut().component_context = context;
1755        }
1756
1757        // Each time we switch threads, we conservatively set `task_may_block`
1758        // to `false` for the component instance we're switching away from (if
1759        // any), meaning it will be `false` for any new thread created for that
1760        // instance unless explicitly set otherwise.
1761        //
1762        // Additionally if we're switching to a new thread, set its component
1763        // instance's `task_may_block` according to where it left off.
1764        let state = self.concurrent_state_mut();
1765        if let Some(old_thread) = old_thread.guest() {
1766            let instance = state.get_mut(old_thread.task)?.instance.instance;
1767            self.component_instance_mut(instance)
1768                .set_task_may_block(false)
1769        }
1770
1771        if thread.guest().is_some() {
1772            self.set_task_may_block()?;
1773        }
1774
1775        Ok(old_thread)
1776    }
1777
1778    /// Set the global variable representing whether the current task may block
1779    /// prior to entering Wasm code.
1780    fn set_task_may_block(&mut self) -> Result<()> {
1781        let state = self.concurrent_state_mut();
1782        let guest_thread = state.current_guest_thread()?;
1783        let instance = state.get_mut(guest_thread.task)?.instance.instance;
1784        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1785        self.component_instance_mut(instance)
1786            .set_task_may_block(may_block);
1787        Ok(())
1788    }
1789
1790    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1791        if !self.concurrency_support() {
1792            return Ok(());
1793        }
1794        let state = self.concurrent_state_mut();
1795        let task = state.current_guest_thread()?.task;
1796        let instance = state.get_mut(task)?.instance.instance;
1797        let task_may_block = self.component_instance(instance).get_task_may_block();
1798
1799        if task_may_block {
1800            Ok(())
1801        } else {
1802            Err(Trap::CannotBlockSyncTask.into())
1803        }
1804    }
1805
1806    /// Record that we're about to enter a (sub-)component instance which does
1807    /// not support more than one concurrent, stackful activation, meaning it
1808    /// cannot be entered again until the next call returns.
1809    fn enter_instance(&mut self, instance: RuntimeInstance) {
1810        log::trace!("enter {instance:?}");
1811        self.instance_state(instance)
1812            .concurrent_state()
1813            .do_not_enter = true;
1814    }
1815
1816    /// Record that we've exited a (sub-)component instance previously entered
1817    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1818    /// See the documentation for the latter for details.
1819    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1820        log::trace!("exit {instance:?}");
1821        self.instance_state(instance)
1822            .concurrent_state()
1823            .do_not_enter = false;
1824        self.partition_pending(instance)
1825    }
1826
1827    /// Iterate over `InstanceState::pending`, moving any ready items into the
1828    /// "high priority" work item queue.
1829    ///
1830    /// See `GuestCall::is_ready` for details.
1831    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1832        for (thread, kind) in
1833            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1834        {
1835            let call = GuestCall { thread, kind };
1836            if call.is_ready(self)? {
1837                self.concurrent_state_mut()
1838                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
1839            } else {
1840                self.instance_state(instance)
1841                    .concurrent_state()
1842                    .pending
1843                    .insert(call.thread, call.kind);
1844            }
1845        }
1846
1847        Ok(())
1848    }
1849
1850    /// Implements the `backpressure.{inc,dec}` intrinsics.
1851    pub(crate) fn backpressure_modify(
1852        &mut self,
1853        caller_instance: RuntimeInstance,
1854        modify: impl FnOnce(u16) -> Option<u16>,
1855    ) -> Result<()> {
1856        let state = self.instance_state(caller_instance).concurrent_state();
1857        let old = state.backpressure;
1858        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1859        state.backpressure = new;
1860
1861        if old > 0 && new == 0 {
1862            // Backpressure was previously enabled and is now disabled; move any
1863            // newly-eligible guest calls to the "high priority" queue.
1864            self.partition_pending(caller_instance)?;
1865        }
1866
1867        Ok(())
1868    }
1869
1870    /// Resume the specified fiber, giving it exclusive access to the specified
1871    /// store.
1872    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1873        let old_thread = self.concurrent_state_mut().current_thread;
1874        log::trace!("resume_fiber: save current thread {old_thread:?}");
1875
1876        let fiber = fiber::resolve_or_release(self, fiber).await?;
1877
1878        self.set_thread(old_thread)?;
1879
1880        let state = self.concurrent_state_mut();
1881
1882        if let Some(ot) = old_thread.guest() {
1883            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1884        }
1885        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1886
1887        if let Some(mut fiber) = fiber {
1888            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1889            // See the `SuspendReason` documentation for what each case means.
1890            let reason = match state.suspend_reason.take() {
1891                Some(r) => r,
1892                None => bail_bug!("suspend reason missing when resuming fiber"),
1893            };
1894            match reason {
1895                SuspendReason::NeedWork => {
1896                    if state.worker.is_none() {
1897                        state.worker = Some(fiber);
1898                    } else {
1899                        fiber.dispose(self);
1900                    }
1901                }
1902                SuspendReason::Yielding {
1903                    thread,
1904                    cancellable,
1905                    ..
1906                } => {
1907                    state.get_mut(thread.thread)?.state =
1908                        GuestThreadState::Ready { fiber, cancellable };
1909                    let instance = state.get_mut(thread.task)?.instance.index;
1910                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1911                }
1912                SuspendReason::ExplicitlySuspending { thread, .. } => {
1913                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1914                }
1915                SuspendReason::Waiting { set, thread, .. } => {
1916                    let old = state
1917                        .get_mut(set)?
1918                        .waiting
1919                        .insert(thread, WaitMode::Fiber(fiber));
1920                    assert!(old.is_none());
1921                }
1922            };
1923        } else {
1924            log::trace!("resume_fiber: fiber has exited");
1925        }
1926
1927        Ok(())
1928    }
1929
1930    /// Suspend the current fiber, storing the reason in
1931    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1932    /// it should be resumed.
1933    ///
1934    /// See the `SuspendReason` documentation for details.
1935    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1936        log::trace!("suspend fiber: {reason:?}");
1937
1938        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1939        // pop the call context which manages resource borrows before suspending
1940        // and then push it again once we've resumed.
1941        let task = match &reason {
1942            SuspendReason::Yielding { thread, .. }
1943            | SuspendReason::Waiting { thread, .. }
1944            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1945            SuspendReason::NeedWork => None,
1946        };
1947
1948        let old_guest_thread = if task.is_some() {
1949            self.concurrent_state_mut().current_thread
1950        } else {
1951            CurrentThread::None
1952        };
1953
1954        // We should not have reached here unless either there's no current
1955        // task, or the current task is permitted to block.  In addition, we
1956        // special-case `thread.switch-to` and waiting for a subtask to go from
1957        // `starting` to `started`, both of which we consider non-blocking
1958        // operations despite requiring a suspend.
1959        debug_assert!(
1960            matches!(
1961                reason,
1962                SuspendReason::ExplicitlySuspending {
1963                    skip_may_block_check: true,
1964                    ..
1965                } | SuspendReason::Waiting {
1966                    skip_may_block_check: true,
1967                    ..
1968                } | SuspendReason::Yielding {
1969                    skip_may_block_check: true,
1970                    ..
1971                }
1972            ) || old_guest_thread
1973                .guest()
1974                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1975                .transpose()?
1976                .unwrap_or(true)
1977        );
1978
1979        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1980        assert!(suspend_reason.is_none());
1981        *suspend_reason = Some(reason);
1982
1983        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1984
1985        if task.is_some() {
1986            self.set_thread(old_guest_thread)?;
1987        }
1988
1989        Ok(())
1990    }
1991
1992    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1993        let state = self.concurrent_state_mut();
1994
1995        if waitable.common(state)?.set.is_some() {
1996            bail!(Trap::WaitableSyncAndAsync);
1997        }
1998
1999        let caller = state.current_guest_thread()?;
2000        let set = state.get_mut(caller.thread)?.sync_call_set;
2001        waitable.join(state, Some(set))?;
2002        self.suspend(SuspendReason::Waiting {
2003            set,
2004            thread: caller,
2005            skip_may_block_check: false,
2006        })?;
2007        let state = self.concurrent_state_mut();
2008        waitable.join(state, None)
2009    }
2010
2011    /// Cleans up the data structures backing the `guest_thread` specified,
2012    /// removing it from the internal tables of `runtime_instance` as well.
2013    ///
2014    /// This function is used whenever a guest thread has fully exited and
2015    /// completed. This'll clean up the associated `GuestThread` structure and
2016    /// related resources it contains.
2017    ///
2018    /// Other functionality that this implements is:
2019    ///
2020    /// * This will perform conditional cleanup of the `GuestTask` that owns
2021    ///   this thread if `cleanup_task` is `CleanupTask::Yes`.
2022    /// * If there are no more threads in the `GuestTask` that this thread is
2023    ///   associated with, and if the task hasn't produced a result (e.g. it's not
2024    ///   returned or cancelled), then a trap will be raised that a result
2025    ///   wasn't ever produced.
2026    /// * If this task is finished, meaning the top-level thread exited and
2027    ///   additionally it's been returned or cancelled, then this will handle
2028    ///   management of the store's "active interesting tasks" counter.
2029    ///
2030    /// Effectively this is intended to be a "narrow waist" through which many
2031    /// destruction operations are funneled through.
2032    fn cleanup_thread(
2033        &mut self,
2034        guest_thread: QualifiedThreadId,
2035        runtime_instance: RuntimeInstance,
2036        cleanup_task: CleanupTask,
2037    ) -> Result<()> {
2038        let state = self.concurrent_state_mut();
2039        let thread_data = state.get_mut(guest_thread.thread)?;
2040        let sync_call_set = thread_data.sync_call_set;
2041        if let Some(guest_id) = thread_data.instance_rep {
2042            self.instance_state(runtime_instance)
2043                .thread_handle_table()
2044                .guest_thread_remove(guest_id)?;
2045        }
2046        let state = self.concurrent_state_mut();
2047
2048        // Clean up any pending subtasks in the sync_call_set
2049        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2050            if let Some(Event::Subtask {
2051                status: Status::Returned | Status::ReturnCancelled,
2052            }) = waitable.common(state)?.event
2053            {
2054                waitable.delete_from(state)?;
2055            }
2056        }
2057
2058        state.delete(guest_thread.thread)?;
2059        state.delete(sync_call_set)?;
2060        let task = state.get_mut(guest_thread.task)?;
2061        task.threads.remove(&guest_thread.thread);
2062
2063        if task.threads.is_empty() && !task.returned_or_cancelled() {
2064            bail!(Trap::NoAsyncResult);
2065        }
2066        let ready_to_delete = task.ready_to_delete();
2067
2068        if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2069            task.decremented_interesting_task_count = true;
2070
2071            debug_assert!(state.interesting_tasks > 0);
2072            state.interesting_tasks -= 1;
2073            if state.interesting_tasks == 0
2074                && let Some(waker) = state.interesting_tasks_empty_waker.take()
2075            {
2076                waker.wake();
2077            }
2078        }
2079
2080        match cleanup_task {
2081            CleanupTask::Yes => {
2082                if ready_to_delete {
2083                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2084                }
2085            }
2086            CleanupTask::No => {}
2087        }
2088
2089        Ok(())
2090    }
2091
2092    /// Performs cancellation of the `guest_task` specified with the
2093    /// precondition that the task hasn't lowered its parameters.
2094    ///
2095    /// In this situation the task hasn't ever been started meaning it hasn't
2096    /// actually run any wasm code yet. This requires cleaning up metadata such
2097    /// as thread information attached to the task.
2098    ///
2099    /// The main two entrypoints for this function are:
2100    ///
2101    /// * Task cancellation via `subtask.cancel`, the intrinsic.
2102    /// * Dropping a host `call_async` future which needs to cancel the task
2103    ///   because it cannot reference its parameters any more.
2104    fn cancel_guest_subtask_without_lowered_parameters(
2105        &mut self,
2106        caller_instance: RuntimeInstance,
2107        guest_task: TableId<GuestTask>,
2108    ) -> Result<()> {
2109        let concurrent_state = self.concurrent_state_mut();
2110        let task = concurrent_state.get_mut(guest_task)?;
2111        assert!(!task.already_lowered_parameters());
2112        // The task is in a `starting` state, meaning it hasn't run at
2113        // all yet.  Here we update its fields to indicate that it is
2114        // ready to delete immediately once `subtask.drop` is called.
2115        task.lower_params = None;
2116        task.lift_result = None;
2117        task.exited = true;
2118        let instance = task.instance;
2119
2120        // Clean up the thread within this task as it's now never going
2121        // to run.
2122        assert_eq!(1, task.threads.len());
2123        let thread = *task.threads.iter().next().unwrap();
2124        self.cleanup_thread(
2125            QualifiedThreadId {
2126                task: guest_task,
2127                thread,
2128            },
2129            caller_instance,
2130            CleanupTask::No,
2131        )?;
2132
2133        // Not yet started; cancel and remove from pending
2134        let pending = &mut self.instance_state(instance).concurrent_state().pending;
2135        let pending_count = pending.len();
2136        pending.retain(|thread, _| thread.task != guest_task);
2137        // If there were no pending threads for this task, we're in an error state
2138        if pending.len() == pending_count {
2139            bail!(Trap::SubtaskCancelAfterTerminal);
2140        }
2141        Ok(())
2142    }
2143}
2144
/// Argument to `StoreOpaque::cleanup_thread` selecting whether the
/// `GuestTask` owning the thread may be deleted along with it.
enum CleanupTask {
    /// Also delete the owning task, provided it reports being ready to
    /// delete.
    Yes,
    /// Leave the owning task in place.
    No,
}
2149
2150impl Instance {
2151    /// Get the next pending event for the specified task and (optional)
2152    /// waitable set, along with the waitable handle if applicable.
2153    fn get_event(
2154        self,
2155        store: &mut StoreOpaque,
2156        guest_task: TableId<GuestTask>,
2157        set: Option<TableId<WaitableSet>>,
2158        cancellable: bool,
2159    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2160        let state = store.concurrent_state_mut();
2161
2162        let event = &mut state.get_mut(guest_task)?.event;
2163        if let Some(ev) = event
2164            && (cancellable || !matches!(ev, Event::Cancelled))
2165        {
2166            log::trace!("deliver event {ev:?} to {guest_task:?}");
2167            let ev = *ev;
2168            *event = None;
2169            return Ok(Some((ev, None)));
2170        }
2171
2172        let set = match set {
2173            Some(set) => set,
2174            None => return Ok(None),
2175        };
2176        let waitable = match state.get_mut(set)?.ready.pop_first() {
2177            Some(v) => v,
2178            None => return Ok(None),
2179        };
2180
2181        let common = waitable.common(state)?;
2182        let handle = match common.handle {
2183            Some(h) => h,
2184            None => bail_bug!("handle not set when delivering event"),
2185        };
2186        let event = match common.event.take() {
2187            Some(e) => e,
2188            None => bail_bug!("event not set when delivering event"),
2189        };
2190
2191        log::trace!(
2192            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2193        );
2194
2195        waitable.on_delivery(store, self, event)?;
2196
2197        Ok(Some((event, Some((waitable, handle)))))
2198    }
2199
2200    /// Handle the `CallbackCode` returned from an async-lifted export or its
2201    /// callback.
2202    ///
2203    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2204    /// using `handle_guest_call`.
2205    fn handle_callback_code(
2206        self,
2207        store: &mut StoreOpaque,
2208        guest_thread: QualifiedThreadId,
2209        runtime_instance: RuntimeComponentInstanceIndex,
2210        code: u32,
2211    ) -> Result<Option<GuestCall>> {
2212        let (code, set) = unpack_callback_code(code);
2213
2214        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2215
2216        let state = store.concurrent_state_mut();
2217
2218        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2219            let set = store
2220                .instance_state(self.runtime_instance(runtime_instance))
2221                .handle_table()
2222                .waitable_set_rep(handle)?;
2223
2224            Ok(TableId::<WaitableSet>::new(set))
2225        };
2226
2227        Ok(match code {
2228            callback_code::EXIT => {
2229                log::trace!("implicit thread {guest_thread:?} completed");
2230                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2231                task.exited = true;
2232                task.callback = None;
2233                store.cleanup_thread(
2234                    guest_thread,
2235                    self.runtime_instance(runtime_instance),
2236                    CleanupTask::Yes,
2237                )?;
2238                None
2239            }
2240            callback_code::YIELD => {
2241                let task = state.get_mut(guest_thread.task)?;
2242                // If an `Event::Cancelled` is pending, we'll deliver that;
2243                // otherwise, we'll deliver `Event::None`.  Note that
2244                // `GuestTask::event` is only ever set to one of those two
2245                // `Event` variants.
2246                if let Some(event) = task.event {
2247                    assert!(matches!(event, Event::None | Event::Cancelled));
2248                } else {
2249                    task.event = Some(Event::None);
2250                }
2251                let call = GuestCall {
2252                    thread: guest_thread,
2253                    kind: GuestCallKind::DeliverEvent {
2254                        instance: self,
2255                        set: None,
2256                    },
2257                };
2258                if state.may_block(guest_thread.task)? {
2259                    // Push this thread onto the "low priority" queue so it runs
2260                    // after any other threads have had a chance to run.
2261                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2262                    None
2263                } else {
2264                    // Yielding in a non-blocking context is defined as a no-op
2265                    // according to the spec, so we must run this thread
2266                    // immediately without allowing any others to run.
2267                    Some(call)
2268                }
2269            }
2270            callback_code::WAIT => {
2271                // The task may only return `WAIT` if it was created for a call
2272                // to an async export).  Otherwise, we'll trap.
2273                state.check_blocking_for(guest_thread.task)?;
2274
2275                let set = get_set(store, set)?;
2276                let state = store.concurrent_state_mut();
2277
2278                if state.get_mut(guest_thread.task)?.event.is_some()
2279                    || !state.get_mut(set)?.ready.is_empty()
2280                {
2281                    // An event is immediately available; deliver it ASAP.
2282                    state.push_high_priority(WorkItem::GuestCall(
2283                        runtime_instance,
2284                        GuestCall {
2285                            thread: guest_thread,
2286                            kind: GuestCallKind::DeliverEvent {
2287                                instance: self,
2288                                set: Some(set),
2289                            },
2290                        },
2291                    ));
2292                } else {
2293                    // No event is immediately available.
2294                    //
2295                    // We're waiting, so register to be woken up when an event
2296                    // is published for this waitable set.
2297                    //
2298                    // Here we also set `GuestTask::wake_on_cancel` which allows
2299                    // `subtask.cancel` to interrupt the wait.
2300                    let old = state
2301                        .get_mut(guest_thread.thread)?
2302                        .wake_on_cancel
2303                        .replace(set);
2304                    if !old.is_none() {
2305                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2306                    }
2307                    let old = state
2308                        .get_mut(set)?
2309                        .waiting
2310                        .insert(guest_thread, WaitMode::Callback(self));
2311                    if !old.is_none() {
2312                        bail_bug!("set's waiting set already had this thread registered");
2313                    }
2314                }
2315                None
2316            }
2317            _ => bail!(Trap::UnsupportedCallbackCode),
2318        })
2319    }
2320
2321    /// Add the specified guest call to the "high priority" work item queue, to
2322    /// be started as soon as backpressure and/or reentrance rules allow.
2323    ///
2324    /// SAFETY: The raw pointer arguments must be valid references to guest
2325    /// functions (with the appropriate signatures) when the closures queued by
2326    /// this function are called.
2327    unsafe fn queue_call<T: 'static>(
2328        self,
2329        mut store: StoreContextMut<T>,
2330        guest_thread: QualifiedThreadId,
2331        callee: SendSyncPtr<VMFuncRef>,
2332        param_count: usize,
2333        result_count: usize,
2334        async_: bool,
2335        callback: Option<SendSyncPtr<VMFuncRef>>,
2336        post_return: Option<SendSyncPtr<VMFuncRef>>,
2337    ) -> Result<()> {
2338        /// Return a closure which will call the specified function in the scope
2339        /// of the specified task.
2340        ///
2341        /// This will use `GuestTask::lower_params` to lower the parameters, but
2342        /// will not lift the result; instead, it returns a
2343        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2344        /// any, may be lifted.  Note that an async-lifted export will have
2345        /// returned its result using the `task.return` intrinsic (or not
2346        /// returned a result at all, in the case of `task.cancel`), in which
2347        /// case the "result" of this call will either be a callback code or
2348        /// nothing.
2349        ///
2350        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2351        /// the returned closure is called.
2352        unsafe fn make_call<T: 'static>(
2353            store: StoreContextMut<T>,
2354            guest_thread: QualifiedThreadId,
2355            callee: SendSyncPtr<VMFuncRef>,
2356            param_count: usize,
2357            result_count: usize,
2358        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2359        + Send
2360        + Sync
2361        + 'static
2362        + use<T> {
2363            let token = StoreToken::new(store);
2364            move |store: &mut dyn VMStore| {
2365                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2366
2367                store
2368                    .concurrent_state_mut()
2369                    .get_mut(guest_thread.thread)?
2370                    .state = GuestThreadState::Running;
2371                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2372                let lower = match task.lower_params.take() {
2373                    Some(l) => l,
2374                    None => bail_bug!("lower_params missing"),
2375                };
2376
2377                lower(store, &mut storage[..param_count])?;
2378
2379                let mut store = token.as_context_mut(store);
2380
2381                // SAFETY: Per the contract documented in `make_call's`
2382                // documentation, `callee` must be a valid pointer.
2383                unsafe {
2384                    crate::Func::call_unchecked_raw(
2385                        &mut store,
2386                        callee.as_non_null(),
2387                        NonNull::new(
2388                            &mut storage[..param_count.max(result_count)]
2389                                as *mut [MaybeUninit<ValRaw>] as _,
2390                        )
2391                        .unwrap(),
2392                    )?;
2393                }
2394
2395                Ok(storage)
2396            }
2397        }
2398
2399        // SAFETY: Per the contract described in this function documentation,
2400        // the `callee` pointer which `call` closes over must be valid when
2401        // called by the closure we queue below.
2402        let call = unsafe {
2403            make_call(
2404                store.as_context_mut(),
2405                guest_thread,
2406                callee,
2407                param_count,
2408                result_count,
2409            )
2410        };
2411
2412        let callee_instance = store
2413            .0
2414            .concurrent_state_mut()
2415            .get_mut(guest_thread.task)?
2416            .instance;
2417
2418        let fun = if callback.is_some() {
2419            assert!(async_);
2420
2421            Box::new(move |store: &mut dyn VMStore| {
2422                self.add_guest_thread_to_instance_table(
2423                    guest_thread.thread,
2424                    store,
2425                    callee_instance.index,
2426                )?;
2427                let old_thread = store.set_thread(guest_thread)?;
2428                log::trace!(
2429                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2430                );
2431
2432                store.enter_instance(callee_instance);
2433
2434                // SAFETY: See the documentation for `make_call` to review the
2435                // contract we must uphold for `call` here.
2436                //
2437                // Per the contract described in the `queue_call`
2438                // documentation, the `callee` pointer which `call` closes
2439                // over must be valid.
2440                let storage = call(store)?;
2441
2442                store.exit_instance(callee_instance)?;
2443
2444                store.set_thread(old_thread)?;
2445                let state = store.concurrent_state_mut();
2446                if let Some(t) = old_thread.guest() {
2447                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
2448                }
2449                log::trace!("stackless call: restored {old_thread:?} as current thread");
2450
2451                // SAFETY: `wasmparser` will have validated that the callback
2452                // function returns a `i32` result.
2453                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2454
2455                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2456            })
2457                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2458        } else {
2459            let token = StoreToken::new(store.as_context_mut());
2460            Box::new(move |store: &mut dyn VMStore| {
2461                self.add_guest_thread_to_instance_table(
2462                    guest_thread.thread,
2463                    store,
2464                    callee_instance.index,
2465                )?;
2466                let old_thread = store.set_thread(guest_thread)?;
2467                log::trace!(
2468                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2469                );
2470                let flags = self.id().get(store).instance_flags(callee_instance.index);
2471
2472                // Unless this is a callback-less (i.e. stackful)
2473                // async-lifted export, we need to record that the instance
2474                // cannot be entered until the call returns.
2475                if !async_ {
2476                    store.enter_instance(callee_instance);
2477                }
2478
2479                // SAFETY: See the documentation for `make_call` to review the
2480                // contract we must uphold for `call` here.
2481                //
2482                // Per the contract described in the `queue_call`
2483                // documentation, the `callee` pointer which `call` closes
2484                // over must be valid.
2485                let storage = call(store)?;
2486
2487                if !async_ {
2488                    // This is a sync-lifted export, so now is when we lift the
2489                    // result, optionally call the post-return function, if any,
2490                    // and finally notify any current or future waiters that the
2491                    // subtask has returned.
2492
2493                    let lift = {
2494                        store.exit_instance(callee_instance)?;
2495
2496                        let state = store.concurrent_state_mut();
2497                        if !state.get_mut(guest_thread.task)?.result.is_none() {
2498                            bail_bug!("task has already produced a result");
2499                        }
2500
2501                        match state.get_mut(guest_thread.task)?.lift_result.take() {
2502                            Some(lift) => lift,
2503                            None => bail_bug!("lift_result field is missing"),
2504                        }
2505                    };
2506
2507                    // SAFETY: `result_count` represents the number of core Wasm
2508                    // results returned, per `wasmparser`.
2509                    let result = (lift.lift)(store, unsafe {
2510                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2511                            &storage[..result_count],
2512                        )
2513                    })?;
2514
2515                    let post_return_arg = match result_count {
2516                        0 => ValRaw::i32(0),
2517                        // SAFETY: `result_count` represents the number of
2518                        // core Wasm results returned, per `wasmparser`.
2519                        1 => unsafe { storage[0].assume_init() },
2520                        _ => unreachable!(),
2521                    };
2522
2523                    unsafe {
2524                        call_post_return(
2525                            token.as_context_mut(store),
2526                            post_return.map(|v| v.as_non_null()),
2527                            post_return_arg,
2528                            flags,
2529                        )?;
2530                    }
2531
2532                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2533                }
2534
2535                store.set_thread(old_thread)?;
2536
2537                store
2538                    .concurrent_state_mut()
2539                    .get_mut(guest_thread.task)?
2540                    .exited = true;
2541
2542                // This is a callback-less call, so the implicit thread has now completed
2543                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2544                Ok(None)
2545            })
2546        };
2547
2548        store
2549            .0
2550            .concurrent_state_mut()
2551            .push_high_priority(WorkItem::GuestCall(
2552                callee_instance.index,
2553                GuestCall {
2554                    thread: guest_thread,
2555                    kind: GuestCallKind::StartImplicit(fun),
2556                },
2557            ));
2558
2559        Ok(())
2560    }
2561
2562    /// Prepare (but do not start) a guest->guest call.
2563    ///
2564    /// This is called from fused adapter code generated in
2565    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2566    /// are synthesized Wasm functions which move the parameters from the caller
2567    /// to the callee and the result from the callee to the caller,
2568    /// respectively.  The adapter will call `Self::start_call` immediately
2569    /// after calling this function.
2570    ///
2571    /// SAFETY: All the pointer arguments must be valid pointers to guest
2572    /// entities (and with the expected signatures for the function references
2573    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2574    unsafe fn prepare_call<T: 'static>(
2575        self,
2576        mut store: StoreContextMut<T>,
2577        start: NonNull<VMFuncRef>,
2578        return_: NonNull<VMFuncRef>,
2579        caller_instance: RuntimeComponentInstanceIndex,
2580        callee_instance: RuntimeComponentInstanceIndex,
2581        task_return_type: TypeTupleIndex,
2582        callee_async: bool,
2583        memory: *mut VMMemoryDefinition,
2584        string_encoding: StringEncoding,
2585        caller_info: CallerInfo,
2586    ) -> Result<()> {
2587        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2588            // A task may only call an async-typed function via a sync lower if
2589            // it was created by a call to an async export.  Otherwise, we'll
2590            // trap.
2591            store.0.check_blocking()?;
2592        }
2593
2594        enum ResultInfo {
2595            Heap { results: u32 },
2596            Stack { result_count: u32 },
2597        }
2598
2599        let result_info = match &caller_info {
2600            CallerInfo::Async {
2601                has_result: true,
2602                params,
2603            } => ResultInfo::Heap {
2604                results: match params.last() {
2605                    Some(r) => r.get_u32(),
2606                    None => bail_bug!("retptr missing"),
2607                },
2608            },
2609            CallerInfo::Async {
2610                has_result: false, ..
2611            } => ResultInfo::Stack { result_count: 0 },
2612            CallerInfo::Sync {
2613                result_count,
2614                params,
2615            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2616                results: match params.last() {
2617                    Some(r) => r.get_u32(),
2618                    None => bail_bug!("arg ptr missing"),
2619                },
2620            },
2621            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2622                result_count: *result_count,
2623            },
2624        };
2625
2626        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2627
2628        // Create a new guest task for the call, closing over the `start` and
2629        // `return_` functions to lift the parameters and lower the result,
2630        // respectively.
2631        let start = SendSyncPtr::new(start);
2632        let return_ = SendSyncPtr::new(return_);
2633        let token = StoreToken::new(store.as_context_mut());
2634        let state = store.0.concurrent_state_mut();
2635        let old_thread = state.current_guest_thread()?;
2636
2637        debug_assert_eq!(
2638            state.get_mut(old_thread.task)?.instance,
2639            self.runtime_instance(caller_instance)
2640        );
2641
2642        let guest_thread = GuestTask::new(
2643            state,
2644            Box::new(move |store, dst| {
2645                let mut store = token.as_context_mut(store);
2646                assert!(dst.len() <= MAX_FLAT_PARAMS);
2647                // The `+ 1` here accounts for the return pointer, if any:
2648                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2649                let count = match caller_info {
2650                    // Async callers, if they have a result, use the last
2651                    // parameter as a return pointer so chop that off if
2652                    // relevant here.
2653                    CallerInfo::Async { params, has_result } => {
2654                        let params = &params[..params.len() - usize::from(has_result)];
2655                        for (param, src) in params.iter().zip(&mut src) {
2656                            src.write(*param);
2657                        }
2658                        params.len()
2659                    }
2660
2661                    // Sync callers forward everything directly.
2662                    CallerInfo::Sync { params, .. } => {
2663                        for (param, src) in params.iter().zip(&mut src) {
2664                            src.write(*param);
2665                        }
2666                        params.len()
2667                    }
2668                };
2669                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2670                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2671                // how it was constructed (see
2672                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2673                // for details) we know it takes count parameters and returns
2674                // `dst.len()` results.
2675                unsafe {
2676                    crate::Func::call_unchecked_raw(
2677                        &mut store,
2678                        start.as_non_null(),
2679                        NonNull::new(
2680                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2681                        )
2682                        .unwrap(),
2683                    )?;
2684                }
2685                dst.copy_from_slice(&src[..dst.len()]);
2686                let state = store.0.concurrent_state_mut();
2687                Waitable::Guest(state.current_guest_thread()?.task).set_event(
2688                    state,
2689                    Some(Event::Subtask {
2690                        status: Status::Started,
2691                    }),
2692                )?;
2693                Ok(())
2694            }),
2695            LiftResult {
2696                lift: Box::new(move |store, src| {
2697                    // SAFETY: See comment in closure passed as `lower_params`
2698                    // parameter above.
2699                    let mut store = token.as_context_mut(store);
2700                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2701                    if let ResultInfo::Heap { results } = &result_info {
2702                        my_src.push(ValRaw::u32(*results));
2703                    }
2704
2705                    // Execute the `return_` hook, generated by Wasmtime's FACT
2706                    // compiler, in the context of the old thread. The old
2707                    // thread, this thread's caller, may have `realloc`
2708                    // callbacks invoked for example and those need the correct
2709                    // context set for the current thread.
2710                    let prev = store.0.set_thread(old_thread)?;
2711
2712                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2713                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2714                    // on how it was constructed (see
2715                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2716                    // for details) we know it takes `src.len()` parameters and
2717                    // returns up to 1 result.
2718                    unsafe {
2719                        crate::Func::call_unchecked_raw(
2720                            &mut store,
2721                            return_.as_non_null(),
2722                            my_src.as_mut_slice().into(),
2723                        )?;
2724                    }
2725
2726                    // Restore the previous current thread after the
2727                    // lifting/lowering has returned.
2728                    store.0.set_thread(prev)?;
2729
2730                    let state = store.0.concurrent_state_mut();
2731                    let thread = state.current_guest_thread()?;
2732                    if sync_caller {
2733                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2734                            if let ResultInfo::Stack { result_count } = &result_info {
2735                                match result_count {
2736                                    0 => None,
2737                                    1 => Some(my_src[0]),
2738                                    _ => unreachable!(),
2739                                }
2740                            } else {
2741                                None
2742                            },
2743                        );
2744                    }
2745                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2746                }),
2747                ty: task_return_type,
2748                memory: NonNull::new(memory).map(SendSyncPtr::new),
2749                string_encoding,
2750            },
2751            Caller::Guest { thread: old_thread },
2752            None,
2753            self.runtime_instance(callee_instance),
2754            callee_async,
2755        )?;
2756
2757        // Make the new thread the current one so that `Self::start_call` knows
2758        // which one to start.
2759        store.0.set_thread(guest_thread)?;
2760        log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2761
2762        Ok(())
2763    }
2764
2765    /// Call the specified callback function for an async-lifted export.
2766    ///
2767    /// SAFETY: `function` must be a valid reference to a guest function of the
2768    /// correct signature for a callback.
2769    unsafe fn call_callback<T>(
2770        self,
2771        mut store: StoreContextMut<T>,
2772        function: SendSyncPtr<VMFuncRef>,
2773        event: Event,
2774        handle: u32,
2775    ) -> Result<u32> {
2776        let (ordinal, result) = event.parts();
2777        let params = &mut [
2778            ValRaw::u32(ordinal),
2779            ValRaw::u32(handle),
2780            ValRaw::u32(result),
2781        ];
2782        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2783        // `wasmtime-cranelift`-generated fused adapter code or
2784        // `component::Options`.  Per `wasmparser` callback signature
2785        // validation, we know it takes three parameters and returns one.
2786        unsafe {
2787            crate::Func::call_unchecked_raw(
2788                &mut store,
2789                function.as_non_null(),
2790                params.as_mut_slice().into(),
2791            )?;
2792        }
2793        Ok(params[0].get_u32())
2794    }
2795
2796    /// Start a guest->guest call previously prepared using
2797    /// `Self::prepare_call`.
2798    ///
2799    /// This is called from fused adapter code generated in
2800    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2801    /// this function immediately after calling `Self::prepare_call`.
2802    ///
2803    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2804    /// functions with the appropriate signatures for the current guest task.
2805    /// If this is a call to an async-lowered import, the actual call may be
2806    /// deferred and run after this function returns, in which case the pointer
2807    /// arguments must also be valid when the call happens.
2808    unsafe fn start_call<T: 'static>(
2809        self,
2810        mut store: StoreContextMut<T>,
2811        callback: *mut VMFuncRef,
2812        post_return: *mut VMFuncRef,
2813        callee: NonNull<VMFuncRef>,
2814        param_count: u32,
2815        result_count: u32,
2816        flags: u32,
2817        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2818    ) -> Result<u32> {
2819        let token = StoreToken::new(store.as_context_mut());
2820        let async_caller = storage.is_none();
2821        let state = store.0.concurrent_state_mut();
2822        let guest_thread = state.current_guest_thread()?;
2823        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2824        let callee = SendSyncPtr::new(callee);
2825        let param_count = usize::try_from(param_count)?;
2826        assert!(param_count <= MAX_FLAT_PARAMS);
2827        let result_count = usize::try_from(result_count)?;
2828        assert!(result_count <= MAX_FLAT_RESULTS);
2829
2830        let task = state.get_mut(guest_thread.task)?;
2831        if let Some(callback) = NonNull::new(callback) {
2832            // We're calling an async-lifted export with a callback, so store
2833            // the callback and related context as part of the task so we can
2834            // call it later when needed.
2835            let callback = SendSyncPtr::new(callback);
2836            task.callback = Some(Box::new(move |store, event, handle| {
2837                let store = token.as_context_mut(store);
2838                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2839            }));
2840        }
2841
2842        let Caller::Guest { thread: caller } = &task.caller else {
2843            // As of this writing, `start_call` is only used for guest->guest
2844            // calls.
2845            bail_bug!("start_call unexpectedly invoked for host->guest call");
2846        };
2847        let caller = *caller;
2848        let caller_instance = state.get_mut(caller.task)?.instance;
2849
2850        // Queue the call as a "high priority" work item.
2851        unsafe {
2852            self.queue_call(
2853                store.as_context_mut(),
2854                guest_thread,
2855                callee,
2856                param_count,
2857                result_count,
2858                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2859                NonNull::new(callback).map(SendSyncPtr::new),
2860                NonNull::new(post_return).map(SendSyncPtr::new),
2861            )?;
2862        }
2863
2864        let state = store.0.concurrent_state_mut();
2865
2866        // Use the caller's `GuestThread::sync_call_set` to register interest in
2867        // the subtask...
2868        let guest_waitable = Waitable::Guest(guest_thread.task);
2869        let old_set = guest_waitable.common(state)?.set;
2870        let set = state.get_mut(caller.thread)?.sync_call_set;
2871        guest_waitable.join(state, Some(set))?;
2872
2873        store.0.set_thread(CurrentThread::None)?;
2874
2875        // ... and suspend this fiber temporarily while we wait for it to start.
2876        //
2877        // Note that we _could_ call the callee directly using the current fiber
2878        // rather than suspend this one, but that would make reasoning about the
2879        // event loop more complicated and is probably only worth doing if
2880        // there's a measurable performance benefit.  In addition, it would mean
2881        // blocking the caller if the callee calls a blocking sync-lowered
2882        // import, and as of this writing the spec says we must not do that.
2883        //
2884        // Alternatively, the fused adapter code could be modified to call the
2885        // callee directly without calling a host-provided intrinsic at all (in
2886        // which case it would need to do its own, inline backpressure checks,
2887        // etc.).  Again, we'd want to see a measurable performance benefit
2888        // before committing to such an optimization.  And again, we'd need to
2889        // update the spec to allow that.
2890        let (status, waitable) = loop {
2891            store.0.suspend(SuspendReason::Waiting {
2892                set,
2893                thread: caller,
2894                // Normally, `StoreOpaque::suspend` would assert it's being
2895                // called from a context where blocking is allowed.  However, if
2896                // `async_caller` is `true`, we'll only "block" long enough for
2897                // the callee to start, i.e. we won't repeat this loop, so we
2898                // tell `suspend` it's okay even if we're not allowed to block.
2899                // Alternatively, if the callee is not an async function, then
2900                // we know it won't block anyway.
2901                skip_may_block_check: async_caller || !callee_async,
2902            })?;
2903
2904            let state = store.0.concurrent_state_mut();
2905
2906            log::trace!("taking event for {:?}", guest_thread.task);
2907            let event = guest_waitable.take_event(state)?;
2908            let Some(Event::Subtask { status }) = event else {
2909                bail_bug!("subtasks should only get subtask events, got {event:?}")
2910            };
2911
2912            log::trace!("status {status:?} for {:?}", guest_thread.task);
2913
2914            if status == Status::Returned {
2915                // It returned, so we can stop waiting.
2916                break (status, None);
2917            } else if async_caller {
2918                // It hasn't returned yet, but the caller is calling via an
2919                // async-lowered import, so we generate a handle for the task
2920                // waitable and return the status.
2921                let handle = store
2922                    .0
2923                    .instance_state(caller_instance)
2924                    .handle_table()
2925                    .subtask_insert_guest(guest_thread.task.rep())?;
2926                store
2927                    .0
2928                    .concurrent_state_mut()
2929                    .get_mut(guest_thread.task)?
2930                    .common
2931                    .handle = Some(handle);
2932                break (status, Some(handle));
2933            } else {
2934                // The callee hasn't returned yet, and the caller is calling via
2935                // a sync-lowered import, so we loop and keep waiting until the
2936                // callee returns.
2937            }
2938        };
2939
2940        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2941
2942        // Reset the current thread to point to the caller as it resumes control.
2943        store.0.set_thread(caller)?;
2944        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2945        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2946
2947        if let Some(storage) = storage {
2948            // The caller used a sync-lowered import to call an async-lifted
2949            // export, in which case the result, if any, has been stashed in
2950            // `GuestTask::sync_result`.
2951            let state = store.0.concurrent_state_mut();
2952            let task = state.get_mut(guest_thread.task)?;
2953            if let Some(result) = task.sync_result.take()? {
2954                if let Some(result) = result {
2955                    storage[0] = MaybeUninit::new(result);
2956                }
2957
2958                if task.exited && task.ready_to_delete() {
2959                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2960                }
2961            }
2962        }
2963
2964        Ok(status.pack(waitable))
2965    }
2966
2967    /// Poll the specified future once on behalf of a guest->host call using an
2968    /// async-lowered import.
2969    ///
2970    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2971    /// `Pending`, add it to the set of futures to be polled as part of this
2972    /// instance's event loop until it completes, and then return
2973    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2974    ///
2975    /// Whether the future returns `Ready` immediately or later, the `lower`
2976    /// function will be used to lower the result, if any, into the guest caller's
2977    /// stack and linear memory. The `lower` function is invoked with `None` if
2978    /// the future is cancelled.
2979    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2980        self,
2981        mut store: StoreContextMut<'_, T>,
2982        future: impl Future<Output = Result<R>> + Send + 'static,
2983        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2984    ) -> Result<Option<u32>> {
2985        let token = StoreToken::new(store.as_context_mut());
2986        let state = store.0.concurrent_state_mut();
2987        let task = state.current_host_thread()?;
2988
2989        // Create an abortable future which hooks calls to poll and manages call
2990        // context state for the future.
2991        let (join_handle, future) = JoinHandle::run(future);
2992        {
2993            let state = &mut state.get_mut(task)?.state;
2994            assert!(matches!(state, HostTaskState::CalleeStarted));
2995            *state = HostTaskState::CalleeRunning(join_handle);
2996        }
2997
2998        let mut future = Box::pin(future);
2999
3000        // Finally, poll the future.  We can use a dummy `Waker` here because
3001        // we'll add the future to `ConcurrentState::futures` and poll it
3002        // automatically from the event loop if it doesn't complete immediately
3003        // here.
3004        let poll = tls::set(store.0, || {
3005            future
3006                .as_mut()
3007                .poll(&mut Context::from_waker(&Waker::noop()))
3008        });
3009
3010        match poll {
3011            // It finished immediately; lower the result and delete the task.
3012            Poll::Ready(result) => {
3013                let result = result.transpose()?;
3014                lower(store.as_context_mut(), result)?;
3015                return Ok(None);
3016            }
3017
3018            // Future isn't ready yet, so fall through.
3019            Poll::Pending => {}
3020        }
3021
3022        // It hasn't finished yet; add the future to
3023        // `ConcurrentState::futures` so it will be polled by the event
3024        // loop and allocate a waitable handle to return to the guest.
3025
3026        // Wrap the future in a closure responsible for lowering the result into
3027        // the guest's stack and memory, as well as notifying any waiters that
3028        // the task returned.
3029        let future = Box::pin(async move {
3030            let result = match future.await {
3031                Some(result) => Some(result?),
3032                None => None,
3033            };
3034            let on_complete = move |store: &mut dyn VMStore| {
3035                // Restore the `current_thread` to be the host so `lower` knows
3036                // how to manipulate borrows and knows which scope of borrows
3037                // to check.
3038                let mut store = token.as_context_mut(store);
3039                let old = store.0.set_thread(task)?;
3040
3041                let status = if result.is_some() {
3042                    Status::Returned
3043                } else {
3044                    Status::ReturnCancelled
3045                };
3046
3047                lower(store.as_context_mut(), result)?;
3048                let state = store.0.concurrent_state_mut();
3049                match &mut state.get_mut(task)?.state {
3050                    // The task is already flagged as finished because it was
3051                    // cancelled. No need to transition further.
3052                    HostTaskState::CalleeDone { .. } => {}
3053
3054                    // Otherwise transition this task to the done state.
3055                    other => *other = HostTaskState::CalleeDone { cancelled: false },
3056                }
3057                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3058
3059                store.0.set_thread(old)?;
3060                Ok(())
3061            };
3062
3063            // Here we schedule a task to run on a worker fiber to do the
3064            // lowering since it may involve a call to the guest's realloc
3065            // function. This is necessary because calling the guest while
3066            // there are host embedder frames on the stack is unsound.
3067            tls::get(move |store| {
3068                store
3069                    .concurrent_state_mut()
3070                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3071                        on_complete,
3072                    ))));
3073                Ok(())
3074            })
3075        });
3076
3077        // Make this task visible to the guest and then record what it
3078        // was made visible as.
3079        let state = store.0.concurrent_state_mut();
3080        state.push_future(future);
3081        let caller = state.get_mut(task)?.caller;
3082        let instance = state.get_mut(caller.task)?.instance;
3083        let handle = store
3084            .0
3085            .instance_state(instance)
3086            .handle_table()
3087            .subtask_insert_host(task.rep())?;
3088        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
3089        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3090
3091        // Restore the currently running thread to this host task's
3092        // caller. Note that the host task isn't deallocated as it's
3093        // within the store and will get deallocated later.
3094        store.0.set_thread(caller)?;
3095        Ok(Some(handle))
3096    }
3097
3098    /// Implements the `task.return` intrinsic, lifting the result for the
3099    /// current guest task.
3100    pub(crate) fn task_return(
3101        self,
3102        store: &mut dyn VMStore,
3103        ty: TypeTupleIndex,
3104        options: OptionsIndex,
3105        storage: &[ValRaw],
3106    ) -> Result<()> {
3107        let state = store.concurrent_state_mut();
3108        let guest_thread = state.current_guest_thread()?;
3109        let lift = state
3110            .get_mut(guest_thread.task)?
3111            .lift_result
3112            .take()
3113            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3114        if !state.get_mut(guest_thread.task)?.result.is_none() {
3115            bail_bug!("task result unexpectedly already set");
3116        }
3117
3118        let CanonicalOptions {
3119            string_encoding,
3120            data_model,
3121            ..
3122        } = &self.id().get(store).component().env_component().options[options];
3123
3124        let invalid = ty != lift.ty
3125            || string_encoding != &lift.string_encoding
3126            || match data_model {
3127                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3128                    Some(memory) => {
3129                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3130                        let actual = self.id().get(store).runtime_memory(memory);
3131                        expected != actual.as_ptr()
3132                    }
3133                    // Memory not specified, meaning it didn't need to be
3134                    // specified per validation, so not invalid.
3135                    None => false,
3136                },
3137                // Always invalid as this isn't supported.
3138                CanonicalOptionsDataModel::Gc { .. } => true,
3139            };
3140
3141        if invalid {
3142            bail!(Trap::TaskReturnInvalid);
3143        }
3144
3145        log::trace!("task.return for {guest_thread:?}");
3146
3147        let result = (lift.lift)(store, storage)?;
3148        self.task_complete(store, guest_thread.task, result, Status::Returned)
3149    }
3150
3151    /// Implements the `task.cancel` intrinsic.
3152    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3153        let state = store.concurrent_state_mut();
3154        let guest_thread = state.current_guest_thread()?;
3155        let task = state.get_mut(guest_thread.task)?;
3156        if !task.cancel_sent {
3157            bail!(Trap::TaskCancelNotCancelled);
3158        }
3159        _ = task
3160            .lift_result
3161            .take()
3162            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3163
3164        if !task.result.is_none() {
3165            bail_bug!("task result should not bet set yet");
3166        }
3167
3168        log::trace!("task.cancel for {guest_thread:?}");
3169
3170        self.task_complete(
3171            store,
3172            guest_thread.task,
3173            Box::new(DummyResult),
3174            Status::ReturnCancelled,
3175        )
3176    }
3177
3178    /// Complete the specified guest task (i.e. indicate that it has either
3179    /// returned a (possibly empty) result or cancelled itself).
3180    ///
3181    /// This will return any resource borrows and notify any current or future
3182    /// waiters that the task has completed.
3183    fn task_complete(
3184        self,
3185        store: &mut StoreOpaque,
3186        guest_task: TableId<GuestTask>,
3187        result: Box<dyn Any + Send + Sync>,
3188        status: Status,
3189    ) -> Result<()> {
3190        store
3191            .component_resource_tables(Some(self))
3192            .validate_scope_exit()?;
3193
3194        let state = store.concurrent_state_mut();
3195        let task = state.get_mut(guest_task)?;
3196
3197        if let Caller::Host { tx, .. } = &mut task.caller {
3198            if let Some(tx) = tx.take() {
3199                _ = tx.send(result);
3200            }
3201        } else {
3202            task.result = Some(result);
3203            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3204        }
3205
3206        Ok(())
3207    }
3208
3209    /// Implements the `waitable-set.new` intrinsic.
3210    pub(crate) fn waitable_set_new(
3211        self,
3212        store: &mut StoreOpaque,
3213        caller_instance: RuntimeComponentInstanceIndex,
3214    ) -> Result<u32> {
3215        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3216        let handle = store
3217            .instance_state(self.runtime_instance(caller_instance))
3218            .handle_table()
3219            .waitable_set_insert(set.rep())?;
3220        log::trace!("new waitable set {set:?} (handle {handle})");
3221        Ok(handle)
3222    }
3223
3224    /// Implements the `waitable-set.drop` intrinsic.
3225    pub(crate) fn waitable_set_drop(
3226        self,
3227        store: &mut StoreOpaque,
3228        caller_instance: RuntimeComponentInstanceIndex,
3229        set: u32,
3230    ) -> Result<()> {
3231        let rep = store
3232            .instance_state(self.runtime_instance(caller_instance))
3233            .handle_table()
3234            .waitable_set_remove(set)?;
3235
3236        log::trace!("drop waitable set {rep} (handle {set})");
3237
3238        // Note that we're careful to check for waiters _before_ deleting the
3239        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3240        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3241        if !store
3242            .concurrent_state_mut()
3243            .get_mut(TableId::<WaitableSet>::new(rep))?
3244            .waiting
3245            .is_empty()
3246        {
3247            bail!(Trap::WaitableSetDropHasWaiters);
3248        }
3249
3250        store
3251            .concurrent_state_mut()
3252            .delete(TableId::<WaitableSet>::new(rep))?;
3253
3254        Ok(())
3255    }
3256
3257    /// Implements the `waitable.join` intrinsic.
3258    pub(crate) fn waitable_join(
3259        self,
3260        store: &mut StoreOpaque,
3261        caller_instance: RuntimeComponentInstanceIndex,
3262        waitable_handle: u32,
3263        set_handle: u32,
3264    ) -> Result<()> {
3265        let mut instance = self.id().get_mut(store);
3266        let waitable =
3267            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3268
3269        let set = if set_handle == 0 {
3270            None
3271        } else {
3272            let set = instance.instance_states().0[caller_instance]
3273                .handle_table()
3274                .waitable_set_rep(set_handle)?;
3275
3276            let state = store.concurrent_state_mut();
3277            if let Some(old) = waitable.common(state)?.set
3278                && state.get_mut(old)?.is_sync_call_set
3279            {
3280                bail!(Trap::WaitableSyncAndAsync);
3281            }
3282
3283            Some(TableId::<WaitableSet>::new(set))
3284        };
3285
3286        log::trace!(
3287            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3288        );
3289
3290        waitable.join(store.concurrent_state_mut(), set)
3291    }
3292
3293    /// Implements the `subtask.drop` intrinsic.
3294    pub(crate) fn subtask_drop(
3295        self,
3296        store: &mut StoreOpaque,
3297        caller_instance: RuntimeComponentInstanceIndex,
3298        task_id: u32,
3299    ) -> Result<()> {
3300        self.waitable_join(store, caller_instance, task_id, 0)?;
3301
3302        let (rep, is_host) = store
3303            .instance_state(self.runtime_instance(caller_instance))
3304            .handle_table()
3305            .subtask_remove(task_id)?;
3306
3307        let concurrent_state = store.concurrent_state_mut();
3308        let (waitable, delete) = if is_host {
3309            let id = TableId::<HostTask>::new(rep);
3310            let task = concurrent_state.get_mut(id)?;
3311            match &task.state {
3312                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3313                HostTaskState::CalleeDone { .. } => {}
3314                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3315                    bail_bug!("invalid state for callee in `subtask.drop`")
3316                }
3317            }
3318            (Waitable::Host(id), true)
3319        } else {
3320            let id = TableId::<GuestTask>::new(rep);
3321            let task = concurrent_state.get_mut(id)?;
3322            if task.lift_result.is_some() {
3323                bail!(Trap::SubtaskDropNotResolved);
3324            }
3325            (
3326                Waitable::Guest(id),
3327                concurrent_state.get_mut(id)?.ready_to_delete(),
3328            )
3329        };
3330
3331        waitable.common(concurrent_state)?.handle = None;
3332
3333        // If this subtask has an event that means that the terminal status of
3334        // this subtask wasn't yet received so it can't be dropped yet.
3335        if waitable.take_event(concurrent_state)?.is_some() {
3336            bail!(Trap::SubtaskDropNotResolved);
3337        }
3338
3339        if delete {
3340            waitable.delete_from(concurrent_state)?;
3341        }
3342
3343        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3344        Ok(())
3345    }
3346
3347    /// Implements the `waitable-set.wait` intrinsic.
3348    pub(crate) fn waitable_set_wait(
3349        self,
3350        store: &mut StoreOpaque,
3351        options: OptionsIndex,
3352        set: u32,
3353        payload: u32,
3354    ) -> Result<u32> {
3355        if !self.options(store, options).async_ {
3356            // The caller may only call `waitable-set.wait` from an async task
3357            // (i.e. a task created via a call to an async export).
3358            // Otherwise, we'll trap.
3359            store.check_blocking()?;
3360        }
3361
3362        let &CanonicalOptions {
3363            cancellable,
3364            instance: caller_instance,
3365            ..
3366        } = &self.id().get(store).component().env_component().options[options];
3367        let rep = store
3368            .instance_state(self.runtime_instance(caller_instance))
3369            .handle_table()
3370            .waitable_set_rep(set)?;
3371
3372        self.waitable_check(
3373            store,
3374            cancellable,
3375            WaitableCheck::Wait,
3376            WaitableCheckParams {
3377                set: TableId::new(rep),
3378                options,
3379                payload,
3380            },
3381        )
3382    }
3383
3384    /// Implements the `waitable-set.poll` intrinsic.
3385    pub(crate) fn waitable_set_poll(
3386        self,
3387        store: &mut StoreOpaque,
3388        options: OptionsIndex,
3389        set: u32,
3390        payload: u32,
3391    ) -> Result<u32> {
3392        let &CanonicalOptions {
3393            cancellable,
3394            instance: caller_instance,
3395            ..
3396        } = &self.id().get(store).component().env_component().options[options];
3397        let rep = store
3398            .instance_state(self.runtime_instance(caller_instance))
3399            .handle_table()
3400            .waitable_set_rep(set)?;
3401
3402        self.waitable_check(
3403            store,
3404            cancellable,
3405            WaitableCheck::Poll,
3406            WaitableCheckParams {
3407                set: TableId::new(rep),
3408                options,
3409                payload,
3410            },
3411        )
3412    }
3413
3414    /// Implements the `thread.index` intrinsic.
3415    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3416        let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3417        match store
3418            .concurrent_state_mut()
3419            .get_mut(thread_id)?
3420            .instance_rep
3421        {
3422            Some(r) => Ok(r),
3423            None => bail_bug!("thread should have instance_rep by now"),
3424        }
3425    }
3426
3427    /// Implements the `thread.new-indirect` intrinsic.
3428    pub(crate) fn thread_new_indirect<T: 'static>(
3429        self,
3430        mut store: StoreContextMut<T>,
3431        runtime_instance: RuntimeComponentInstanceIndex,
3432        _func_ty_idx: TypeFuncIndex, // currently unused
3433        start_func_table_idx: RuntimeTableIndex,
3434        start_func_idx: u32,
3435        context: i32,
3436    ) -> Result<u32> {
3437        log::trace!("creating new thread");
3438
3439        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3440        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3441        let callee = instance
3442            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3443            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3444        if callee.type_index(store.0) != start_func_ty.type_index() {
3445            bail!(Trap::ThreadNewIndirectInvalidType);
3446        }
3447
3448        let token = StoreToken::new(store.as_context_mut());
3449        let start_func = Box::new(
3450            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3451                let old_thread = store.set_thread(guest_thread)?;
3452                log::trace!(
3453                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3454                );
3455
3456                let mut store = token.as_context_mut(store);
3457                let mut params = [ValRaw::i32(context)];
3458                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3459                // on a separate fiber if we're running in an async store.
3460                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3461
3462                store.0.set_thread(old_thread)?;
3463
3464                store.0.cleanup_thread(
3465                    guest_thread,
3466                    self.runtime_instance(runtime_instance),
3467                    CleanupTask::Yes,
3468                )?;
3469                log::trace!("explicit thread {guest_thread:?} completed");
3470                let state = store.0.concurrent_state_mut();
3471                if let Some(t) = old_thread.guest() {
3472                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
3473                }
3474                log::trace!("thread start: restored {old_thread:?} as current thread");
3475
3476                Ok(())
3477            },
3478        );
3479
3480        let state = store.0.concurrent_state_mut();
3481        let current_thread = state.current_guest_thread()?;
3482        let parent_task = current_thread.task;
3483
3484        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3485        let thread_id = state.push(new_thread)?;
3486        state.get_mut(parent_task)?.threads.insert(thread_id);
3487
3488        log::trace!("new thread with id {thread_id:?} created");
3489
3490        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3491    }
3492
3493    pub(crate) fn resume_thread(
3494        self,
3495        store: &mut StoreOpaque,
3496        runtime_instance: RuntimeComponentInstanceIndex,
3497        thread_idx: u32,
3498        high_priority: bool,
3499        allow_ready: bool,
3500    ) -> Result<()> {
3501        let thread_id =
3502            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3503        let state = store.concurrent_state_mut();
3504        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3505        let thread = state.get_mut(guest_thread.thread)?;
3506
3507        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3508            GuestThreadState::NotStartedExplicit(start_func) => {
3509                log::trace!("starting thread {guest_thread:?}");
3510                let guest_call = WorkItem::GuestCall(
3511                    runtime_instance,
3512                    GuestCall {
3513                        thread: guest_thread,
3514                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3515                            start_func(store, guest_thread)
3516                        })),
3517                    },
3518                );
3519                store
3520                    .concurrent_state_mut()
3521                    .push_work_item(guest_call, high_priority);
3522            }
3523            GuestThreadState::Suspended(fiber) => {
3524                log::trace!("resuming thread {thread_id:?} that was suspended");
3525                store
3526                    .concurrent_state_mut()
3527                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3528            }
3529            GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3530                log::trace!("resuming thread {thread_id:?} that was ready");
3531                thread.state = GuestThreadState::Ready { fiber, cancellable };
3532                store
3533                    .concurrent_state_mut()
3534                    .promote_thread_work_item(guest_thread);
3535            }
3536            other => {
3537                thread.state = other;
3538                bail!(Trap::CannotResumeThread);
3539            }
3540        }
3541        Ok(())
3542    }
3543
3544    fn add_guest_thread_to_instance_table(
3545        self,
3546        thread_id: TableId<GuestThread>,
3547        store: &mut StoreOpaque,
3548        runtime_instance: RuntimeComponentInstanceIndex,
3549    ) -> Result<u32> {
3550        let guest_id = store
3551            .instance_state(self.runtime_instance(runtime_instance))
3552            .thread_handle_table()
3553            .guest_thread_insert(thread_id.rep())?;
3554        store
3555            .concurrent_state_mut()
3556            .get_mut(thread_id)?
3557            .instance_rep = Some(guest_id);
3558        Ok(guest_id)
3559    }
3560
3561    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3562    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3563    pub(crate) fn suspension_intrinsic(
3564        self,
3565        store: &mut StoreOpaque,
3566        caller: RuntimeComponentInstanceIndex,
3567        cancellable: bool,
3568        yielding: bool,
3569        to_thread: SuspensionTarget,
3570    ) -> Result<WaitResult> {
3571        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3572        if to_thread.is_none() {
3573            let state = store.concurrent_state_mut();
3574            if yielding {
3575                // This is a `thread.yield` call
3576                if !state.may_block(guest_thread.task)? {
3577                    // In a non-blocking context, a `thread.yield` may trigger
3578                    // other threads in the same component instance to run.
3579                    if !state.promote_instance_local_thread_work_item(caller) {
3580                        // No other threads are runnable, so just return
3581                        return Ok(WaitResult::Completed);
3582                    }
3583                }
3584            } else {
3585                // The caller may only call `thread.suspend` from an async task
3586                // (i.e. a task created via a call to an async export).
3587                // Otherwise, we'll trap.
3588                store.check_blocking()?;
3589            }
3590        }
3591
3592        // There could be a pending cancellation from a previous uncancellable wait
3593        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3594            return Ok(WaitResult::Cancelled);
3595        }
3596
3597        match to_thread {
3598            SuspensionTarget::SomeSuspended(thread) => {
3599                self.resume_thread(store, caller, thread, true, false)?
3600            }
3601            SuspensionTarget::Some(thread) => {
3602                self.resume_thread(store, caller, thread, true, true)?
3603            }
3604            SuspensionTarget::None => { /* nothing to do */ }
3605        }
3606
3607        let reason = if yielding {
3608            SuspendReason::Yielding {
3609                thread: guest_thread,
3610                cancellable,
3611                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3612                // we're handling a `thread.yield-to-suspended` call; otherwise it would
3613                // panic if we called it in a non-blocking context.
3614                skip_may_block_check: to_thread.is_some(),
3615            }
3616        } else {
3617            SuspendReason::ExplicitlySuspending {
3618                thread: guest_thread,
3619                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3620                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3621                // panic if we called it in a non-blocking context.
3622                skip_may_block_check: to_thread.is_some(),
3623            }
3624        };
3625
3626        store.suspend(reason)?;
3627
3628        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3629            Ok(WaitResult::Cancelled)
3630        } else {
3631            Ok(WaitResult::Completed)
3632        }
3633    }
3634
3635    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3636    fn waitable_check(
3637        self,
3638        store: &mut StoreOpaque,
3639        cancellable: bool,
3640        check: WaitableCheck,
3641        params: WaitableCheckParams,
3642    ) -> Result<u32> {
3643        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3644
3645        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3646
3647        let state = store.concurrent_state_mut();
3648        let task = state.get_mut(guest_thread.task)?;
3649
3650        // If we're waiting, and there are no events immediately available,
3651        // suspend the fiber until that changes.
3652        match &check {
3653            WaitableCheck::Wait => {
3654                let set = params.set;
3655
3656                if (task.event.is_none()
3657                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3658                    && state.get_mut(set)?.ready.is_empty()
3659                {
3660                    if cancellable {
3661                        let old = state
3662                            .get_mut(guest_thread.thread)?
3663                            .wake_on_cancel
3664                            .replace(set);
3665                        if !old.is_none() {
3666                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3667                        }
3668                    }
3669
3670                    store.suspend(SuspendReason::Waiting {
3671                        set,
3672                        thread: guest_thread,
3673                        skip_may_block_check: false,
3674                    })?;
3675                }
3676            }
3677            WaitableCheck::Poll => {}
3678        }
3679
3680        log::trace!(
3681            "waitable check for {guest_thread:?}; set {:?}, part two",
3682            params.set
3683        );
3684
3685        // Deliver any pending events to the guest and return.
3686        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3687
3688        let (ordinal, handle, result) = match &check {
3689            WaitableCheck::Wait => {
3690                let (event, waitable) = match event {
3691                    Some(p) => p,
3692                    None => bail_bug!("event expected to be present"),
3693                };
3694                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3695                let (ordinal, result) = event.parts();
3696                (ordinal, handle, result)
3697            }
3698            WaitableCheck::Poll => {
3699                if let Some((event, waitable)) = event {
3700                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3701                    let (ordinal, result) = event.parts();
3702                    (ordinal, handle, result)
3703                } else {
3704                    log::trace!(
3705                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3706                        guest_thread.task,
3707                        params.set
3708                    );
3709                    let (ordinal, result) = Event::None.parts();
3710                    (ordinal, 0, result)
3711                }
3712            }
3713        };
3714        let memory = self.options_memory_mut(store, params.options);
3715        let ptr = func::validate_inbounds_dynamic(
3716            &CanonicalAbiInfo::POINTER_PAIR,
3717            memory,
3718            &ValRaw::u32(params.payload),
3719        )?;
3720        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3721        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3722        Ok(ordinal)
3723    }
3724
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// `task_id` is the guest-visible subtask handle, resolved through the
    /// handle table of `caller_instance`.  `async_` selects whether the call
    /// may return `BLOCKED` instead of waiting for the subtask to reach a
    /// terminal state.
    ///
    /// Returns, as a `u32`:
    /// - `Status::StartCancelled` if a guest subtask is cancelled before its
    ///   parameters were lowered;
    /// - `BLOCKED` if `async_` is set and the subtask still has to be waited
    ///   for;
    /// - otherwise the `Status::Returned` or `Status::ReturnCancelled` status
    ///   taken from the subtask's pending event.
    ///
    /// Fails with `Trap::SubtaskCancelAfterTerminal` if a host subtask was
    /// already cancelled, or if no `Returned`/`ReturnCancelled` subtask event
    /// is available at the end.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.check_blocking()?;
        }

        // Resolve the guest handle to a table rep plus a flag telling whether
        // it names a host task or a guest task.
        let (rep, is_host) = store
            .instance_state(self.runtime_instance(caller_instance))
            .handle_table()
            .subtask_rep(task_id)?;
        let waitable = if is_host {
            Waitable::Host(TableId::<HostTask>::new(rep))
        } else {
            Waitable::Guest(TableId::<GuestTask>::new(rep))
        };
        let concurrent_state = store.concurrent_state_mut();

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        // Assigned on every path below that does not return early: whether the
        // subtask's terminal event still has to be waited for.
        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            // Unconditionally record the task as cancelled-and-done; the state
            // it was in beforehand decides what else has to happen.
            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
                // If the callee is still running, signal an abort is requested.
                //
                // After cancelling this falls through to block waiting for the
                // host task to actually finish assuming that `async_` is false.
                // This blocking behavior resolves the race of `handle.abort()`
                // with the task actually getting cancelled or finishing.
                HostTaskState::CalleeRunning(handle) => {
                    handle.abort();
                    needs_block = true;
                }

                // Cancellation was already requested, so fail as the task can't
                // be cancelled twice.
                HostTaskState::CalleeDone { cancelled } => {
                    if cancelled {
                        bail!(Trap::SubtaskCancelAfterTerminal);
                    } else {
                        // The callee is already done so there's no need to
                        // block further for an event.
                        needs_block = false;
                    }
                }

                // These states should not be possible for a subtask that's
                // visible from the guest, so trap here.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid states for host callee")
                }
            }
        } else {
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // Parameters were never lowered, so the subtask is cancelled
                // outright and the caller is told so immediately.
                store.cancel_guest_subtask_without_lowered_parameters(
                    self.runtime_instance(caller_instance),
                    guest_task,
                )?;
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // Hand control to the first thread of the task found in a
                // cancellable wait or cancellable yield; both branches `break`
                // after suspending the caller.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    let thread_mut = concurrent_state.get_mut(thread.thread)?;
                    if let Some(set) = thread_mut.wake_on_cancel.take() {
                        // The thread is in a cancellable wait, so wake it up:
                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                            None => bail_bug!("thread not present in wake_on_cancel set"),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield the calling thread so the queued work item
                        // gets a chance to run.
                        let caller = concurrent_state.current_guest_thread()?;
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            // `subtask.cancel` is not allowed to be called in a
                            // sync context, so we cannot skip the may-block check.
                            skip_may_block_check: false,
                        })?;
                        break;
                    } else if let GuestThreadState::Ready {
                        cancellable: true, ..
                    } = &thread_mut.state
                    {
                        // The thread is in a cancellable yield, so yield back
                        // to it.
                        let caller = concurrent_state.current_guest_thread()?;
                        concurrent_state.promote_thread_work_item(thread);
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Guest tasks need to block if they have not yet returned or
                // cancelled, even as a result of the event delivery above.
                needs_block = !store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                // Already returned or cancelled: nothing left to wait for.
                needs_block = false;
            }
        };

        // If we need to block waiting on the terminal status of this subtask
        // then return immediately in `async` mode, or otherwise wait for the
        // event to get signaled through the store.
        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            // Wait for this waitable to get signaled with its terminal status
            // from the completion callback enqueued by `first_poll`. Once
            // that's done, fall through to the shared event handling below.
            store.wait_for_event(waitable)?;

            // .. fall through to determine what event's in store for us.
        }

        // Only a terminal subtask status is an acceptable outcome here; any
        // other event (or none at all) is reported as a trap.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
    }
3892}
3893
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
    ///
    /// In the `StoreInner` implementation, `result_count` is compared against
    /// `PREPARE_ASYNC_NO_RESULT` and `PREPARE_ASYNC_WITH_RESULT` to recognize
    /// an async caller; any other value is taken as a sync caller's result
    /// count.
    ///
    /// # Safety
    ///
    /// `storage` must point to at least `storage_len` valid `ValRaw` items.
    /// The remaining pointers are covered by the trait-level contract.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// # Safety
    ///
    /// `storage` must point to at least `storage_len` items.  The remaining
    /// pointers are covered by the trait-level contract.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    ///
    /// # Safety
    ///
    /// The function-reference pointers are covered by the trait-level
    /// contract.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` give the size and alignment of one
    /// payload item.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` give the size and alignment of one
    /// payload item.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4061
4062/// SAFETY: See trait docs.
4063impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4064    unsafe fn prepare_call(
4065        &mut self,
4066        instance: Instance,
4067        memory: *mut VMMemoryDefinition,
4068        start: NonNull<VMFuncRef>,
4069        return_: NonNull<VMFuncRef>,
4070        caller_instance: RuntimeComponentInstanceIndex,
4071        callee_instance: RuntimeComponentInstanceIndex,
4072        task_return_type: TypeTupleIndex,
4073        callee_async: bool,
4074        string_encoding: StringEncoding,
4075        result_count_or_max_if_async: u32,
4076        storage: *mut ValRaw,
4077        storage_len: usize,
4078    ) -> Result<()> {
4079        // SAFETY: The `wasmtime_cranelift`-generated code that calls
4080        // this method will have ensured that `storage` is a valid
4081        // pointer containing at least `storage_len` items.
4082        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
4083
4084        unsafe {
4085            instance.prepare_call(
4086                StoreContextMut(self),
4087                start,
4088                return_,
4089                caller_instance,
4090                callee_instance,
4091                task_return_type,
4092                callee_async,
4093                memory,
4094                string_encoding,
4095                match result_count_or_max_if_async {
4096                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4097                        params,
4098                        has_result: false,
4099                    },
4100                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4101                        params,
4102                        has_result: true,
4103                    },
4104                    result_count => CallerInfo::Sync {
4105                        params,
4106                        result_count,
4107                    },
4108                },
4109            )
4110        }
4111    }
4112
4113    unsafe fn sync_start(
4114        &mut self,
4115        instance: Instance,
4116        callback: *mut VMFuncRef,
4117        callee: NonNull<VMFuncRef>,
4118        param_count: u32,
4119        storage: *mut MaybeUninit<ValRaw>,
4120        storage_len: usize,
4121    ) -> Result<()> {
4122        unsafe {
4123            instance
4124                .start_call(
4125                    StoreContextMut(self),
4126                    callback,
4127                    ptr::null_mut(),
4128                    callee,
4129                    param_count,
4130                    1,
4131                    START_FLAG_ASYNC_CALLEE,
4132                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
4133                    // this method will have ensured that `storage` is a valid
4134                    // pointer containing at least `storage_len` items.
4135                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
4136                )
4137                .map(drop)
4138        }
4139    }
4140
4141    unsafe fn async_start(
4142        &mut self,
4143        instance: Instance,
4144        callback: *mut VMFuncRef,
4145        post_return: *mut VMFuncRef,
4146        callee: NonNull<VMFuncRef>,
4147        param_count: u32,
4148        result_count: u32,
4149        flags: u32,
4150    ) -> Result<u32> {
4151        unsafe {
4152            instance.start_call(
4153                StoreContextMut(self),
4154                callback,
4155                post_return,
4156                callee,
4157                param_count,
4158                result_count,
4159                flags,
4160                None,
4161            )
4162        }
4163    }
4164
4165    fn future_write(
4166        &mut self,
4167        instance: Instance,
4168        caller: RuntimeComponentInstanceIndex,
4169        ty: TypeFutureTableIndex,
4170        options: OptionsIndex,
4171        future: u32,
4172        address: u32,
4173    ) -> Result<u32> {
4174        instance
4175            .guest_write(
4176                StoreContextMut(self),
4177                caller,
4178                TransmitIndex::Future(ty),
4179                options,
4180                None,
4181                future,
4182                address,
4183                1,
4184            )
4185            .map(|result| result.encode())
4186    }
4187
4188    fn future_read(
4189        &mut self,
4190        instance: Instance,
4191        caller: RuntimeComponentInstanceIndex,
4192        ty: TypeFutureTableIndex,
4193        options: OptionsIndex,
4194        future: u32,
4195        address: u32,
4196    ) -> Result<u32> {
4197        instance
4198            .guest_read(
4199                StoreContextMut(self),
4200                caller,
4201                TransmitIndex::Future(ty),
4202                options,
4203                None,
4204                future,
4205                address,
4206                1,
4207            )
4208            .map(|result| result.encode())
4209    }
4210
4211    fn stream_write(
4212        &mut self,
4213        instance: Instance,
4214        caller: RuntimeComponentInstanceIndex,
4215        ty: TypeStreamTableIndex,
4216        options: OptionsIndex,
4217        stream: u32,
4218        address: u32,
4219        count: u32,
4220    ) -> Result<u32> {
4221        instance
4222            .guest_write(
4223                StoreContextMut(self),
4224                caller,
4225                TransmitIndex::Stream(ty),
4226                options,
4227                None,
4228                stream,
4229                address,
4230                count,
4231            )
4232            .map(|result| result.encode())
4233    }
4234
4235    fn stream_read(
4236        &mut self,
4237        instance: Instance,
4238        caller: RuntimeComponentInstanceIndex,
4239        ty: TypeStreamTableIndex,
4240        options: OptionsIndex,
4241        stream: u32,
4242        address: u32,
4243        count: u32,
4244    ) -> Result<u32> {
4245        instance
4246            .guest_read(
4247                StoreContextMut(self),
4248                caller,
4249                TransmitIndex::Stream(ty),
4250                options,
4251                None,
4252                stream,
4253                address,
4254                count,
4255            )
4256            .map(|result| result.encode())
4257    }
4258
4259    fn future_drop_writable(
4260        &mut self,
4261        instance: Instance,
4262        ty: TypeFutureTableIndex,
4263        writer: u32,
4264    ) -> Result<()> {
4265        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4266    }
4267
4268    fn flat_stream_write(
4269        &mut self,
4270        instance: Instance,
4271        caller: RuntimeComponentInstanceIndex,
4272        ty: TypeStreamTableIndex,
4273        options: OptionsIndex,
4274        payload_size: u32,
4275        payload_align: u32,
4276        stream: u32,
4277        address: u32,
4278        count: u32,
4279    ) -> Result<u32> {
4280        instance
4281            .guest_write(
4282                StoreContextMut(self),
4283                caller,
4284                TransmitIndex::Stream(ty),
4285                options,
4286                Some(FlatAbi {
4287                    size: payload_size,
4288                    align: payload_align,
4289                }),
4290                stream,
4291                address,
4292                count,
4293            )
4294            .map(|result| result.encode())
4295    }
4296
4297    fn flat_stream_read(
4298        &mut self,
4299        instance: Instance,
4300        caller: RuntimeComponentInstanceIndex,
4301        ty: TypeStreamTableIndex,
4302        options: OptionsIndex,
4303        payload_size: u32,
4304        payload_align: u32,
4305        stream: u32,
4306        address: u32,
4307        count: u32,
4308    ) -> Result<u32> {
4309        instance
4310            .guest_read(
4311                StoreContextMut(self),
4312                caller,
4313                TransmitIndex::Stream(ty),
4314                options,
4315                Some(FlatAbi {
4316                    size: payload_size,
4317                    align: payload_align,
4318                }),
4319                stream,
4320                address,
4321                count,
4322            )
4323            .map(|result| result.encode())
4324    }
4325
4326    fn stream_drop_writable(
4327        &mut self,
4328        instance: Instance,
4329        ty: TypeStreamTableIndex,
4330        writer: u32,
4331    ) -> Result<()> {
4332        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4333    }
4334
4335    fn error_context_debug_message(
4336        &mut self,
4337        instance: Instance,
4338        ty: TypeComponentLocalErrorContextTableIndex,
4339        options: OptionsIndex,
4340        err_ctx_handle: u32,
4341        debug_msg_address: u32,
4342    ) -> Result<()> {
4343        instance.error_context_debug_message(
4344            StoreContextMut(self),
4345            ty,
4346            options,
4347            err_ctx_handle,
4348            debug_msg_address,
4349        )
4350    }
4351
4352    fn thread_new_indirect(
4353        &mut self,
4354        instance: Instance,
4355        caller: RuntimeComponentInstanceIndex,
4356        func_ty_idx: TypeFuncIndex,
4357        start_func_table_idx: RuntimeTableIndex,
4358        start_func_idx: u32,
4359        context: i32,
4360    ) -> Result<u32> {
4361        instance.thread_new_indirect(
4362            StoreContextMut(self),
4363            caller,
4364            func_ty_idx,
4365            start_func_table_idx,
4366            start_func_idx,
4367            context,
4368        )
4369    }
4370}
4371
/// A heap-allocated, pinned future that resolves to `Result<()>` and is
/// `Send + 'static`.
///
/// NOTE(review): presumably the future type used to drive host tasks --
/// confirm against its uses elsewhere in this file.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4373
/// Represents the state of a pending host task.
///
/// This is used to represent tasks when the guest calls into the host.
pub(crate) struct HostTask {
    /// NOTE(review): presumably the bookkeeping shared by every kind of
    /// waitable -- confirm against `WaitableCommon`.
    common: WaitableCommon,

    /// Guest thread which called the host.
    caller: QualifiedThreadId,

    /// State of borrows/etc the host needs to track. Used when the guest passes
    /// borrows to the host, for example.
    call_context: CallContext,

    /// Where this task currently is in its lifecycle; `subtask.cancel`
    /// inspects and replaces this value.
    state: HostTaskState,
}
4389
/// Lifecycle states of a `HostTask`.
///
/// `subtask.cancel` only expects to observe `CalleeRunning` or `CalleeDone`
/// on a guest-visible subtask; it treats the other two states as a bug.
enum HostTaskState {
    /// A host task has been created and it's considered "started".
    ///
    /// The host task has yet to enter `first_poll` or `poll_and_block` which
    /// is where this will get updated further.
    CalleeStarted,

    /// State used for tasks in `first_poll` meaning that the guest did an async
    /// lower of a host async function which is blocked. The specified handle is
    /// linked to the future in the main `FuturesUnordered` of a store which is
    /// used to cancel it if the guest requests cancellation.
    CalleeRunning(JoinHandle),

    /// Terminal state used for tasks in `poll_and_block` to store the result of
    /// their computation. Note that this state is not used for tasks in
    /// `first_poll`.
    CalleeFinished(LiftedResult),

    /// Terminal state for host tasks meaning that the task was cancelled or the
    /// result was taken.
    ///
    /// `cancelled` is set to `true` by `subtask.cancel`; a second cancellation
    /// of a task already marked this way traps.
    CalleeDone { cancelled: bool },
}
4412
4413impl HostTask {
4414    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4415        Self {
4416            common: WaitableCommon::default(),
4417            call_context: CallContext::default(),
4418            caller,
4419            state,
4420        }
4421    }
4422}
4423
4424impl TableDebug for HostTask {
4425    fn type_name() -> &'static str {
4426        "HostTask"
4427    }
4428}
4429
/// A boxed, thread-safe (`Send + Sync`) callback that takes the store, an
/// `Event`, and a `u32`, and returns a `u32` result code.
///
/// NOTE(review): presumably wraps a guest task's callback function, with the
/// `u32` argument being the waitable handle -- confirm against its callers.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4431
/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// Represents the caller of the host function which called back into a
        /// guest. Note that this thread could belong to an entirely unrelated
        /// top-level component instance than the one the host called into.
        caller: CurrentThread,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the calling thread, qualified by its task.
        thread: QualifiedThreadId,
    },
}
4452
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// The closure that performs the lift.
    lift: RawLift,
    /// Tuple type index of the result.
    /// NOTE(review): presumably what a `task.return` call is validated
    /// against -- confirm.
    ty: TypeTupleIndex,
    /// Linear memory definition, if any.
    /// NOTE(review): presumably the memory the result is lifted from --
    /// confirm.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding option accompanying the lift.
    string_encoding: StringEncoding,
}
4461
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
///
/// The derived ordering compares `task` first and `thread` second, following
/// field declaration order.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task that owns the thread.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4471
4472impl QualifiedThreadId {
4473    fn qualify(
4474        state: &mut ConcurrentState,
4475        thread: TableId<GuestThread>,
4476    ) -> Result<QualifiedThreadId> {
4477        Ok(QualifiedThreadId {
4478            task: state.get_mut(thread)?.parent_task,
4479            thread,
4480        })
4481    }
4482}
4483
4484impl fmt::Debug for QualifiedThreadId {
4485    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4486        f.debug_tuple("QualifiedThreadId")
4487            .field(&self.task.rep())
4488            .field(&self.thread.rep())
4489            .finish()
4490    }
4491}
4492
/// Execution state of a guest thread.
enum GuestThreadState {
    /// Initial state assigned by `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state assigned by `GuestThread::new_explicit`, holding the
    /// start function that was passed to it.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// NOTE(review): presumably the thread is currently executing -- confirm.
    Running,
    /// Holds the thread's fiber.
    /// NOTE(review): presumably parked until explicitly resumed -- confirm.
    Suspended(StoreFiber<'static>),
    /// Holds the thread's fiber plus a `cancellable` flag.  `subtask.cancel`
    /// treats a thread in this state with `cancellable: true` as being in a
    /// cancellable yield.
    Ready {
        fiber: StoreFiber<'static>,
        cancellable: bool,
    },
    /// NOTE(review): presumably the thread has finished running -- confirm.
    Completed,
}
/// Per-thread state for a guest thread belonging to a guest task.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread.
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option<u32>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId<WaitableSet>,
}
4523
4524impl GuestThread {
4525    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4526    /// handle.
4527    fn from_instance(
4528        state: Pin<&mut ComponentInstance>,
4529        caller_instance: RuntimeComponentInstanceIndex,
4530        guest_thread: u32,
4531    ) -> Result<TableId<Self>> {
4532        let rep = state.instance_states().0[caller_instance]
4533            .thread_handle_table()
4534            .guest_thread_rep(guest_thread)?;
4535        Ok(TableId::new(rep))
4536    }
4537
4538    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4539        let sync_call_set = state.push(WaitableSet {
4540            is_sync_call_set: true,
4541            ..WaitableSet::default()
4542        })?;
4543        Ok(Self {
4544            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4545            parent_task,
4546            wake_on_cancel: None,
4547            state: GuestThreadState::NotStartedImplicit,
4548            instance_rep: None,
4549            sync_call_set,
4550        })
4551    }
4552
4553    fn new_explicit(
4554        state: &mut ConcurrentState,
4555        parent_task: TableId<GuestTask>,
4556        start_func: Box<
4557            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4558        >,
4559    ) -> Result<Self> {
4560        let sync_call_set = state.push(WaitableSet {
4561            is_sync_call_set: true,
4562            ..WaitableSet::default()
4563        })?;
4564        Ok(Self {
4565            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4566            parent_task,
4567            wake_on_cancel: None,
4568            state: GuestThreadState::NotStartedExplicit(start_func),
4569            instance_rep: None,
4570            sync_call_set,
4571        })
4572    }
4573}
4574
4575impl TableDebug for GuestThread {
4576    fn type_name() -> &'static str {
4577        "GuestThread"
4578    }
4579}
4580
4581enum SyncResult {
4582    NotProduced,
4583    Produced(Option<ValRaw>),
4584    Taken,
4585}
4586
4587impl SyncResult {
4588    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4589        Ok(match mem::replace(self, SyncResult::Taken) {
4590            SyncResult::NotProduced => None,
4591            SyncResult::Produced(val) => Some(val),
4592            SyncResult::Taken => {
4593                bail_bug!("attempted to take a synchronous result that was already taken")
4594            }
4595        })
4596    }
4597}
4598
/// Tracks the host future associated with a `GuestTask`, if there is one.
#[derive(Debug)]
enum HostFutureState {
    /// The task has no host future to wait for: it was called by a guest, or
    /// by the host without a host future present.
    NotApplicable,
    /// A host future exists; `GuestTask::ready_to_delete` reports `false`
    /// while the task is in this state.
    Live,
    /// NOTE(review): presumably the host future has been dropped, so it no
    /// longer blocks deletion; this variant is not assigned in this part of
    /// the file.
    Dropped,
}
4605
4606/// Represents a pending guest task.
4607pub(crate) struct GuestTask {
4608    /// See `WaitableCommon`
4609    common: WaitableCommon,
4610    /// Closure to lower the parameters passed to this task.
4611    lower_params: Option<RawLower>,
4612    /// See `LiftResult`
4613    lift_result: Option<LiftResult>,
4614    /// A place to stash the type-erased lifted result if it can't be delivered
4615    /// immediately.
4616    result: Option<LiftedResult>,
4617    /// Closure to call the callback function for an async-lifted export, if
4618    /// provided.
4619    callback: Option<CallbackFn>,
4620    /// See `Caller`
4621    caller: Caller,
4622    /// Borrow state for this task.
4623    ///
4624    /// Keeps track of `borrow<T>` received to this task to ensure that
4625    /// everything is dropped by the time it exits.
4626    call_context: CallContext,
4627    /// A place to stash the lowered result for a sync-to-async call until it
4628    /// can be returned to the caller.
4629    sync_result: SyncResult,
4630    /// Whether or not the task has been cancelled (i.e. whether the task is
4631    /// permitted to call `task.cancel`).
4632    cancel_sent: bool,
4633    /// Whether or not we've sent a `Status::Starting` event to any current or
4634    /// future waiters for this waitable.
4635    starting_sent: bool,
4636    /// The runtime instance to which the exported function for this guest task
4637    /// belongs.
4638    ///
4639    /// Note that the task may do a sync->sync call via a fused adapter which
4640    /// results in that task executing code in a different instance, and it may
4641    /// call host functions and intrinsics from that other instance.
4642    instance: RuntimeInstance,
4643    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4644    /// delivered to this task.
4645    event: Option<Event>,
4646    /// Whether or not the task has exited.
4647    exited: bool,
4648    /// Threads belonging to this task
4649    threads: HashSet<TableId<GuestThread>>,
4650    /// The state of the host future that represents an async task, which must
4651    /// be dropped before we can delete the task.
4652    host_future_state: HostFutureState,
4653    /// Indicates whether this task was created for a call to an async-lifted
4654    /// export.
4655    async_function: bool,
4656
4657    decremented_interesting_task_count: bool,
4658}
4659
4660impl GuestTask {
4661    fn already_lowered_parameters(&self) -> bool {
4662        // We reset `lower_params` after we lower the parameters
4663        self.lower_params.is_none()
4664    }
4665
4666    fn returned_or_cancelled(&self) -> bool {
4667        // We reset `lift_result` after we return or exit
4668        self.lift_result.is_none()
4669    }
4670
4671    fn ready_to_delete(&self) -> bool {
4672        let threads_completed = self.threads.is_empty();
4673        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4674        let pending_completion_event = matches!(
4675            self.common.event,
4676            Some(Event::Subtask {
4677                status: Status::Returned | Status::ReturnCancelled
4678            })
4679        );
4680        let ready = threads_completed
4681            && !has_sync_result
4682            && !pending_completion_event
4683            && !matches!(self.host_future_state, HostFutureState::Live);
4684        log::trace!(
4685            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4686            threads_completed,
4687            has_sync_result,
4688            pending_completion_event,
4689            self.host_future_state
4690        );
4691        ready
4692    }
4693
4694    fn new(
4695        state: &mut ConcurrentState,
4696        lower_params: RawLower,
4697        lift_result: LiftResult,
4698        caller: Caller,
4699        callback: Option<CallbackFn>,
4700        instance: RuntimeInstance,
4701        async_function: bool,
4702    ) -> Result<QualifiedThreadId> {
4703        let host_future_state = match &caller {
4704            Caller::Guest { .. } => HostFutureState::NotApplicable,
4705            Caller::Host {
4706                host_future_present,
4707                ..
4708            } => {
4709                if *host_future_present {
4710                    HostFutureState::Live
4711                } else {
4712                    HostFutureState::NotApplicable
4713                }
4714            }
4715        };
4716        let task = state.push(Self {
4717            common: WaitableCommon::default(),
4718            lower_params: Some(lower_params),
4719            lift_result: Some(lift_result),
4720            result: None,
4721            callback,
4722            caller,
4723            call_context: CallContext::default(),
4724            sync_result: SyncResult::NotProduced,
4725            cancel_sent: false,
4726            starting_sent: false,
4727            instance,
4728            event: None,
4729            exited: false,
4730            threads: HashSet::new(),
4731            host_future_state,
4732            async_function,
4733            decremented_interesting_task_count: false,
4734        })?;
4735        let new_thread = GuestThread::new_implicit(state, task)?;
4736        let thread = state.push(new_thread)?;
4737        state.get_mut(task)?.threads.insert(thread);
4738        state.interesting_tasks += 1;
4739        Ok(QualifiedThreadId { task, thread })
4740    }
4741}
4742
4743impl TableDebug for GuestTask {
4744    fn type_name() -> &'static str {
4745        "GuestTask"
4746    }
4747}
4748
4749/// Represents state common to all kinds of waitables.
4750#[derive(Default)]
4751struct WaitableCommon {
4752    /// The currently pending event for this waitable, if any.
4753    event: Option<Event>,
4754    /// The set to which this waitable belongs, if any.
4755    set: Option<TableId<WaitableSet>>,
4756    /// The handle with which the guest refers to this waitable, if any.
4757    handle: Option<u32>,
4758}
4759
4760/// Represents a Component Model Async `waitable`.
4761#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4762enum Waitable {
4763    /// A host task
4764    Host(TableId<HostTask>),
4765    /// A guest task
4766    Guest(TableId<GuestTask>),
4767    /// The read or write end of a stream or future
4768    Transmit(TableId<TransmitHandle>),
4769}
4770
4771impl Waitable {
4772    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4773    /// handle.
4774    fn from_instance(
4775        state: Pin<&mut ComponentInstance>,
4776        caller_instance: RuntimeComponentInstanceIndex,
4777        waitable: u32,
4778    ) -> Result<Self> {
4779        use crate::runtime::vm::component::Waitable;
4780
4781        let (waitable, kind) = state.instance_states().0[caller_instance]
4782            .handle_table()
4783            .waitable_rep(waitable)?;
4784
4785        Ok(match kind {
4786            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4787            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4788            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4789        })
4790    }
4791
4792    /// Retrieve the host-visible identifier for this `Waitable`.
4793    fn rep(&self) -> u32 {
4794        match self {
4795            Self::Host(id) => id.rep(),
4796            Self::Guest(id) => id.rep(),
4797            Self::Transmit(id) => id.rep(),
4798        }
4799    }
4800
4801    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4802    /// remove it from any set it may currently belong to (when `set` is
4803    /// `None`).
4804    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4805        log::trace!("waitable {self:?} join set {set:?}");
4806
4807        let old = mem::replace(&mut self.common(state)?.set, set);
4808
4809        if let Some(old) = old {
4810            match *self {
4811                Waitable::Host(id) => state.remove_child(id, old),
4812                Waitable::Guest(id) => state.remove_child(id, old),
4813                Waitable::Transmit(id) => state.remove_child(id, old),
4814            }?;
4815
4816            state.get_mut(old)?.ready.remove(self);
4817        }
4818
4819        if let Some(set) = set {
4820            match *self {
4821                Waitable::Host(id) => state.add_child(id, set),
4822                Waitable::Guest(id) => state.add_child(id, set),
4823                Waitable::Transmit(id) => state.add_child(id, set),
4824            }?;
4825
4826            if self.common(state)?.event.is_some() {
4827                self.mark_ready(state)?;
4828            }
4829        }
4830
4831        Ok(())
4832    }
4833
4834    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4835    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4836        Ok(match self {
4837            Self::Host(id) => &mut state.get_mut(*id)?.common,
4838            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4839            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4840        })
4841    }
4842
4843    /// Set or clear the pending event for this waitable and either deliver it
4844    /// to the first waiter, if any, or mark it as ready to be delivered to the
4845    /// next waiter that arrives.
4846    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4847        log::trace!("set event for {self:?}: {event:?}");
4848        self.common(state)?.event = event;
4849        self.mark_ready(state)
4850    }
4851
4852    /// Take the pending event from this waitable, leaving `None` in its place.
4853    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4854        let common = self.common(state)?;
4855        let event = common.event.take();
4856        if let Some(set) = self.common(state)?.set {
4857            state.get_mut(set)?.ready.remove(self);
4858        }
4859
4860        Ok(event)
4861    }
4862
4863    /// Deliver the current event for this waitable to the first waiter, if any,
4864    /// or else mark it as ready to be delivered to the next waiter that
4865    /// arrives.
4866    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4867        if let Some(set) = self.common(state)?.set {
4868            state.get_mut(set)?.ready.insert(*self);
4869            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4870                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4871                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4872
4873                let item = match mode {
4874                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4875                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4876                        state.get_mut(thread.task)?.instance.index,
4877                        GuestCall {
4878                            thread,
4879                            kind: GuestCallKind::DeliverEvent {
4880                                instance,
4881                                set: Some(set),
4882                            },
4883                        },
4884                    ),
4885                };
4886                state.push_high_priority(item);
4887            }
4888        }
4889        Ok(())
4890    }
4891
4892    /// Remove this waitable from the instance's rep table.
4893    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4894        match self {
4895            Self::Host(task) => {
4896                log::trace!("delete host task {task:?}");
4897                state.delete(*task)?;
4898            }
4899            Self::Guest(task) => {
4900                log::trace!("delete guest task {task:?}");
4901                let task = state.delete(*task)?;
4902
4903                // When a guest task is created it increments the
4904                // `ConcurrentState::interesting_tasks` counter, and that needs
4905                // to be paired with a decrement. There are a few situations in
4906                // which the decrement needs to happen which don't all funnel
4907                // through here, so in lieu of that at least try to catch issues
4908                // where we forgot to do a decrement.
4909                debug_assert!(task.decremented_interesting_task_count);
4910            }
4911            Self::Transmit(task) => {
4912                state.delete(*task)?;
4913            }
4914        }
4915
4916        Ok(())
4917    }
4918}
4919
4920impl fmt::Debug for Waitable {
4921    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4922        match self {
4923            Self::Host(id) => write!(f, "{id:?}"),
4924            Self::Guest(id) => write!(f, "{id:?}"),
4925            Self::Transmit(id) => write!(f, "{id:?}"),
4926        }
4927    }
4928}
4929
4930/// Represents a Component Model Async `waitable-set`.
4931#[derive(Default)]
4932struct WaitableSet {
4933    /// Which waitables in this set have pending events, if any.
4934    ready: BTreeSet<Waitable>,
4935    /// Which guest threads are currently waiting on this set, if any.
4936    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4937    /// Whether this set is a synthetic, internal one meant for handling
4938    /// synchronous calls.
4939    is_sync_call_set: bool,
4940}
4941
4942impl TableDebug for WaitableSet {
4943    fn type_name() -> &'static str {
4944        "WaitableSet"
4945    }
4946}
4947
4948/// Type-erased closure to lower the parameters for a guest task.
4949type RawLower =
4950    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4951
4952/// Type-erased closure to lift the result for a guest task.
4953type RawLift = Box<
4954    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4955>;
4956
4957/// Type erased result of a guest task which may be downcast to the expected
4958/// type by a host caller (or simply ignored in the case of a guest caller; see
4959/// `DummyResult`).
4960type LiftedResult = Box<dyn Any + Send + Sync>;
4961
/// Placeholder returned from a `LiftFn` when the real result has already
/// been lowered into a guest task's stack and linear memory, so there is
/// nothing left to hand back.
struct DummyResult;
4965
4966/// Represents the Component Model Async state of a (sub-)component instance.
4967#[derive(Default)]
4968pub struct ConcurrentInstanceState {
4969    /// Whether backpressure is set for this instance (enabled if >0)
4970    backpressure: u16,
4971    /// Whether this instance can be entered
4972    do_not_enter: bool,
4973    /// Pending calls for this instance which require `Self::backpressure` to be
4974    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4975    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4976}
4977
4978impl ConcurrentInstanceState {
4979    pub fn pending_is_empty(&self) -> bool {
4980        self.pending.is_empty()
4981    }
4982}
4983
4984#[derive(Debug, Copy, Clone)]
4985pub(crate) enum CurrentThread {
4986    Guest(QualifiedThreadId),
4987    Host(TableId<HostTask>),
4988    None,
4989}
4990
4991impl CurrentThread {
4992    fn guest(&self) -> Option<&QualifiedThreadId> {
4993        match self {
4994            Self::Guest(id) => Some(id),
4995            _ => None,
4996        }
4997    }
4998
4999    fn host(&self) -> Option<TableId<HostTask>> {
5000        match self {
5001            Self::Host(id) => Some(*id),
5002            _ => None,
5003        }
5004    }
5005
5006    fn is_none(&self) -> bool {
5007        matches!(self, Self::None)
5008    }
5009}
5010
5011impl From<QualifiedThreadId> for CurrentThread {
5012    fn from(id: QualifiedThreadId) -> Self {
5013        Self::Guest(id)
5014    }
5015}
5016
5017impl From<TableId<HostTask>> for CurrentThread {
5018    fn from(id: TableId<HostTask>) -> Self {
5019        Self::Host(id)
5020    }
5021}
5022
5023/// Represents the Component Model Async state of a store.
5024pub struct ConcurrentState {
5025    /// The currently running thread, if any.
5026    current_thread: CurrentThread,
5027
5028    /// The set of pending host and background tasks, if any.
5029    ///
5030    /// See `ComponentInstance::poll_until` for where we temporarily take this
5031    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
5032    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5033    /// The table of waitables, waitable sets, etc.
5034    table: AlwaysMut<ResourceTable>,
5035    /// The "high priority" work queue for this store's event loop.
5036    high_priority: Vec<WorkItem>,
5037    /// The "low priority" work queue for this store's event loop.
5038    low_priority: VecDeque<WorkItem>,
5039    /// A place to stash the reason a fiber is suspending so that the code which
5040    /// resumed it will know under what conditions the fiber should be resumed
5041    /// again.
5042    suspend_reason: Option<SuspendReason>,
5043    /// A cached fiber which is waiting for work to do.
5044    ///
5045    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
5046    worker: Option<StoreFiber<'static>>,
5047    /// A place to stash the work item for which we're resuming a worker fiber.
5048    worker_item: Option<WorkerItem>,
5049
5050    /// Reference counts for all component error contexts
5051    ///
5052    /// NOTE: it is possible the global ref count to be *greater* than the sum of
5053    /// (sub)component ref counts as tracked by `error_context_tables`, for
5054    /// example when the host holds one or more references to error contexts.
5055    ///
5056    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
5057    /// component-wide representation) of the index into concurrent state for a given
5058    /// stored `ErrorContext`.
5059    ///
5060    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
5061    /// as a `TableId<ErrorContextState>`.
5062    global_error_context_ref_counts:
5063        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5064
5065    /// The number of "interesting tasks" currently executing in the store.
5066    ///
5067    /// This tracks the concept of a component instance lifetime as defined in
5068    /// https://github.com/WebAssembly/component-model/pull/643. Specifically
5069    /// all tasks currently increment this counter which then gets decremented
5070    /// when they exit. In the future some tasks might not increment this
5071    /// counter, but for now all do.
5072    ///
5073    /// This is used to implement `Accessor::poll_no_interesting_tasks` to
5074    /// inform the embedder when all tasks have completed. This is then
5075    /// used in wasmtime-wasi-http, for example, to know when an instance is
5076    /// idle.
5077    interesting_tasks: usize,
5078
5079    /// Single waker to notify when `interesting_tasks` reaches 0.
5080    ///
5081    /// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
5082    interesting_tasks_empty_waker: Option<Waker>,
5083}
5084
5085impl Default for ConcurrentState {
5086    fn default() -> Self {
5087        Self {
5088            current_thread: CurrentThread::None,
5089            table: AlwaysMut::new(ResourceTable::new()),
5090            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5091            high_priority: Vec::new(),
5092            low_priority: VecDeque::new(),
5093            suspend_reason: None,
5094            worker: None,
5095            worker_item: None,
5096            global_error_context_ref_counts: BTreeMap::new(),
5097            interesting_tasks: 0,
5098            interesting_tasks_empty_waker: None,
5099        }
5100    }
5101}
5102
5103impl ConcurrentState {
5104    /// Take ownership of any fibers and futures owned by this object.
5105    ///
5106    /// This should be used when disposing of the `Store` containing this object
5107    /// in order to gracefully resolve any and all fibers using
5108    /// `StoreFiber::dispose`.  This is necessary to avoid possible
5109    /// use-after-free bugs due to fibers which may still have access to the
5110    /// `Store`.
5111    ///
5112    /// Additionally, the futures collected with this function should be dropped
5113    /// within a `tls::set` call, which will ensure than any futures closing
5114    /// over an `&Accessor` will have access to the store when dropped, allowing
5115    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
5116    /// panicking.
5117    ///
5118    /// Note that this will leave the object in an inconsistent and unusable
5119    /// state, so it should only be used just prior to dropping it.
5120    pub(crate) fn take_fibers_and_futures(
5121        &mut self,
5122        fibers: &mut Vec<StoreFiber<'static>>,
5123        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5124    ) {
5125        for entry in self.table.get_mut().iter_mut() {
5126            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5127                for mode in mem::take(&mut set.waiting).into_values() {
5128                    if let WaitMode::Fiber(fiber) = mode {
5129                        fibers.push(fiber);
5130                    }
5131                }
5132            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5133                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5134                    mem::replace(&mut thread.state, GuestThreadState::Completed)
5135                {
5136                    fibers.push(fiber);
5137                }
5138            }
5139        }
5140
5141        if let Some(fiber) = self.worker.take() {
5142            fibers.push(fiber);
5143        }
5144
5145        let mut handle_item = |item| match item {
5146            WorkItem::ResumeFiber(fiber) => {
5147                fibers.push(fiber);
5148            }
5149            WorkItem::PushFuture(future) => {
5150                self.futures
5151                    .get_mut()
5152                    .as_mut()
5153                    .unwrap()
5154                    .push(future.into_inner());
5155            }
5156            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5157            }
5158        };
5159
5160        for item in mem::take(&mut self.high_priority) {
5161            handle_item(item);
5162        }
5163        for item in mem::take(&mut self.low_priority) {
5164            handle_item(item);
5165        }
5166
5167        if let Some(them) = self.futures.get_mut().take() {
5168            futures.push(them);
5169        }
5170    }
5171
5172    fn push<V: Send + Sync + 'static>(
5173        &mut self,
5174        value: V,
5175    ) -> Result<TableId<V>, ResourceTableError> {
5176        self.table.get_mut().push(value).map(TableId::from)
5177    }
5178
5179    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5180        self.table.get_mut().get_mut(&Resource::from(id))
5181    }
5182
5183    pub fn add_child<T: 'static, U: 'static>(
5184        &mut self,
5185        child: TableId<T>,
5186        parent: TableId<U>,
5187    ) -> Result<(), ResourceTableError> {
5188        self.table
5189            .get_mut()
5190            .add_child(Resource::from(child), Resource::from(parent))
5191    }
5192
5193    pub fn remove_child<T: 'static, U: 'static>(
5194        &mut self,
5195        child: TableId<T>,
5196        parent: TableId<U>,
5197    ) -> Result<(), ResourceTableError> {
5198        self.table
5199            .get_mut()
5200            .remove_child(Resource::from(child), Resource::from(parent))
5201    }
5202
5203    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5204        self.table.get_mut().delete(Resource::from(id))
5205    }
5206
5207    fn push_future(&mut self, future: HostTaskFuture) {
5208        // Note that we can't directly push to `ConcurrentState::futures` here
5209        // since this may be called from a future that's being polled inside
5210        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5211        // so it has exclusive access while polling it.  Therefore, we push a
5212        // work item to the "high priority" queue, which will actually push to
5213        // `ConcurrentState::futures` later.
5214        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5215    }
5216
5217    fn push_high_priority(&mut self, item: WorkItem) {
5218        log::trace!("push high priority: {item:?}");
5219        self.high_priority.push(item);
5220    }
5221
5222    fn push_low_priority(&mut self, item: WorkItem) {
5223        log::trace!("push low priority: {item:?}");
5224        self.low_priority.push_front(item);
5225    }
5226
5227    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5228        if high_priority {
5229            self.push_high_priority(item);
5230        } else {
5231            self.push_low_priority(item);
5232        }
5233    }
5234
5235    fn promote_instance_local_thread_work_item(
5236        &mut self,
5237        current_instance: RuntimeComponentInstanceIndex,
5238    ) -> bool {
5239        self.promote_work_items_matching(|item: &WorkItem| match item {
5240            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5241                *instance == current_instance
5242            }
5243            _ => false,
5244        })
5245    }
5246
5247    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5248        self.promote_work_items_matching(|item: &WorkItem| match item {
5249            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5250                *t == thread
5251            }
5252            _ => false,
5253        })
5254    }
5255
5256    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5257    where
5258        F: FnMut(&WorkItem) -> bool,
5259    {
5260        // If there's a high-priority work item to resume the current guest thread,
5261        // we don't need to promote anything, but we return true to indicate that
5262        // work is pending for the current instance.
5263        if self.high_priority.iter().any(&mut predicate) {
5264            true
5265        }
5266        // Otherwise, look for a low-priority work item that matches the current
5267        // instance and promote it to high-priority.
5268        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5269            let item = self.low_priority.remove(idx).unwrap();
5270            self.push_high_priority(item);
5271            true
5272        } else {
5273            false
5274        }
5275    }
5276
5277    /// Returns whether there's a pending cancellation on the current guest thread,
5278    /// consuming the event if so.
5279    fn take_pending_cancellation(&mut self) -> Result<bool> {
5280        let thread = self.current_guest_thread()?;
5281        if let Some(event) = self.get_mut(thread.task)?.event.take() {
5282            assert!(matches!(event, Event::Cancelled));
5283            Ok(true)
5284        } else {
5285            Ok(false)
5286        }
5287    }
5288
5289    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5290        if self.may_block(task)? {
5291            Ok(())
5292        } else {
5293            Err(Trap::CannotBlockSyncTask.into())
5294        }
5295    }
5296
5297    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5298        let task = self.get_mut(task)?;
5299        Ok(task.async_function || task.returned_or_cancelled())
5300    }
5301
5302    /// Used by `ResourceTables` to acquire the current `CallContext` for the
5303    /// specified task.
5304    ///
5305    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5306    /// below.
5307    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5308        let (task, is_host) = (task >> 1, task & 1 == 1);
5309        if is_host {
5310            let task: TableId<HostTask> = TableId::new(task);
5311            Ok(&mut self.get_mut(task)?.call_context)
5312        } else {
5313            let task: TableId<GuestTask> = TableId::new(task);
5314            Ok(&mut self.get_mut(task)?.call_context)
5315        }
5316    }
5317
5318    /// Used by `ResourceTables` to record the scope of a borrow to get undone
5319    /// in the future.
5320    pub fn current_call_context_scope_id(&self) -> Result<u32> {
5321        let (bits, is_host) = match self.current_thread {
5322            CurrentThread::Guest(id) => (id.task.rep(), false),
5323            CurrentThread::Host(id) => (id.rep(), true),
5324            CurrentThread::None => bail_bug!("current thread is not set"),
5325        };
5326        assert_eq!((bits << 1) >> 1, bits);
5327        Ok((bits << 1) | u32::from(is_host))
5328    }
5329
5330    fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5331        match self.current_thread.guest() {
5332            Some(id) => Ok(*id),
5333            None => bail_bug!("current thread is not a guest thread"),
5334        }
5335    }
5336
5337    fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5338        match self.current_thread.host() {
5339            Some(id) => Ok(id),
5340            None => bail_bug!("current thread is not a host thread"),
5341        }
5342    }
5343
5344    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5345        match self.futures.get_mut().as_mut() {
5346            Some(f) => Ok(f),
5347            None => bail_bug!("futures field of concurrent state is currently taken"),
5348        }
5349    }
5350
5351    pub(crate) fn table(&mut self) -> &mut ResourceTable {
5352        self.table.get_mut()
5353    }
5354}
5355
5356/// Provide a type hint to compiler about the shape of a parameter lower
5357/// closure.
5358fn for_any_lower<
5359    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5360>(
5361    fun: F,
5362) -> F {
5363    fun
5364}
5365
5366/// Provide a type hint to compiler about the shape of a result lift closure.
5367fn for_any_lift<
5368    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5369>(
5370    fun: F,
5371) -> F {
5372    fun
5373}
5374
5375/// Wrap the specified future in a `poll_fn` which asserts that the future is
5376/// only polled from the event loop of the specified `Store`.
5377///
5378/// See `StoreContextMut::run_concurrent` for details.
5379fn checked<F: Future + Send + 'static>(
5380    id: StoreId,
5381    fut: F,
5382) -> impl Future<Output = F::Output> + Send + 'static {
5383    async move {
5384        let mut fut = pin!(fut);
5385        future::poll_fn(move |cx| {
5386            let message = "\
5387                `Future`s which depend on asynchronous component tasks, streams, or \
5388                futures to complete may only be polled from the event loop of the \
5389                store to which they belong.  Please use \
5390                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5391            ";
5392            tls::try_get(|store| {
5393                let matched = match store {
5394                    tls::TryGet::Some(store) => store.id() == id,
5395                    tls::TryGet::Taken | tls::TryGet::None => false,
5396                };
5397
5398                if !matched {
5399                    panic!("{message}")
5400                }
5401            });
5402            fut.as_mut().poll(cx)
5403        })
5404        .await
5405    }
5406}
5407
5408/// Assert that `StoreContextMut::run_concurrent` has not been called from
5409/// within an store's event loop.
5410fn check_recursive_run() {
5411    tls::try_get(|store| {
5412        if !matches!(store, tls::TryGet::None) {
5413            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5414        }
5415    });
5416}
5417
/// Splits a packed callback return value into `(low, high)`, where `low` is
/// the lowest four bits of `code` and `high` is the remaining upper bits
/// shifted down by four.
///
/// NOTE(review): presumably `low` is the callback code and `high` its
/// payload (e.g. a waitable-set index) per the Async ABI — confirm against
/// the callers.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5421
5422/// Helper struct for packaging parameters to be passed to
5423/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5424/// `waitable-set.poll`.
5425struct WaitableCheckParams {
5426    set: TableId<WaitableSet>,
5427    options: OptionsIndex,
5428    payload: u32,
5429}
5430
/// Selects which intrinsic `ComponentInstance::waitable_check` is servicing.
enum WaitableCheck {
    /// The call is a `waitable-set.wait`.
    Wait,
    /// The call is a `waitable-set.poll`.
    Poll,
}
5437
5438/// Represents a guest task called from the host, prepared using `prepare_call`.
5439pub(crate) struct PreparedCall<R> {
5440    /// The guest export to be called
5441    handle: Func,
5442    /// The guest thread created by `prepare_call`
5443    thread: QualifiedThreadId,
5444    /// The number of lowered core Wasm parameters to pass to the call.
5445    param_count: usize,
5446    /// The `oneshot::Receiver` to which the result of the call will be
5447    /// delivered when it is available.
5448    rx: oneshot::Receiver<LiftedResult>,
5449    /// The instance that this call is prepared for.
5450    runtime_instance: RuntimeInstance,
5451    _phantom: PhantomData<R>,
5452}
5453
5454impl<R> PreparedCall<R> {
5455    /// Get a copy of the `TaskId` for this `PreparedCall`.
5456    pub(crate) fn task_id(&self) -> TaskId {
5457        TaskId {
5458            task: self.thread.task,
5459            runtime_instance: self.runtime_instance,
5460        }
5461    }
5462}
5463
5464/// Represents a task created by `prepare_call`.
5465pub(crate) struct TaskId {
5466    task: TableId<GuestTask>,
5467    runtime_instance: RuntimeInstance,
5468}
5469
5470impl TaskId {
5471    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5472    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5473    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5474    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5475    /// and delete the task when all threads are done.
5476    pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5477        let task = store.concurrent_state_mut().get_mut(self.task)?;
5478        let delete = if !task.already_lowered_parameters() {
5479            store.cancel_guest_subtask_without_lowered_parameters(
5480                self.runtime_instance,
5481                self.task,
5482            )?;
5483            true
5484        } else {
5485            task.host_future_state = HostFutureState::Dropped;
5486            task.ready_to_delete()
5487        };
5488        if delete {
5489            Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5490        }
5491        Ok(())
5492    }
5493}
5494
5495/// Prepare a call to the specified exported Wasm function, providing functions
5496/// for lowering the parameters and lifting the result.
5497///
5498/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5499/// loop, use `queue_call`.
5500pub(crate) fn prepare_call<T, R>(
5501    mut store: StoreContextMut<T>,
5502    handle: Func,
5503    param_count: usize,
5504    host_future_present: bool,
5505    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5506    + Send
5507    + Sync
5508    + 'static,
5509    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5510    + Send
5511    + Sync
5512    + 'static,
5513) -> Result<PreparedCall<R>> {
5514    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5515
5516    let instance = handle.instance().id().get(store.0);
5517    let options = &instance.component().env_component().options[options];
5518    let ty = &instance.component().types()[ty];
5519    let async_function = ty.async_;
5520    let task_return_type = ty.results;
5521    let component_instance = raw_options.instance;
5522    let callback = options.callback.map(|i| instance.runtime_callback(i));
5523    let memory = options
5524        .memory()
5525        .map(|i| instance.runtime_memory(i))
5526        .map(SendSyncPtr::new);
5527    let string_encoding = options.string_encoding;
5528    let token = StoreToken::new(store.as_context_mut());
5529    let state = store.0.concurrent_state_mut();
5530
5531    let (tx, rx) = oneshot::channel();
5532
5533    let instance = handle.instance().runtime_instance(component_instance);
5534    let caller = state.current_thread;
5535    let thread = GuestTask::new(
5536        state,
5537        Box::new(for_any_lower(move |store, params| {
5538            lower_params(handle, token.as_context_mut(store), params)
5539        })),
5540        LiftResult {
5541            lift: Box::new(for_any_lift(move |store, result| {
5542                lift_result(handle, store, result)
5543            })),
5544            ty: task_return_type,
5545            memory,
5546            string_encoding,
5547        },
5548        Caller::Host {
5549            tx: Some(tx),
5550            host_future_present,
5551            caller,
5552        },
5553        callback.map(|callback| {
5554            let callback = SendSyncPtr::new(callback);
5555            let instance = handle.instance();
5556            Box::new(move |store: &mut dyn VMStore, event, handle| {
5557                let store = token.as_context_mut(store);
5558                // SAFETY: Per the contract of `prepare_call`, the callback
5559                // will remain valid at least as long is this task exists.
5560                unsafe { instance.call_callback(store, callback, event, handle) }
5561            }) as CallbackFn
5562        }),
5563        instance,
5564        async_function,
5565    )?;
5566
5567    if !store.0.may_enter(instance)? {
5568        bail!(Trap::CannotEnterComponent);
5569    }
5570
5571    Ok(PreparedCall {
5572        handle,
5573        thread,
5574        param_count,
5575        runtime_instance: instance,
5576        rx,
5577        _phantom: PhantomData,
5578    })
5579}
5580
5581/// Queue a call previously prepared using `prepare_call` to be run as part of
5582/// the associated `ComponentInstance`'s event loop.
5583///
5584/// The returned future will resolve to the result once it is available, but
5585/// must only be polled via the instance's event loop. See
5586/// `StoreContextMut::run_concurrent` for details.
5587pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5588    mut store: StoreContextMut<T>,
5589    prepared: PreparedCall<R>,
5590) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5591    let PreparedCall {
5592        handle,
5593        thread,
5594        param_count,
5595        rx,
5596        ..
5597    } = prepared;
5598
5599    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5600
5601    Ok(checked(
5602        store.0.id(),
5603        rx.map(move |result| match result {
5604            Ok(r) => match r.downcast() {
5605                Ok(r) => Ok(*r),
5606                Err(_) => bail_bug!("wrong type of value produced"),
5607            },
5608            Err(e) => Err(e.into()),
5609        }),
5610    ))
5611}
5612
5613/// Queue a call previously prepared using `prepare_call` to be run as part of
5614/// the associated `ComponentInstance`'s event loop.
5615fn queue_call0<T: 'static>(
5616    store: StoreContextMut<T>,
5617    handle: Func,
5618    guest_thread: QualifiedThreadId,
5619    param_count: usize,
5620) -> Result<()> {
5621    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5622    let is_concurrent = raw_options.async_;
5623    let callback = raw_options.callback;
5624    let instance = handle.instance();
5625    let callee = handle.lifted_core_func(store.0);
5626    let post_return = handle.post_return_core_func(store.0);
5627    let callback = callback.map(|i| {
5628        let instance = instance.id().get(store.0);
5629        SendSyncPtr::new(instance.runtime_callback(i))
5630    });
5631
5632    log::trace!("queueing call {guest_thread:?}");
5633
5634    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5635    // (with signatures appropriate for this call) and will remain valid as
5636    // long as this instance is valid.
5637    unsafe {
5638        instance.queue_call(
5639            store,
5640            guest_thread,
5641            SendSyncPtr::new(callee),
5642            param_count,
5643            1,
5644            is_concurrent,
5645            callback,
5646            post_return.map(SendSyncPtr::new),
5647        )
5648    }
5649}