wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{
3    Event, GlobalErrorContextRefCount, HostTaskOutput, LocalErrorContextRefCount, StateTable,
4    Waitable, WaitableCommon, WaitableState,
5};
6use crate::component::concurrent::{ConcurrentState, tls};
7use crate::component::func::{self, LiftContext, LowerContext, Options};
8use crate::component::matching::InstanceType;
9use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
10use crate::component::{Instance, Lower, Val, WasmList, WasmStr};
11use crate::store::{StoreOpaque, StoreToken};
12use crate::vm::{VMFuncRef, VMMemoryDefinition, VMStore};
13use crate::{AsContextMut, StoreContextMut, ValRaw};
14use anyhow::{Context, Result, anyhow, bail};
15use buffers::Extender;
16use buffers::UntypedWriteBuffer;
17use futures::channel::{mpsc, oneshot};
18use futures::future::{self, FutureExt};
19use futures::stream::StreamExt;
20use std::boxed::Box;
21use std::fmt;
22use std::future::Future;
23use std::iter;
24use std::marker::PhantomData;
25use std::mem::{self, MaybeUninit};
26use std::ptr::NonNull;
27use std::string::{String, ToString};
28use std::sync::{Arc, Mutex};
29use std::task::{Poll, Waker};
30use std::vec::Vec;
31use wasmtime_environ::component::{
32    CanonicalAbiInfo, ComponentTypes, InterfaceType, RuntimeComponentInstanceIndex, StringEncoding,
33    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
34    TypeFutureTableIndex, TypeStreamTableIndex,
35};
36
37pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
38
39mod buffers;
40
41/// Enum for distinguishing between a stream or future in functions that handle
42/// both.
43#[derive(Copy, Clone, Debug)]
44enum TransmitKind {
45    Stream,
46    Future,
47}
48
49/// Represents `{stream,future}.{read,write}` results.
50#[derive(Copy, Clone, Debug, PartialEq)]
51pub enum ReturnCode {
52    Blocked,
53    Completed(u32),
54    Dropped(u32),
55    Cancelled(u32),
56}
57
58impl ReturnCode {
59    /// Pack `self` into a single 32-bit integer that may be returned to the
60    /// guest.
61    ///
62    /// This corresponds to `pack_copy_result` in the Component Model spec.
63    pub fn encode(&self) -> u32 {
64        const BLOCKED: u32 = 0xffff_ffff;
65        const COMPLETED: u32 = 0x0;
66        const DROPPED: u32 = 0x1;
67        const CANCELLED: u32 = 0x2;
68        match self {
69            ReturnCode::Blocked => BLOCKED,
70            ReturnCode::Completed(n) => {
71                debug_assert!(*n < (1 << 28));
72                (n << 4) | COMPLETED
73            }
74            ReturnCode::Dropped(n) => {
75                debug_assert!(*n < (1 << 28));
76                (n << 4) | DROPPED
77            }
78            ReturnCode::Cancelled(n) => {
79                debug_assert!(*n < (1 << 28));
80                (n << 4) | CANCELLED
81            }
82        }
83    }
84
85    /// Returns `Self::Completed` with the specified count (or zero if
86    /// `matches!(kind, TransmitKind::Future)`)
87    fn completed(kind: TransmitKind, count: u32) -> Self {
88        Self::Completed(if let TransmitKind::Future = kind {
89            0
90        } else {
91            count
92        })
93    }
94}
95
96/// Represents a stream or future type index.
97///
98/// This is useful as a parameter type for functions which operate on either a
99/// future or a stream.
100#[derive(Copy, Clone, Debug)]
101pub(super) enum TableIndex {
102    Stream(TypeStreamTableIndex),
103    Future(TypeFutureTableIndex),
104}
105
106impl TableIndex {
107    fn kind(&self) -> TransmitKind {
108        match self {
109            TableIndex::Stream(_) => TransmitKind::Stream,
110            TableIndex::Future(_) => TransmitKind::Future,
111        }
112    }
113}
114
115/// Action to take after writing
116enum PostWrite {
117    /// Continue performing writes
118    Continue,
119    /// Drop the channel post-write
120    Drop,
121}
122
123/// Represents the result of a host-initiated stream or future read or write.
124struct HostResult<B> {
125    /// The buffer provided when reading or writing.
126    buffer: B,
127    /// Whether the other end of the stream or future has been dropped.
128    dropped: bool,
129}
130
131/// Retrieve the payload type of the specified stream or future, or `None` if it
132/// has no payload type.
133fn payload(ty: TableIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
134    match ty {
135        TableIndex::Future(ty) => types[types[ty].ty].payload,
136        TableIndex::Stream(ty) => types[types[ty].ty].payload,
137    }
138}
139
140/// Retrieve the host rep and state for the specified guest-visible waitable
141/// handle.
142fn get_mut_by_index_from(
143    state_table: &mut StateTable<WaitableState>,
144    ty: TableIndex,
145    index: u32,
146) -> Result<(u32, &mut StreamFutureState)> {
147    Ok(match ty {
148        TableIndex::Stream(ty) => {
149            let (rep, WaitableState::Stream(actual_ty, state)) =
150                state_table.get_mut_by_index(index)?
151            else {
152                bail!("invalid stream handle");
153            };
154            if *actual_ty != ty {
155                bail!("invalid stream handle");
156            }
157            (rep, state)
158        }
159        TableIndex::Future(ty) => {
160            let (rep, WaitableState::Future(actual_ty, state)) =
161                state_table.get_mut_by_index(index)?
162            else {
163                bail!("invalid future handle");
164            };
165            if *actual_ty != ty {
166                bail!("invalid future handle");
167            }
168            (rep, state)
169        }
170    })
171}
172
173/// Construct a `WaitableState` using the specified type and state.
174fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState {
175    match ty {
176        TableIndex::Stream(ty) => WaitableState::Stream(ty, state),
177        TableIndex::Future(ty) => WaitableState::Future(ty, state),
178    }
179}
180
181/// Return a closure which matches a host write operation to a read (or drop)
182/// operation.
183///
184/// This may be used when the host initiates a write but there is no read
185/// pending at the other end, in which case we construct a
186/// `WriteState::HostReady` using the closure created here and leave it in
187/// `TransmitState::write` for the reader to find and call when it's ready.
188fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
189    store: StoreContextMut<U>,
190    mut buffer: B,
191    tx: oneshot::Sender<HostResult<B>>,
192    kind: TransmitKind,
193) -> impl FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode>
194+ Send
195+ Sync
196+ 'static
197+ use<T, B, U> {
198    let token = StoreToken::new(store);
199    move |store, instance, reader| {
200        let code = match reader {
201            Reader::Guest {
202                options,
203                ty,
204                address,
205                count,
206            } => {
207                let mut store = token.as_context_mut(store);
208                let types = instance.id().get(store.0).component().types().clone();
209                let count = buffer.remaining().len().min(count);
210
211                let lower =
212                    &mut LowerContext::new(store.as_context_mut(), options, &types, instance);
213                if address % usize::try_from(T::ALIGN32)? != 0 {
214                    bail!("read pointer not aligned");
215                }
216                lower
217                    .as_slice_mut()
218                    .get_mut(address..)
219                    .and_then(|b| b.get_mut(..T::SIZE32 * count))
220                    .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
221
222                if let Some(ty) = payload(ty, &types) {
223                    T::linear_store_list_to_memory(
224                        lower,
225                        ty,
226                        address,
227                        &buffer.remaining()[..count],
228                    )?;
229                }
230
231                buffer.skip(count);
232                _ = tx.send(HostResult {
233                    buffer,
234                    dropped: false,
235                });
236                ReturnCode::completed(kind, count.try_into().unwrap())
237            }
238            Reader::Host { accept } => {
239                let count = buffer.remaining().len();
240                let mut untyped = UntypedWriteBuffer::new(&mut buffer);
241                let count = accept(&mut untyped, count);
242                _ = tx.send(HostResult {
243                    buffer,
244                    dropped: false,
245                });
246                ReturnCode::completed(kind, count.try_into().unwrap())
247            }
248            Reader::End => {
249                _ = tx.send(HostResult {
250                    buffer,
251                    dropped: true,
252                });
253                ReturnCode::Dropped(0)
254            }
255        };
256
257        Ok(code)
258    }
259}
260
261/// Return a closure which matches a host read operation to a write (or drop)
262/// operation.
263///
264/// This may be used when the host initiates a read but there is no write
265/// pending at the other end, in which case we construct a
266/// `ReadState::HostReady` using the closure created here and leave it in
267/// `TransmitState::read` for the writer to find and call when it's ready.
268fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
269    mut buffer: B,
270    tx: oneshot::Sender<HostResult<B>>,
271    kind: TransmitKind,
272) -> impl FnOnce(Writer) -> Result<ReturnCode> + Send + Sync + 'static {
273    move |writer| {
274        let count = match writer {
275            Writer::Guest {
276                lift,
277                ty,
278                address,
279                count,
280            } => {
281                let count = count.min(buffer.remaining_capacity());
282                if T::IS_RUST_UNIT_TYPE {
283                    // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
284                    // zero-sized type, so `MaybeUninit::uninit().assume_init()`
285                    // is a valid way to populate the zero-sized buffer.
286                    buffer.extend(
287                        iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() })
288                            .take(count),
289                    )
290                } else {
291                    let ty = ty.unwrap();
292                    if address % usize::try_from(T::ALIGN32)? != 0 {
293                        bail!("write pointer not aligned");
294                    }
295                    lift.memory()
296                        .get(address..)
297                        .and_then(|b| b.get(..T::SIZE32 * count))
298                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
299
300                    let list = &WasmList::new(address, count, lift, ty)?;
301                    T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))?
302                }
303                _ = tx.send(HostResult {
304                    buffer,
305                    dropped: false,
306                });
307                ReturnCode::completed(kind, count.try_into().unwrap())
308            }
309            Writer::Host {
310                buffer: input,
311                count,
312            } => {
313                let count = count.min(buffer.remaining_capacity());
314                buffer.move_from(input.get_mut::<T>(), count);
315                _ = tx.send(HostResult {
316                    buffer,
317                    dropped: false,
318                });
319                ReturnCode::completed(kind, count.try_into().unwrap())
320            }
321            Writer::End => {
322                _ = tx.send(HostResult {
323                    buffer,
324                    dropped: true,
325                });
326                ReturnCode::Dropped(0)
327            }
328        };
329
330        Ok(count)
331    }
332}
333
334/// Represents the state of a stream or future handle from the perspective of a
335/// given component instance.
336#[derive(Debug, Eq, PartialEq)]
337pub(super) enum StreamFutureState {
338    /// The write end of the stream or future.
339    Write {
340        /// Whether the component instance has been notified that the stream or
341        /// future is "done" (i.e. the other end has dropped, or, in the case of
342        /// a future, a value has been transmitted).
343        done: bool,
344    },
345    /// The read end of the stream or future.
346    Read {
347        /// Whether the component instance has been notified that the stream or
348        /// future is "done" (i.e. the other end has dropped, or, in the case of
349        /// a future, a value has been transmitted).
350        done: bool,
351    },
352    /// A read or write is in progress.
353    Busy,
354}
355
356/// Represents the state associated with an error context
357#[derive(Debug, PartialEq, Eq, PartialOrd)]
358pub(super) struct ErrorContextState {
359    /// Debug message associated with the error context
360    pub(crate) debug_msg: String,
361}
362
363/// Represents the size and alignment for a "flat" Component Model type,
364/// i.e. one containing no pointers or handles.
365#[derive(Debug, Clone, Copy, PartialEq, Eq)]
366pub(super) struct FlatAbi {
367    pub(super) size: u32,
368    pub(super) align: u32,
369}
370
371/// Represents a pending event on a host-owned write end of a stream or future.
372///
373/// See `ComponentInstance::start_write_event_loop` for details.
374enum WriteEvent<B> {
375    /// Write the items in the specified buffer to the stream or future, and
376    /// return the result via the specified `Sender`.
377    Write {
378        buffer: B,
379        tx: oneshot::Sender<HostResult<B>>,
380    },
381    /// Drop the write end of the stream or future.
382    Drop(Option<Box<dyn FnOnce() -> B + Send + Sync>>),
383    /// Watch the read (i.e. opposite) end of this stream or future, dropping
384    /// the specified sender when it is dropped.
385    Watch { tx: oneshot::Sender<()> },
386}
387
388/// Represents a pending event on a host-owned read end of a stream or future.
389///
390/// See `ComponentInstance::start_read_event_loop` for details.
391enum ReadEvent<B> {
392    /// Read as many items as the specified buffer will hold from the stream or
393    /// future, and return the result via the specified `Sender`.
394    Read {
395        buffer: B,
396        tx: oneshot::Sender<HostResult<B>>,
397    },
398    /// Drop the read end of the stream or future.
399    Drop,
400    /// Watch the write (i.e. opposite) end of this stream or future, dropping
401    /// the specified sender when it is dropped.
402    Watch { tx: oneshot::Sender<()> },
403}
404
405/// Send the specified value to the specified `Sender`.
406///
407/// This will panic if there is no room in the channel's buffer, so it should
408/// only be used in a context where there is at least one empty spot in the
409/// buffer.  It will silently ignore any other error (e.g. if the `Receiver` has
410/// been dropped).
411fn send<T>(tx: &mut mpsc::Sender<T>, value: T) {
412    if let Err(e) = tx.try_send(value) {
413        if e.is_full() {
414            unreachable!();
415        }
416    }
417}
418
419/// Wrapper struct which may be converted to the inner value as needed.
420///
421/// This object is normally paired with a `Future` which represents a state
422/// change on the inner value, resolving when that state change happens _or_
423/// when the `Watch` is converted back into the inner value -- whichever happens
424/// first.
425pub struct Watch<T> {
426    inner: T,
427    waker: Arc<Mutex<WatchState>>,
428}
429
430enum WatchState {
431    Idle,
432    Waiting(Waker),
433    Done,
434}
435
436impl<T> Watch<T> {
437    /// Convert this object into its inner value.
438    ///
439    /// Calling this function will cause the associated `Future` to resolve
440    /// immediately if it hasn't already.
441    pub fn into_inner(self) -> T {
442        let state = mem::replace(&mut *self.waker.lock().unwrap(), WatchState::Done);
443        if let WatchState::Waiting(waker) = state {
444            waker.wake();
445        }
446        self.inner
447    }
448}
449
450/// Wrap the specified `oneshot::Receiver` in a future which resolves when
451/// either that `Receiver` resolves or `Watch::into_inner` has been called on
452/// the returned `Watch`.
453fn watch<T: Send + 'static>(
454    instance: Instance,
455    mut rx: oneshot::Receiver<()>,
456    inner: T,
457) -> (impl Future<Output = ()> + Send + 'static, Watch<T>) {
458    let waker = Arc::new(Mutex::new(WatchState::Idle));
459    (
460        super::checked(
461            instance,
462            future::poll_fn({
463                let waker = waker.clone();
464
465                move |cx| {
466                    if rx.poll_unpin(cx).is_ready() {
467                        return Poll::Ready(());
468                    }
469                    let mut state = waker.lock().unwrap();
470                    match *state {
471                        WatchState::Done => Poll::Ready(()),
472                        _ => {
473                            *state = WatchState::Waiting(cx.waker().clone());
474                            Poll::Pending
475                        }
476                    }
477                }
478            }),
479        ),
480        Watch { waker, inner },
481    )
482}
483
484/// Represents the writable end of a Component Model `future`.
485pub struct FutureWriter<T: 'static> {
486    default: Option<fn() -> T>,
487    instance: Instance,
488    tx: Option<mpsc::Sender<WriteEvent<Option<T>>>>,
489}
490
491impl<T> FutureWriter<T> {
492    fn new(
493        default: fn() -> T,
494        tx: Option<mpsc::Sender<WriteEvent<Option<T>>>>,
495        instance: Instance,
496    ) -> Self {
497        Self {
498            default: Some(default),
499            instance,
500            tx,
501        }
502    }
503
504    /// Write the specified value to this `future`.
505    ///
506    /// The returned `Future` will yield `true` if the read end accepted the
507    /// value; otherwise it will return `false`, meaning the read end was dropped
508    /// before the value could be delivered.
509    ///
510    /// Note that the returned `Future` must be polled from the event loop of
511    /// the component instance from which this `FutureWriter` originated.  See
512    /// [`Instance::run`] for details.
513    pub fn write(mut self, value: T) -> impl Future<Output = bool> + Send + 'static
514    where
515        T: Send + 'static,
516    {
517        let (tx, rx) = oneshot::channel();
518        send(
519            &mut self.tx.as_mut().unwrap(),
520            WriteEvent::Write {
521                buffer: Some(value),
522                tx,
523            },
524        );
525        self.default = None;
526        let instance = self.instance;
527        super::checked(
528            instance,
529            rx.map(move |v| {
530                drop(self);
531                match v {
532                    Ok(HostResult { dropped, .. }) => !dropped,
533                    Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"),
534                }
535            }),
536        )
537    }
538
539    /// Convert this object into a `Future` which will resolve when the read end
540    /// of this `future` is dropped, plus a `Watch` which can be used to retrieve
541    /// the `FutureWriter` again.
542    ///
543    /// Note that calling `Watch::into_inner` on the returned `Watch` will have
544    /// the side effect of causing the `Future` to resolve immediately if it
545    /// hasn't already.
546    ///
547    /// Also note that the returned `Future` must be polled from the event loop
548    /// of the component instance from which this `FutureWriter` originated.
549    /// See [`Instance::run`] for details.
550    pub fn watch_reader(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
551    where
552        T: Send + 'static,
553    {
554        let (tx, rx) = oneshot::channel();
555        send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx });
556        let instance = self.instance;
557        watch(instance, rx, self)
558    }
559}
560
561impl<T> Drop for FutureWriter<T> {
562    fn drop(&mut self) {
563        if let Some(mut tx) = self.tx.take() {
564            send(
565                &mut tx,
566                WriteEvent::Drop(self.default.take().map(|v| {
567                    Box::new(move || Some(v()))
568                        as Box<dyn FnOnce() -> Option<T> + Send + Sync + 'static>
569                })),
570            );
571        }
572    }
573}
574
575/// Represents the readable end of a Component Model `future`.
576///
577/// In order to actually read from or drop this `future`, first convert it to a
578/// [`FutureReader`] using the `into_reader` method.
579///
580/// Note that if a value of this type is dropped without either being converted
581/// to a `FutureReader` or passed to the guest, any writes on the write end may
582/// block forever.
583pub struct HostFuture<T> {
584    instance: Instance,
585    rep: u32,
586    _phantom: PhantomData<T>,
587}
588
589impl<T> HostFuture<T> {
590    /// Create a new `HostFuture`.
591    fn new(rep: u32, instance: Instance) -> Self {
592        Self {
593            instance,
594            rep,
595            _phantom: PhantomData,
596        }
597    }
598
599    /// Convert this object into a [`FutureReader`].
600    pub fn into_reader(self, mut store: impl AsContextMut) -> FutureReader<T>
601    where
602        T: func::Lower + func::Lift + Send + Sync + 'static,
603    {
604        FutureReader {
605            instance: self.instance,
606            rep: self.rep,
607            tx: Some(self.instance.start_read_event_loop(
608                store.as_context_mut(),
609                self.rep,
610                TransmitKind::Future,
611            )),
612        }
613    }
614
615    /// Convert this `FutureReader` into a [`Val`].
616    // See TODO comment for `FutureAny`; this is prone to handle leakage.
617    pub fn into_val(self) -> Val {
618        Val::Future(FutureAny(self.rep))
619    }
620
621    /// Attempt to convert the specified [`Val`] to a `FutureReader`.
622    pub fn from_val(
623        mut store: impl AsContextMut<Data: Send>,
624        instance: Instance,
625        value: &Val,
626    ) -> Result<Self> {
627        let Val::Future(FutureAny(rep)) = value else {
628            bail!("expected `future`; got `{}`", value.desc());
629        };
630        let store = store.as_context_mut();
631        instance
632            .concurrent_state_mut(store.0)
633            .get(TableId::<TransmitHandle>::new(*rep))?; // Just make sure it's present
634        Ok(Self::new(*rep, instance))
635    }
636
637    /// Transfer ownership of the read end of a future from a guest to the host.
638    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
639        match ty {
640            InterfaceType::Future(src) => {
641                let state_table = cx
642                    .instance_mut()
643                    .concurrent_state_mut()
644                    .state_table(TableIndex::Future(src));
645                let (rep, state) =
646                    get_mut_by_index_from(state_table, TableIndex::Future(src), index)?;
647
648                match state {
649                    StreamFutureState::Read { .. } => {
650                        state_table.remove_by_index(index)?;
651                    }
652                    StreamFutureState::Write { .. } => bail!("cannot transfer write end of future"),
653                    StreamFutureState::Busy => bail!("cannot transfer busy future"),
654                }
655
656                let concurrent_state = cx.instance_mut().concurrent_state_mut();
657                let state = concurrent_state
658                    .get(TableId::<TransmitHandle>::new(rep))?
659                    .state;
660
661                if concurrent_state.get(state)?.done {
662                    bail!("cannot lift future after previous read succeeded");
663                }
664
665                Ok(Self::new(rep, cx.instance_handle()))
666            }
667            _ => func::bad_type_info(),
668        }
669    }
670}
671
672/// Transfer ownership of the read end of a future from the host to a guest.
673pub(crate) fn lower_future_to_index<U>(
674    rep: u32,
675    cx: &mut LowerContext<'_, U>,
676    ty: InterfaceType,
677) -> Result<u32> {
678    match ty {
679        InterfaceType::Future(dst) => {
680            let concurrent_state = cx.instance_mut().concurrent_state_mut();
681            let state = concurrent_state
682                .get(TableId::<TransmitHandle>::new(rep))?
683                .state;
684            let rep = concurrent_state.get(state)?.read_handle.rep();
685
686            concurrent_state
687                .state_table(TableIndex::Future(dst))
688                .insert(
689                    rep,
690                    WaitableState::Future(dst, StreamFutureState::Read { done: false }),
691                )
692        }
693        _ => func::bad_type_info(),
694    }
695}
696
697// SAFETY: This relies on the `ComponentType` implementation for `u32` being
698// safe and correct since we lift and lower future handles as `u32`s.
699unsafe impl<T: Send + Sync> func::ComponentType for HostFuture<T> {
700    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
701
702    type Lower = <u32 as func::ComponentType>::Lower;
703
704    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
705        match ty {
706            InterfaceType::Future(_) => Ok(()),
707            other => bail!("expected `future`, found `{}`", func::desc(other)),
708        }
709    }
710}
711
712// SAFETY: See the comment on the `ComponentType` `impl` for this type.
713unsafe impl<T: Send + Sync> func::Lower for HostFuture<T> {
714    fn linear_lower_to_flat<U>(
715        &self,
716        cx: &mut LowerContext<'_, U>,
717        ty: InterfaceType,
718        dst: &mut MaybeUninit<Self::Lower>,
719    ) -> Result<()> {
720        lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
721    }
722
723    fn linear_lower_to_memory<U>(
724        &self,
725        cx: &mut LowerContext<'_, U>,
726        ty: InterfaceType,
727        offset: usize,
728    ) -> Result<()> {
729        lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
730            cx,
731            InterfaceType::U32,
732            offset,
733        )
734    }
735}
736
737// SAFETY: See the comment on the `ComponentType` `impl` for this type.
738unsafe impl<T: Send + Sync> func::Lift for HostFuture<T> {
739    fn linear_lift_from_flat(
740        cx: &mut LiftContext<'_>,
741        ty: InterfaceType,
742        src: &Self::Lower,
743    ) -> Result<Self> {
744        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
745        Self::lift_from_index(cx, ty, index)
746    }
747
748    fn linear_lift_from_memory(
749        cx: &mut LiftContext<'_>,
750        ty: InterfaceType,
751        bytes: &[u8],
752    ) -> Result<Self> {
753        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
754        Self::lift_from_index(cx, ty, index)
755    }
756}
757
758impl<T> From<FutureReader<T>> for HostFuture<T> {
759    fn from(mut value: FutureReader<T>) -> Self {
760        value.tx.take();
761
762        Self {
763            instance: value.instance,
764            rep: value.rep,
765            _phantom: PhantomData,
766        }
767    }
768}
769
770/// Represents the readable end of a Component Model `future`.
771///
772/// In order to pass this end to guest code, first convert it to a
773/// [`HostFuture`] using the `into` method.
774pub struct FutureReader<T> {
775    instance: Instance,
776    rep: u32,
777    tx: Option<mpsc::Sender<ReadEvent<Option<T>>>>,
778}
779
780impl<T> FutureReader<T> {
781    fn new(rep: u32, tx: Option<mpsc::Sender<ReadEvent<Option<T>>>>, instance: Instance) -> Self {
782        Self { instance, rep, tx }
783    }
784
785    /// Read the value from this `future`.
786    ///
787    /// The returned `Future` will yield `None` if the guest has trapped
788    /// before it could produce a result.
789    ///
790    /// Note that the returned `Future` must be polled from the event loop of
791    /// the component instance from which this `FutureReader` originated.  See
792    /// [`Instance::run`] for details.
793    pub fn read(mut self) -> impl Future<Output = Option<T>> + Send + 'static
794    where
795        T: Send + 'static,
796    {
797        let (tx, rx) = oneshot::channel();
798        send(
799            &mut self.tx.as_mut().unwrap(),
800            ReadEvent::Read { buffer: None, tx },
801        );
802        let instance = self.instance;
803        super::checked(
804            instance,
805            rx.map(move |v| {
806                drop(self);
807
808                if let Ok(HostResult {
809                    mut buffer,
810                    dropped: false,
811                }) = v
812                {
813                    buffer.take()
814                } else {
815                    None
816                }
817            }),
818        )
819    }
820
821    /// Convert this object into a `Future` which will resolve when the write
822    /// end of this `future` is dropped, plus a `Watch` which can be used to
823    /// retrieve the `FutureReader` again.
824    ///
825    /// Note that calling `Watch::into_inner` on the returned `Watch` will have
826    /// the side effect of causing the `Future` to resolve immediately if it
827    /// hasn't already.
828    ///
829    /// Also note that the returned `Future` must be polled from the event loop
830    /// of the component instance from which this `FutureReader` originated.
831    /// See [`Instance::run`] for details.
832    pub fn watch_writer(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
833    where
834        T: Send + 'static,
835    {
836        let (tx, rx) = oneshot::channel();
837        send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx });
838        let instance = self.instance;
839        watch(instance, rx, self)
840    }
841}
842
843impl<T> Drop for FutureReader<T> {
844    fn drop(&mut self) {
845        if let Some(mut tx) = self.tx.take() {
846            send(&mut tx, ReadEvent::Drop);
847        }
848    }
849}
850
851/// Represents the writable end of a Component Model `stream`.
852pub struct StreamWriter<B> {
853    instance: Instance,
854    tx: Option<mpsc::Sender<WriteEvent<B>>>,
855}
856
857impl<B> StreamWriter<B> {
858    fn new(tx: Option<mpsc::Sender<WriteEvent<B>>>, instance: Instance) -> Self {
859        Self { instance, tx }
860    }
861
862    /// Write the specified items to the `stream`.
863    ///
864    /// Note that this will only write as many items as the reader accepts
865    /// during its current or next read.  Use `write_all` to loop until the
866    /// buffer is drained or the read end is dropped.
867    ///
868    /// The returned `Future` will yield a `(Some(_), _)` if the write completed
869    /// (possibly consuming a subset of the items or nothing depending on the
870    /// number of items the reader accepted).  It will return `(None, _)` if the
871    /// write failed due to the closure of the read end.  In either case, the
872    /// returned buffer will be the same one passed as a parameter, possibly
873    /// mutated to consume any written values.
874    ///
875    /// Note that the returned `Future` must be polled from the event loop of
876    /// the component instance from which this `StreamWriter` originated.  See
877    /// [`Instance::run`] for details.
878    pub fn write(
879        mut self,
880        buffer: B,
881    ) -> impl Future<Output = (Option<StreamWriter<B>>, B)> + Send + 'static
882    where
883        B: Send + 'static,
884    {
885        let (tx, rx) = oneshot::channel();
886        send(self.tx.as_mut().unwrap(), WriteEvent::Write { buffer, tx });
887        let instance = self.instance;
888        super::checked(
889            instance,
890            rx.map(move |v| match v {
891                Ok(HostResult { buffer, dropped }) => ((!dropped).then_some(self), buffer),
892                Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"),
893            }),
894        )
895    }
896
897    /// Write the specified values until either the buffer is drained or the
898    /// read end is dropped.
899    ///
900    /// The returned `Future` will yield a `(Some(_), _)` if the write completed
901    /// (i.e. all the items were accepted).  It will return `(None, _)` if the
902    /// write failed due to the closure of the read end.  In either case, the
903    /// returned buffer will be the same one passed as a parameter, possibly
904    /// mutated to consume any written values.
905    ///
906    /// Note that the returned `Future` must be polled from the event loop of
907    /// the component instance from which this `StreamWriter` originated.  See
908    /// [`Instance::run`] for details.
909    pub fn write_all<T>(
910        self,
911        buffer: B,
912    ) -> impl Future<Output = (Option<StreamWriter<B>>, B)> + Send + 'static
913    where
914        B: WriteBuffer<T>,
915    {
916        let instance = self.instance;
917        super::checked(
918            instance,
919            self.write(buffer).then(|(me, buffer)| async move {
920                if let Some(me) = me {
921                    if buffer.remaining().len() > 0 {
922                        // Note the use of `Box::pin` which is required due to
923                        // the recursive nature of this function.
924                        Box::pin(me.write_all(buffer)).await
925                    } else {
926                        (Some(me), buffer)
927                    }
928                } else {
929                    (None, buffer)
930                }
931            }),
932        )
933    }
934
935    /// Convert this object into a `Future` which will resolve when the read end
936    /// of this `stream` is dropped, plus a `Watch` which can be used to retrieve
937    /// the `StreamWriter` again.
938    ///
939    /// Note that calling `Watch::into_inner` on the returned `Watch` will have
940    /// the side effect of causing the `Future` to resolve immediately if it
941    /// hasn't already.
942    ///
943    /// Also note that the returned `Future` must be polled from the event loop
944    /// of the component instance from which this `StreamWriter` originated.
945    /// See [`Instance::run`] for details.
946    pub fn watch_reader(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
947    where
948        B: Send + 'static,
949    {
950        let (tx, rx) = oneshot::channel();
951        send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx });
952        let instance = self.instance;
953        watch(instance, rx, self)
954    }
955}
956
957impl<T> Drop for StreamWriter<T> {
958    fn drop(&mut self) {
959        if let Some(mut tx) = self.tx.take() {
960            send(&mut tx, WriteEvent::Drop(None));
961        }
962    }
963}
964
965/// Represents the readable end of a Component Model `stream`.
966///
967/// In order to actually read from or drop this `stream`, first convert it to a
968/// [`FutureReader`] using the `into_reader` method.
969///
970/// Note that if a value of this type is dropped without either being converted
971/// to a `StreamReader` or passed to the guest, any writes on the write end may
972/// block forever.
973pub struct HostStream<T> {
974    instance: Instance,
975    rep: u32,
976    _phantom: PhantomData<T>,
977}
978
979impl<T> HostStream<T> {
980    /// Create a new `HostStream`.
981    fn new(rep: u32, instance: Instance) -> Self {
982        Self {
983            instance,
984            rep,
985            _phantom: PhantomData,
986        }
987    }
988
989    /// Convert this object into a [`StreamReader`].
990    pub fn into_reader<B>(self, mut store: impl AsContextMut) -> StreamReader<B>
991    where
992        T: func::Lower + func::Lift + Send + 'static,
993        B: ReadBuffer<T>,
994    {
995        StreamReader {
996            instance: self.instance,
997            rep: self.rep,
998            tx: Some(self.instance.start_read_event_loop(
999                store.as_context_mut(),
1000                self.rep,
1001                TransmitKind::Stream,
1002            )),
1003        }
1004    }
1005
1006    /// Convert this `HostStream` into a [`Val`].
1007    // See TODO comment for `StreamAny`; this is prone to handle leakage.
1008    pub fn into_val(self) -> Val {
1009        Val::Stream(StreamAny(self.rep))
1010    }
1011
1012    /// Attempt to convert the specified [`Val`] to a `HostStream`.
1013    pub fn from_val(
1014        mut store: impl AsContextMut<Data: Send>,
1015        instance: Instance,
1016        value: &Val,
1017    ) -> Result<Self> {
1018        let Val::Stream(StreamAny(rep)) = value else {
1019            bail!("expected `stream`; got `{}`", value.desc());
1020        };
1021        let store = store.as_context_mut();
1022        instance
1023            .concurrent_state_mut(store.0)
1024            .get(TableId::<TransmitHandle>::new(*rep))?; // Just make sure it's present
1025        Ok(Self::new(*rep, instance))
1026    }
1027
1028    /// Transfer ownership of the read end of a stream from a guest to the host.
1029    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1030        match ty {
1031            InterfaceType::Stream(src) => {
1032                let state_table = cx
1033                    .instance_mut()
1034                    .concurrent_state_mut()
1035                    .state_table(TableIndex::Stream(src));
1036                let (rep, state) =
1037                    get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?;
1038
1039                match state {
1040                    StreamFutureState::Read { done: true } => bail!(
1041                        "cannot lift stream after being notified that the writable end dropped"
1042                    ),
1043                    StreamFutureState::Read { done: false } => {
1044                        state_table.remove_by_index(index)?;
1045                    }
1046                    StreamFutureState::Write { .. } => bail!("cannot transfer write end of stream"),
1047                    StreamFutureState::Busy => bail!("cannot transfer busy stream"),
1048                }
1049
1050                Ok(Self::new(rep, cx.instance_handle()))
1051            }
1052            _ => func::bad_type_info(),
1053        }
1054    }
1055}
1056
1057/// Transfer ownership of the read end of a stream from the host to a guest.
1058pub(crate) fn lower_stream_to_index<U>(
1059    rep: u32,
1060    cx: &mut LowerContext<'_, U>,
1061    ty: InterfaceType,
1062) -> Result<u32> {
1063    match ty {
1064        InterfaceType::Stream(dst) => {
1065            let concurrent_state = cx.instance_mut().concurrent_state_mut();
1066            let state = concurrent_state
1067                .get(TableId::<TransmitHandle>::new(rep))?
1068                .state;
1069            let rep = concurrent_state.get(state)?.read_handle.rep();
1070
1071            concurrent_state
1072                .state_table(TableIndex::Stream(dst))
1073                .insert(
1074                    rep,
1075                    WaitableState::Stream(dst, StreamFutureState::Read { done: false }),
1076                )
1077        }
1078        _ => func::bad_type_info(),
1079    }
1080}
1081
1082// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1083// safe and correct since we lift and lower stream handles as `u32`s.
1084unsafe impl<T: Send + Sync> func::ComponentType for HostStream<T> {
1085    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1086
1087    type Lower = <u32 as func::ComponentType>::Lower;
1088
1089    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1090        match ty {
1091            InterfaceType::Stream(_) => Ok(()),
1092            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1093        }
1094    }
1095}
1096
1097// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1098unsafe impl<T: Send + Sync> func::Lower for HostStream<T> {
1099    fn linear_lower_to_flat<U>(
1100        &self,
1101        cx: &mut LowerContext<'_, U>,
1102        ty: InterfaceType,
1103        dst: &mut MaybeUninit<Self::Lower>,
1104    ) -> Result<()> {
1105        lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1106    }
1107
1108    fn linear_lower_to_memory<U>(
1109        &self,
1110        cx: &mut LowerContext<'_, U>,
1111        ty: InterfaceType,
1112        offset: usize,
1113    ) -> Result<()> {
1114        lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1115            cx,
1116            InterfaceType::U32,
1117            offset,
1118        )
1119    }
1120}
1121
1122// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1123unsafe impl<T: Send + Sync> func::Lift for HostStream<T> {
1124    fn linear_lift_from_flat(
1125        cx: &mut LiftContext<'_>,
1126        ty: InterfaceType,
1127        src: &Self::Lower,
1128    ) -> Result<Self> {
1129        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1130        Self::lift_from_index(cx, ty, index)
1131    }
1132
1133    fn linear_lift_from_memory(
1134        cx: &mut LiftContext<'_>,
1135        ty: InterfaceType,
1136        bytes: &[u8],
1137    ) -> Result<Self> {
1138        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1139        Self::lift_from_index(cx, ty, index)
1140    }
1141}
1142
1143impl<T, B> From<StreamReader<B>> for HostStream<T> {
1144    fn from(mut value: StreamReader<B>) -> Self {
1145        value.tx.take();
1146
1147        Self {
1148            instance: value.instance,
1149            rep: value.rep,
1150            _phantom: PhantomData,
1151        }
1152    }
1153}
1154
1155/// Represents the readable end of a Component Model `stream`.
1156///
1157/// In order to pass this end to guest code, first convert it to a
1158/// [`HostStream`] using the `into` method.
1159pub struct StreamReader<B> {
1160    instance: Instance,
1161    rep: u32,
1162    tx: Option<mpsc::Sender<ReadEvent<B>>>,
1163}
1164
1165impl<B> StreamReader<B> {
1166    fn new(rep: u32, tx: Option<mpsc::Sender<ReadEvent<B>>>, instance: Instance) -> Self {
1167        Self { instance, rep, tx }
1168    }
1169
1170    /// Read values from this `stream`.
1171    ///
1172    /// The returned `Future` will yield a `(Some(_), _)` if the read completed
1173    /// (possibly with zero items if the write was empty).  It will return
1174    /// `(None, _)` if the read failed due to the closure of the write end.  In
1175    /// either case, the returned buffer will be the same one passed as a
1176    /// parameter, with zero or more items added.
1177    ///
1178    /// Note that the returned `Future` must be polled from the event loop of
1179    /// the component instance from which this `StreamReader` originated.  See
1180    /// [`Instance::run`] for details.
1181    pub fn read(
1182        mut self,
1183        buffer: B,
1184    ) -> impl Future<Output = (Option<StreamReader<B>>, B)> + Send + 'static
1185    where
1186        B: Send + 'static,
1187    {
1188        let (tx, rx) = oneshot::channel();
1189        send(self.tx.as_mut().unwrap(), ReadEvent::Read { buffer, tx });
1190        let instance = self.instance;
1191        super::checked(
1192            instance,
1193            rx.map(move |v| match v {
1194                Ok(HostResult { buffer, dropped }) => ((!dropped).then_some(self), buffer),
1195                Err(_) => {
1196                    todo!("guarantee buffer recovery if event loop errors or panics")
1197                }
1198            }),
1199        )
1200    }
1201
1202    /// Convert this object into a `Future` which will resolve when the write
1203    /// end of this `stream` is dropped, plus a `Watch` which can be used to
1204    /// retrieve the `StreamReader` again.
1205    ///
1206    /// Note that calling `Watch::into_inner` on the returned `Watch` will have
1207    /// the side effect of causing the `Future` to resolve immediately if it
1208    /// hasn't already.
1209    ///
1210    /// Also note that the returned `Future` must be polled from the event loop
1211    /// of the component instance from which this `StreamReader` originated.
1212    /// See [`Instance::run`] for details.
1213    pub fn watch_writer(mut self) -> (impl Future<Output = ()> + Send + 'static, Watch<Self>)
1214    where
1215        B: Send + 'static,
1216    {
1217        let (tx, rx) = oneshot::channel();
1218        send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx });
1219        let instance = self.instance;
1220        watch(instance, rx, self)
1221    }
1222}
1223
1224impl<B> Drop for StreamReader<B> {
1225    fn drop(&mut self) {
1226        if let Some(mut tx) = self.tx.take() {
1227            send(&mut tx, ReadEvent::Drop);
1228        }
1229    }
1230}
1231
1232/// Represents a Component Model `error-context`.
1233pub struct ErrorContext {
1234    rep: u32,
1235}
1236
1237impl ErrorContext {
1238    pub(crate) fn new(rep: u32) -> Self {
1239        Self { rep }
1240    }
1241
1242    /// Convert this `ErrorContext` into a [`Val`].
1243    pub fn into_val(self) -> Val {
1244        Val::ErrorContext(ErrorContextAny(self.rep))
1245    }
1246
1247    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1248    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1249        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1250            bail!("expected `error-context`; got `{}`", value.desc());
1251        };
1252        Ok(Self::new(*rep))
1253    }
1254
1255    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1256        match ty {
1257            InterfaceType::ErrorContext(src) => {
1258                let (rep, _) = cx
1259                    .instance_mut()
1260                    .concurrent_state_mut()
1261                    .error_context_tables
1262                    .get_mut(src)
1263                    .expect("error context table index present in (sub)component table during lift")
1264                    .get_mut_by_index(index)?;
1265
1266                Ok(Self { rep })
1267            }
1268            _ => func::bad_type_info(),
1269        }
1270    }
1271}
1272
1273pub(crate) fn lower_error_context_to_index<U>(
1274    rep: u32,
1275    cx: &mut LowerContext<'_, U>,
1276    ty: InterfaceType,
1277) -> Result<u32> {
1278    match ty {
1279        InterfaceType::ErrorContext(dst) => {
1280            let tbl = cx
1281                .instance_mut()
1282                .concurrent_state_mut()
1283                .error_context_tables
1284                .get_mut(dst)
1285                .expect("error context table index present in (sub)component table during lower");
1286
1287            if let Some((dst_idx, dst_state)) = tbl.get_mut_by_rep(rep) {
1288                dst_state.0 += 1;
1289                Ok(dst_idx)
1290            } else {
1291                tbl.insert(rep, LocalErrorContextRefCount(1))
1292            }
1293        }
1294        _ => func::bad_type_info(),
1295    }
1296}
1297
1298// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1299// safe and correct since we lift and lower future handles as `u32`s.
1300unsafe impl func::ComponentType for ErrorContext {
1301    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1302
1303    type Lower = <u32 as func::ComponentType>::Lower;
1304
1305    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1306        match ty {
1307            InterfaceType::ErrorContext(_) => Ok(()),
1308            other => bail!("expected `error`, found `{}`", func::desc(other)),
1309        }
1310    }
1311}
1312
1313// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1314unsafe impl func::Lower for ErrorContext {
1315    fn linear_lower_to_flat<T>(
1316        &self,
1317        cx: &mut LowerContext<'_, T>,
1318        ty: InterfaceType,
1319        dst: &mut MaybeUninit<Self::Lower>,
1320    ) -> Result<()> {
1321        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1322            cx,
1323            InterfaceType::U32,
1324            dst,
1325        )
1326    }
1327
1328    fn linear_lower_to_memory<T>(
1329        &self,
1330        cx: &mut LowerContext<'_, T>,
1331        ty: InterfaceType,
1332        offset: usize,
1333    ) -> Result<()> {
1334        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1335            cx,
1336            InterfaceType::U32,
1337            offset,
1338        )
1339    }
1340}
1341
1342// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1343unsafe impl func::Lift for ErrorContext {
1344    fn linear_lift_from_flat(
1345        cx: &mut LiftContext<'_>,
1346        ty: InterfaceType,
1347        src: &Self::Lower,
1348    ) -> Result<Self> {
1349        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1350        Self::lift_from_index(cx, ty, index)
1351    }
1352
1353    fn linear_lift_from_memory(
1354        cx: &mut LiftContext<'_>,
1355        ty: InterfaceType,
1356        bytes: &[u8],
1357    ) -> Result<Self> {
1358        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1359        Self::lift_from_index(cx, ty, index)
1360    }
1361}
1362
1363/// Represents the read or write end of a stream or future.
1364pub(super) struct TransmitHandle {
1365    pub(super) common: WaitableCommon,
1366    /// See `TransmitState`
1367    state: TableId<TransmitState>,
1368}
1369
1370impl TransmitHandle {
1371    fn new(state: TableId<TransmitState>) -> Self {
1372        Self {
1373            common: WaitableCommon::default(),
1374            state,
1375        }
1376    }
1377}
1378
1379impl TableDebug for TransmitHandle {
1380    fn type_name() -> &'static str {
1381        "TransmitHandle"
1382    }
1383}
1384
1385/// Represents the state of a stream or future.
1386struct TransmitState {
1387    /// The write end of the stream or future.
1388    write_handle: TableId<TransmitHandle>,
1389    /// The read end of the stream or future.
1390    read_handle: TableId<TransmitHandle>,
1391    /// See `WriteState`
1392    write: WriteState,
1393    /// See `ReadState`
1394    read: ReadState,
1395    /// The `Sender`, if any, to be dropped when the write end of the stream or
1396    /// future is dropped.
1397    ///
1398    /// This will signal to the host-owned read end that the write end has been
1399    /// dropped.
1400    writer_watcher: Option<oneshot::Sender<()>>,
1401    /// Like `writer_watcher`, but for the reverse direction.
1402    reader_watcher: Option<oneshot::Sender<()>>,
1403    /// Whether futher values may be transmitted via this stream or future.
1404    done: bool,
1405}
1406
1407impl Default for TransmitState {
1408    fn default() -> Self {
1409        Self {
1410            write_handle: TableId::new(0),
1411            read_handle: TableId::new(0),
1412            read: ReadState::Open,
1413            write: WriteState::Open,
1414            reader_watcher: None,
1415            writer_watcher: None,
1416            done: false,
1417        }
1418    }
1419}
1420
1421impl TableDebug for TransmitState {
1422    fn type_name() -> &'static str {
1423        "TransmitState"
1424    }
1425}
1426
1427/// Represents the state of the write end of a stream or future.
1428enum WriteState {
1429    /// The write end is open, but no write is pending.
1430    Open,
1431    /// The write end is owned by a guest task and a write is pending.
1432    GuestReady {
1433        ty: TableIndex,
1434        flat_abi: Option<FlatAbi>,
1435        options: Options,
1436        address: usize,
1437        count: usize,
1438        handle: u32,
1439        post_write: PostWrite,
1440    },
1441    /// The write end is owned by a host task and a write is pending.
1442    HostReady {
1443        accept:
1444            Box<dyn FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode> + Send + Sync>,
1445        post_write: PostWrite,
1446    },
1447    /// The write end has been dropped.
1448    Dropped,
1449}
1450
1451impl fmt::Debug for WriteState {
1452    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1453        match self {
1454            Self::Open => f.debug_tuple("Open").finish(),
1455            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1456            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1457            Self::Dropped => f.debug_tuple("Dropped").finish(),
1458        }
1459    }
1460}
1461
1462/// Represents the state of the read end of a stream or future.
1463enum ReadState {
1464    /// The read end is open, but no read is pending.
1465    Open,
1466    /// The read end is owned by a guest task and a read is pending.
1467    GuestReady {
1468        ty: TableIndex,
1469        flat_abi: Option<FlatAbi>,
1470        options: Options,
1471        address: usize,
1472        count: usize,
1473        handle: u32,
1474    },
1475    /// The read end is owned by a host task and a read is pending.
1476    HostReady {
1477        accept: Box<dyn FnOnce(Writer) -> Result<ReturnCode> + Send + Sync>,
1478    },
1479    /// The read end has been dropped.
1480    Dropped,
1481}
1482
1483impl fmt::Debug for ReadState {
1484    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1485        match self {
1486            Self::Open => f.debug_tuple("Open").finish(),
1487            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1488            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1489            Self::Dropped => f.debug_tuple("Dropped").finish(),
1490        }
1491    }
1492}
1493
1494/// Parameter type to pass to a `ReadState::HostReady` closure.
1495///
1496/// See also `accept_writer`.
1497enum Writer<'a> {
1498    /// The write end is owned by a guest task.
1499    Guest {
1500        lift: &'a mut LiftContext<'a>,
1501        ty: Option<InterfaceType>,
1502        address: usize,
1503        count: usize,
1504    },
1505    /// The write end is owned by the host.
1506    Host {
1507        buffer: &'a mut UntypedWriteBuffer<'a>,
1508        count: usize,
1509    },
1510    /// The write end has been dropped.
1511    End,
1512}
1513
1514/// Parameter type to pass to a `WriteState::HostReady` closure.
1515///
1516/// See also `accept_reader`.
1517enum Reader<'a> {
1518    /// The read end is owned by a guest task.
1519    Guest {
1520        options: &'a Options,
1521        ty: TableIndex,
1522        address: usize,
1523        count: usize,
1524    },
1525    /// The read end is owned by the host.
1526    Host {
1527        accept: Box<dyn FnOnce(&mut UntypedWriteBuffer, usize) -> usize>,
1528    },
1529    /// The read end has been dropped.
1530    End,
1531}
1532
1533impl Instance {
1534    /// Create a new Component Model `future` as pair of writable and readable ends,
1535    /// the latter of which may be passed to guest code.
1536    ///
1537    /// The `default` parameter will be used if the returned `FutureWriter` is
1538    /// dropped before `FutureWriter::write` is called.  Since the write end of
1539    /// a Component Model `future` must be written to before it is dropped, and
1540    /// since Rust does not currently provide a way to statically enforce that
1541    /// (e.g. linear typing), we use this mechanism to ensure a value is always
1542    /// written prior to closing.
1543    ///
1544    /// If there's no plausible default value, and you're sure
1545    /// `FutureWriter::write` will be called, you can consider passing `||
1546    /// unreachable!()` as the `default` parameter.
1547    pub fn future<T: func::Lower + func::Lift + Send + Sync + 'static>(
1548        self,
1549        default: fn() -> T,
1550        mut store: impl AsContextMut,
1551    ) -> Result<(FutureWriter<T>, FutureReader<T>)> {
1552        let mut store = store.as_context_mut();
1553        let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?;
1554
1555        Ok((
1556            FutureWriter::new(
1557                default,
1558                Some(self.start_write_event_loop(
1559                    store.as_context_mut(),
1560                    write.rep(),
1561                    TransmitKind::Future,
1562                )),
1563                self,
1564            ),
1565            FutureReader::new(
1566                read.rep(),
1567                Some(self.start_read_event_loop(
1568                    store.as_context_mut(),
1569                    read.rep(),
1570                    TransmitKind::Future,
1571                )),
1572                self,
1573            ),
1574        ))
1575    }
1576
1577    /// Create a new Component Model `stream` as pair of writable and readable ends,
1578    /// the latter of which may be passed to guest code.
1579    pub fn stream<
1580        T: func::Lower + func::Lift + Send + 'static,
1581        W: WriteBuffer<T>,
1582        R: ReadBuffer<T>,
1583    >(
1584        self,
1585        mut store: impl AsContextMut,
1586    ) -> Result<(StreamWriter<W>, StreamReader<R>)> {
1587        let mut store = store.as_context_mut();
1588        let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?;
1589
1590        Ok((
1591            StreamWriter::new(
1592                Some(self.start_write_event_loop(
1593                    store.as_context_mut(),
1594                    write.rep(),
1595                    TransmitKind::Stream,
1596                )),
1597                self,
1598            ),
1599            StreamReader::new(
1600                read.rep(),
1601                Some(self.start_read_event_loop(
1602                    store.as_context_mut(),
1603                    read.rep(),
1604                    TransmitKind::Stream,
1605                )),
1606                self,
1607            ),
1608        ))
1609    }
1610
1611    /// Spawn a background task to be polled in this instance's event loop.
1612    ///
1613    /// The spawned task will accept host events from the `Receiver` corresponding to
1614    /// the returned `Sender`, handling each event it receives and then exiting
1615    /// when the channel is dropped.
1616    ///
1617    /// We handle `StreamWriter` and `FutureWriter` operations this way so that
1618    /// they can be initiated without access to the store and possibly outside
1619    /// the instance's event loop, improving the ergonmics for host embedders.
1620    fn start_write_event_loop<
1621        T: func::Lower + func::Lift + Send + 'static,
1622        B: WriteBuffer<T>,
1623        U,
1624    >(
1625        self,
1626        mut store: StoreContextMut<U>,
1627        rep: u32,
1628        kind: TransmitKind,
1629    ) -> mpsc::Sender<WriteEvent<B>> {
1630        let (tx, mut rx) = mpsc::channel(1);
1631        let id = TableId::<TransmitHandle>::new(rep);
1632        let run_on_drop =
1633            RunOnDrop::new(move || log::trace!("write event loop for {id:?} dropped"));
1634        let token = StoreToken::new(store.as_context_mut());
1635        let task = Box::pin(
1636            async move {
1637                log::trace!("write event loop for {id:?} started");
1638                let mut my_rep = None;
1639                while let Some(event) = rx.next().await {
1640                    if my_rep.is_none() {
1641                        my_rep = Some(self.get_state_rep(rep)?);
1642                    }
1643                    let rep = my_rep.unwrap();
1644                    match event {
1645                        WriteEvent::Write { buffer, tx } => tls::get(|store| {
1646                            self.host_write::<_, _, U>(
1647                                token.as_context_mut(store),
1648                                rep,
1649                                buffer,
1650                                PostWrite::Continue,
1651                                tx,
1652                                kind,
1653                            )
1654                        })?,
1655                        WriteEvent::Drop(default) => tls::get(|store| {
1656                            if let Some(default) = default {
1657                                self.host_write::<_, _, U>(
1658                                    token.as_context_mut(store),
1659                                    rep,
1660                                    default(),
1661                                    PostWrite::Continue,
1662                                    oneshot::channel().0,
1663                                    kind,
1664                                )?;
1665                            }
1666                            self.concurrent_state_mut(store).host_drop_writer(rep, kind)
1667                        })?,
1668                        WriteEvent::Watch { tx } => tls::get(|store| {
1669                            let state =
1670                                self.concurrent_state_mut(store)
1671                                    .get_mut(TableId::<TransmitState>::new(rep))?;
1672                            if !matches!(&state.read, ReadState::Dropped) {
1673                                state.reader_watcher = Some(tx);
1674                            }
1675                            Ok::<_, anyhow::Error>(())
1676                        })?,
1677                    }
1678                }
1679                Ok(())
1680            }
1681            .map(move |v| {
1682                run_on_drop.cancel();
1683                log::trace!("write event loop for {id:?} finished: {v:?}");
1684                HostTaskOutput::Result(v)
1685            }),
1686        );
1687        self.concurrent_state_mut(store.0).push_future(task);
1688        tx
1689    }
1690
1691    /// Same as `Self::start_write_event_loop`, but for the read end of a stream
1692    /// or future.
1693    fn start_read_event_loop<T: func::Lower + func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
1694        self,
1695        mut store: StoreContextMut<U>,
1696        rep: u32,
1697        kind: TransmitKind,
1698    ) -> mpsc::Sender<ReadEvent<B>> {
1699        let (tx, mut rx) = mpsc::channel(1);
1700        let id = TableId::<TransmitHandle>::new(rep);
1701        let run_on_drop = RunOnDrop::new(move || log::trace!("read event loop for {id:?} dropped"));
1702        let token = StoreToken::new(store.as_context_mut());
1703        let task = Box::pin(
1704            async move {
1705                log::trace!("read event loop for {id:?} started");
1706                let mut my_rep = None;
1707                while let Some(event) = rx.next().await {
1708                    if my_rep.is_none() {
1709                        my_rep = Some(self.get_state_rep(rep)?);
1710                    }
1711                    let rep = my_rep.unwrap();
1712                    match event {
1713                        ReadEvent::Read { buffer, tx } => tls::get(|store| {
1714                            self.host_read::<_, _, U>(
1715                                token.as_context_mut(store),
1716                                rep,
1717                                buffer,
1718                                tx,
1719                                kind,
1720                            )
1721                        })?,
1722                        ReadEvent::Drop => {
1723                            tls::get(|store| self.host_drop_reader(store, rep, kind))?
1724                        }
1725                        ReadEvent::Watch { tx } => tls::get(|store| {
1726                            let state =
1727                                self.concurrent_state_mut(store)
1728                                    .get_mut(TableId::<TransmitState>::new(rep))?;
1729                            if !matches!(
1730                                &state.write,
1731                                WriteState::Dropped
1732                                    | WriteState::GuestReady {
1733                                        post_write: PostWrite::Drop,
1734                                        ..
1735                                    }
1736                                    | WriteState::HostReady {
1737                                        post_write: PostWrite::Drop,
1738                                        ..
1739                                    }
1740                            ) {
1741                                state.writer_watcher = Some(tx);
1742                            }
1743                            Ok::<_, anyhow::Error>(())
1744                        })?,
1745                    }
1746                }
1747                Ok(())
1748            }
1749            .map(move |v| {
1750                run_on_drop.cancel();
1751                log::trace!("read event loop for {id:?} finished: {v:?}");
1752                HostTaskOutput::Result(v)
1753            }),
1754        );
1755        self.concurrent_state_mut(store.0).push_future(task);
1756        tx
1757    }
1758
1759    /// Write to the specified stream or future from the host.
1760    ///
1761    /// # Arguments
1762    ///
1763    /// * `store` - The store to which this instance belongs
1764    /// * `transmit_rep` - The `TransmitState` rep for the stream or future
1765    /// * `buffer` - Buffer of values that should be written
1766    /// * `post_write` - Whether the transmit should be dropped after write, possibly with an error context
1767    /// * `tx` - Oneshot channel to notify when operation completes (or drop on error)
1768    /// * `kind` - whether this is a stream or a future
1769    fn host_write<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U>(
1770        self,
1771        mut store: StoreContextMut<U>,
1772        transmit_rep: u32,
1773        mut buffer: B,
1774        mut post_write: PostWrite,
1775        tx: oneshot::Sender<HostResult<B>>,
1776        kind: TransmitKind,
1777    ) -> Result<()> {
1778        let mut store = store.as_context_mut();
1779        let transmit_id = TableId::<TransmitState>::new(transmit_rep);
1780        let transmit = self
1781            .concurrent_state_mut(store.0)
1782            .get_mut(transmit_id)
1783            .with_context(|| format!("retrieving state for transmit [{transmit_rep}]"))?;
1784        log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read);
1785
1786        let new_state = if let ReadState::Dropped = &transmit.read {
1787            ReadState::Dropped
1788        } else {
1789            ReadState::Open
1790        };
1791
1792        match mem::replace(&mut transmit.read, new_state) {
1793            ReadState::Open => {
1794                assert!(matches!(&transmit.write, WriteState::Open));
1795
1796                let state = WriteState::HostReady {
1797                    accept: Box::new(accept_reader::<T, B, U>(
1798                        store.as_context_mut(),
1799                        buffer,
1800                        tx,
1801                        kind,
1802                    )),
1803                    post_write,
1804                };
1805                self.concurrent_state_mut(store.0)
1806                    .get_mut(transmit_id)?
1807                    .write = state;
1808                post_write = PostWrite::Continue;
1809            }
1810
1811            ReadState::GuestReady {
1812                ty,
1813                flat_abi: _,
1814                options,
1815                address,
1816                count,
1817                handle,
1818                ..
1819            } => {
1820                if let TransmitKind::Future = kind {
1821                    transmit.done = true;
1822                }
1823
1824                let read_handle = transmit.read_handle;
1825                let code = accept_reader::<T, B, U>(store.as_context_mut(), buffer, tx, kind)(
1826                    store.0.traitobj_mut(),
1827                    self,
1828                    Reader::Guest {
1829                        options: &options,
1830                        ty,
1831                        address,
1832                        count,
1833                    },
1834                )?;
1835
1836                self.concurrent_state_mut(store.0).set_event(
1837                    read_handle.rep(),
1838                    match ty {
1839                        TableIndex::Future(ty) => Event::FutureRead {
1840                            code,
1841                            pending: Some((ty, handle)),
1842                        },
1843                        TableIndex::Stream(ty) => Event::StreamRead {
1844                            code,
1845                            pending: Some((ty, handle)),
1846                        },
1847                    },
1848                )?;
1849            }
1850
1851            ReadState::HostReady { accept } => {
1852                let count = buffer.remaining().len();
1853                let mut untyped = UntypedWriteBuffer::new(&mut buffer);
1854                let code = accept(Writer::Host {
1855                    buffer: &mut untyped,
1856                    count,
1857                })?;
1858                let (ReturnCode::Completed(_) | ReturnCode::Dropped(_)) = code else {
1859                    unreachable!()
1860                };
1861
1862                _ = tx.send(HostResult {
1863                    buffer,
1864                    dropped: false,
1865                });
1866            }
1867
1868            ReadState::Dropped => {
1869                _ = tx.send(HostResult {
1870                    buffer,
1871                    dropped: true,
1872                });
1873            }
1874        }
1875
1876        if let PostWrite::Drop = post_write {
1877            self.concurrent_state_mut(store.0)
1878                .host_drop_writer(transmit_rep, kind)?;
1879        }
1880
1881        Ok(())
1882    }
1883
1884    /// Read from the specified stream or future from the host.
1885    ///
1886    /// # Arguments
1887    ///
1888    /// * `store` - The store to which this instance belongs
1889    /// * `rep` - The `TransmitState` rep for the stream or future
1890    /// * `buffer` - Buffer to receive values
1891    /// * `tx` - Oneshot channel to notify when operation completes (or drop on error)
1892    /// * `kind` - whether this is a stream or a future
1893    fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
1894        self,
1895        mut store: StoreContextMut<U>,
1896        rep: u32,
1897        mut buffer: B,
1898        tx: oneshot::Sender<HostResult<B>>,
1899        kind: TransmitKind,
1900    ) -> Result<()> {
1901        let store = store.as_context_mut();
1902        let transmit_id = TableId::<TransmitState>::new(rep);
1903        let transmit = self
1904            .concurrent_state_mut(store.0)
1905            .get_mut(transmit_id)
1906            .with_context(|| rep.to_string())?;
1907        log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write);
1908
1909        let new_state = if let WriteState::Dropped = &transmit.write {
1910            WriteState::Dropped
1911        } else {
1912            WriteState::Open
1913        };
1914
1915        match mem::replace(&mut transmit.write, new_state) {
1916            WriteState::Open => {
1917                assert!(matches!(&transmit.read, ReadState::Open));
1918
1919                transmit.read = ReadState::HostReady {
1920                    accept: Box::new(accept_writer::<T, B, U>(buffer, tx, kind)),
1921                };
1922            }
1923
1924            WriteState::GuestReady {
1925                ty,
1926                flat_abi: _,
1927                options,
1928                address,
1929                count,
1930                handle,
1931                post_write,
1932                ..
1933            } => {
1934                if let TableIndex::Future(_) = ty {
1935                    transmit.done = true;
1936                }
1937
1938                let write_handle = transmit.write_handle;
1939                let types = self.id().get(store.0).component().types().clone();
1940                let lift =
1941                    &mut LiftContext::new(store.0.store_opaque_mut(), &options, &types, self);
1942                let code = accept_writer::<T, B, U>(buffer, tx, kind)(Writer::Guest {
1943                    lift,
1944                    ty: payload(ty, &types),
1945                    address,
1946                    count,
1947                })?;
1948
1949                let state = self.concurrent_state_mut(store.0);
1950                let pending = if let PostWrite::Drop = post_write {
1951                    state.get_mut(transmit_id)?.write = WriteState::Dropped;
1952                    false
1953                } else {
1954                    true
1955                };
1956
1957                state.set_event(
1958                    write_handle.rep(),
1959                    match ty {
1960                        TableIndex::Future(ty) => Event::FutureWrite {
1961                            code,
1962                            pending: pending.then_some((ty, handle)),
1963                        },
1964                        TableIndex::Stream(ty) => Event::StreamWrite {
1965                            code,
1966                            pending: pending.then_some((ty, handle)),
1967                        },
1968                    },
1969                )?;
1970            }
1971
1972            WriteState::HostReady { accept, post_write } => {
1973                accept(
1974                    store.0.traitobj_mut(),
1975                    self,
1976                    Reader::Host {
1977                        accept: Box::new(move |input, count| {
1978                            let count = count.min(buffer.remaining_capacity());
1979                            buffer.move_from(input.get_mut::<T>(), count);
1980                            _ = tx.send(HostResult {
1981                                buffer,
1982                                dropped: false,
1983                            });
1984                            count
1985                        }),
1986                    },
1987                )?;
1988
1989                if let PostWrite::Drop = post_write {
1990                    self.concurrent_state_mut(store.0)
1991                        .get_mut(transmit_id)?
1992                        .write = WriteState::Dropped;
1993                }
1994            }
1995
1996            WriteState::Dropped => {
1997                _ = tx.send(HostResult {
1998                    buffer,
1999                    dropped: true,
2000                });
2001            }
2002        }
2003
2004        Ok(())
2005    }
2006
2007    /// Drop the read end of a stream or future read from the host.
2008    ///
2009    /// # Arguments
2010    ///
2011    /// * `store` - The store to which this instance belongs
2012    /// * `transmit_rep` - The `TransmitState` rep for the stream or future.
2013    fn host_drop_reader(
2014        self,
2015        store: &mut dyn VMStore,
2016        transmit_rep: u32,
2017        kind: TransmitKind,
2018    ) -> Result<()> {
2019        let transmit_id = TableId::<TransmitState>::new(transmit_rep);
2020        let state = self.concurrent_state_mut(store);
2021        let transmit = state
2022            .get_mut(transmit_id)
2023            .with_context(|| format!("error closing reader {transmit_rep}"))?;
2024        log::trace!(
2025            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2026            transmit.read,
2027            transmit.write
2028        );
2029
2030        transmit.read = ReadState::Dropped;
2031        transmit.reader_watcher = None;
2032
2033        // If the write end is already dropped, it should stay dropped,
2034        // otherwise, it should be opened.
2035        let new_state = if let WriteState::Dropped = &transmit.write {
2036            WriteState::Dropped
2037        } else {
2038            WriteState::Open
2039        };
2040
2041        let write_handle = transmit.write_handle;
2042
2043        match mem::replace(&mut transmit.write, new_state) {
2044            // If a guest is waiting to write, notify it that the read end has
2045            // been dropped.
2046            WriteState::GuestReady {
2047                ty,
2048                handle,
2049                post_write,
2050                ..
2051            } => {
2052                if let PostWrite::Drop = post_write {
2053                    state.delete_transmit(transmit_id)?;
2054                } else {
2055                    state.update_event(
2056                        write_handle.rep(),
2057                        match ty {
2058                            TableIndex::Future(ty) => Event::FutureWrite {
2059                                code: ReturnCode::Dropped(0),
2060                                pending: Some((ty, handle)),
2061                            },
2062                            TableIndex::Stream(ty) => Event::StreamWrite {
2063                                code: ReturnCode::Dropped(0),
2064                                pending: Some((ty, handle)),
2065                            },
2066                        },
2067                    )?;
2068                };
2069            }
2070
2071            WriteState::HostReady { accept, .. } => {
2072                accept(store, self, Reader::End)?;
2073            }
2074
2075            WriteState::Open => {
2076                state.update_event(
2077                    write_handle.rep(),
2078                    match kind {
2079                        TransmitKind::Future => Event::FutureWrite {
2080                            code: ReturnCode::Dropped(0),
2081                            pending: None,
2082                        },
2083                        TransmitKind::Stream => Event::StreamWrite {
2084                            code: ReturnCode::Dropped(0),
2085                            pending: None,
2086                        },
2087                    },
2088                )?;
2089            }
2090
2091            WriteState::Dropped => {
2092                log::trace!("host_drop_reader delete {transmit_rep}");
2093                state.delete_transmit(transmit_id)?;
2094            }
2095        }
2096        Ok(())
2097    }
2098
2099    /// Copy `count` items from `read_address` to `write_address` for the
2100    /// specified stream or future.
2101    fn copy<T: 'static>(
2102        self,
2103        mut store: StoreContextMut<T>,
2104        flat_abi: Option<FlatAbi>,
2105        write_ty: TableIndex,
2106        write_options: &Options,
2107        write_address: usize,
2108        read_ty: TableIndex,
2109        read_options: &Options,
2110        read_address: usize,
2111        count: usize,
2112        rep: u32,
2113    ) -> Result<()> {
2114        let types = self.id().get(store.0).component().types().clone();
2115        match (write_ty, read_ty) {
2116            (TableIndex::Future(write_ty), TableIndex::Future(read_ty)) => {
2117                assert_eq!(count, 1);
2118
2119                let val = types[types[write_ty].ty]
2120                    .payload
2121                    .map(|ty| {
2122                        let abi = types.canonical_abi(&ty);
2123                        // FIXME: needs to read an i64 for memory64
2124                        if write_address % usize::try_from(abi.align32)? != 0 {
2125                            bail!("write pointer not aligned");
2126                        }
2127
2128                        let lift = &mut LiftContext::new(
2129                            store.0.store_opaque_mut(),
2130                            write_options,
2131                            &types,
2132                            self,
2133                        );
2134                        let bytes = lift
2135                            .memory()
2136                            .get(write_address..)
2137                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2138                            .ok_or_else(|| {
2139                                anyhow::anyhow!("write pointer out of bounds of memory")
2140                            })?;
2141
2142                        Val::load(lift, ty, bytes)
2143                    })
2144                    .transpose()?;
2145
2146                if let Some(val) = val {
2147                    let lower =
2148                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2149                    let ty = types[types[read_ty].ty].payload.unwrap();
2150                    let ptr = func::validate_inbounds_dynamic(
2151                        types.canonical_abi(&ty),
2152                        lower.as_slice_mut(),
2153                        &ValRaw::u32(read_address.try_into().unwrap()),
2154                    )?;
2155                    val.store(lower, ty, ptr)?;
2156                }
2157            }
2158            (TableIndex::Stream(write_ty), TableIndex::Stream(read_ty)) => {
2159                if let Some(flat_abi) = flat_abi {
2160                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
2161                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2162                    if length_in_bytes > 0 {
2163                        if write_address % usize::try_from(flat_abi.align)? != 0 {
2164                            bail!("write pointer not aligned");
2165                        }
2166                        if read_address % usize::try_from(flat_abi.align)? != 0 {
2167                            bail!("read pointer not aligned");
2168                        }
2169
2170                        let store_opaque = store.0.store_opaque_mut();
2171
2172                        {
2173                            let src = write_options
2174                                .memory(store_opaque)
2175                                .get(write_address..)
2176                                .and_then(|b| b.get(..length_in_bytes))
2177                                .ok_or_else(|| {
2178                                    anyhow::anyhow!("write pointer out of bounds of memory")
2179                                })?
2180                                .as_ptr();
2181                            let dst = read_options
2182                                .memory_mut(store_opaque)
2183                                .get_mut(read_address..)
2184                                .and_then(|b| b.get_mut(..length_in_bytes))
2185                                .ok_or_else(|| {
2186                                    anyhow::anyhow!("read pointer out of bounds of memory")
2187                                })?
2188                                .as_mut_ptr();
2189                            // SAFETY: Both `src` and `dst` have been validated
2190                            // above.
2191                            unsafe { src.copy_to(dst, length_in_bytes) };
2192                        }
2193                    }
2194                } else {
2195                    let store_opaque = store.0.store_opaque_mut();
2196                    let lift = &mut LiftContext::new(store_opaque, write_options, &types, self);
2197                    let ty = types[types[write_ty].ty].payload.unwrap();
2198                    let abi = lift.types.canonical_abi(&ty);
2199                    let size = usize::try_from(abi.size32).unwrap();
2200                    if write_address % usize::try_from(abi.align32)? != 0 {
2201                        bail!("write pointer not aligned");
2202                    }
2203                    let bytes = lift
2204                        .memory()
2205                        .get(write_address..)
2206                        .and_then(|b| b.get(..size * count))
2207                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2208
2209                    let values = (0..count)
2210                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2211                        .collect::<Result<Vec<_>>>()?;
2212
2213                    let id = TableId::<TransmitHandle>::new(rep);
2214                    log::trace!("copy values {values:?} for {id:?}");
2215
2216                    let lower =
2217                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2218                    let ty = types[types[read_ty].ty].payload.unwrap();
2219                    let abi = lower.types.canonical_abi(&ty);
2220                    if read_address % usize::try_from(abi.align32)? != 0 {
2221                        bail!("read pointer not aligned");
2222                    }
2223                    let size = usize::try_from(abi.size32).unwrap();
2224                    lower
2225                        .as_slice_mut()
2226                        .get_mut(read_address..)
2227                        .and_then(|b| b.get_mut(..size * count))
2228                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2229                    let mut ptr = read_address;
2230                    for value in values {
2231                        value.store(lower, ty, ptr)?;
2232                        ptr += size
2233                    }
2234                }
2235            }
2236            _ => unreachable!(),
2237        }
2238
2239        Ok(())
2240    }
2241
2242    /// Write to the specified stream or future from the guest.
2243    ///
2244    /// SAFETY: `memory` and `realloc` must be valid pointers to their
2245    /// respective guest entities.
2246    pub(super) unsafe fn guest_write<T: 'static>(
2247        self,
2248        mut store: StoreContextMut<T>,
2249        memory: *mut VMMemoryDefinition,
2250        realloc: *mut VMFuncRef,
2251        string_encoding: u8,
2252        async_: bool,
2253        ty: TableIndex,
2254        flat_abi: Option<FlatAbi>,
2255        handle: u32,
2256        address: u32,
2257        count: u32,
2258    ) -> Result<ReturnCode> {
2259        if !async_ {
2260            bail!("synchronous stream and future writes not yet supported");
2261        }
2262
2263        let address = usize::try_from(address).unwrap();
2264        let count = usize::try_from(count).unwrap();
2265        // SAFETY: Per this function's contract, `memory` and `realloc` are
2266        // valid.
2267        let options = unsafe {
2268            Options::new(
2269                store.0.store_opaque().id(),
2270                NonNull::new(memory),
2271                NonNull::new(realloc),
2272                StringEncoding::from_u8(string_encoding).unwrap(),
2273                true,
2274                None,
2275            )
2276        };
2277        let concurrent_state = self.concurrent_state_mut(store.0);
2278        let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
2279        let StreamFutureState::Write { done } = *state else {
2280            bail!(
2281                "invalid handle {handle}; expected `Write`; got {:?}",
2282                *state
2283            );
2284        };
2285
2286        if done {
2287            bail!("cannot write to stream after being notified that the readable end dropped");
2288        }
2289
2290        *state = StreamFutureState::Busy;
2291        let transmit_handle = TableId::<TransmitHandle>::new(rep);
2292        let transmit_id = concurrent_state.get(transmit_handle)?.state;
2293        let transmit = concurrent_state.get_mut(transmit_id)?;
2294        log::trace!(
2295            "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
2296            transmit.read
2297        );
2298
2299        if transmit.done {
2300            bail!("cannot write to future after previous write succeeded or readable end dropped");
2301        }
2302
2303        let new_state = if let ReadState::Dropped = &transmit.read {
2304            ReadState::Dropped
2305        } else {
2306            ReadState::Open
2307        };
2308
2309        let set_guest_ready = |me: &mut ConcurrentState| {
2310            let transmit = me.get_mut(transmit_id)?;
2311            assert!(matches!(&transmit.write, WriteState::Open));
2312            transmit.write = WriteState::GuestReady {
2313                ty,
2314                flat_abi,
2315                options,
2316                address,
2317                count,
2318                handle,
2319                post_write: PostWrite::Continue,
2320            };
2321            Ok::<_, crate::Error>(())
2322        };
2323
2324        let result = match mem::replace(&mut transmit.read, new_state) {
2325            ReadState::GuestReady {
2326                ty: read_ty,
2327                flat_abi: read_flat_abi,
2328                options: read_options,
2329                address: read_address,
2330                count: read_count,
2331                handle: read_handle,
2332            } => {
2333                assert_eq!(flat_abi, read_flat_abi);
2334
2335                if let TableIndex::Future(_) = ty {
2336                    transmit.done = true;
2337                }
2338
2339                // Note that zero-length reads and writes are handling specially
2340                // by the spec to allow each end to signal readiness to the
2341                // other.  Quoting the spec:
2342                //
2343                // ```
2344                // The meaning of a read or write when the length is 0 is that
2345                // the caller is querying the "readiness" of the other
2346                // side. When a 0-length read/write rendezvous with a
2347                // non-0-length read/write, only the 0-length read/write
2348                // completes; the non-0-length read/write is kept pending (and
2349                // ready for a subsequent rendezvous).
2350                //
2351                // In the corner case where a 0-length read and write
2352                // rendezvous, only the writer is notified of readiness. To
2353                // avoid livelock, the Canonical ABI requires that a writer must
2354                // (eventually) follow a completed 0-length write with a
2355                // non-0-length write that is allowed to block (allowing the
2356                // reader end to run and rendezvous with its own non-0-length
2357                // read).
2358                // ```
2359
2360                let write_complete = count == 0 || read_count > 0;
2361                let read_complete = count > 0;
2362                let read_buffer_remaining = count < read_count;
2363
2364                let read_handle_rep = transmit.read_handle.rep();
2365
2366                let count = count.min(read_count);
2367
2368                self.copy(
2369                    store.as_context_mut(),
2370                    flat_abi,
2371                    ty,
2372                    &options,
2373                    address,
2374                    read_ty,
2375                    &read_options,
2376                    read_address,
2377                    count,
2378                    rep,
2379                )?;
2380
2381                let instance = self.id().get_mut(store.0);
2382                let types = instance.component().types();
2383                let item_size = payload(ty, types)
2384                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
2385                    .unwrap_or(0);
2386                let concurrent_state = instance.concurrent_state_mut();
2387                if read_complete {
2388                    let count = u32::try_from(count).unwrap();
2389                    let total = if let Some(Event::StreamRead {
2390                        code: ReturnCode::Completed(old_total),
2391                        ..
2392                    }) = concurrent_state.take_event(read_handle_rep)?
2393                    {
2394                        count + old_total
2395                    } else {
2396                        count
2397                    };
2398
2399                    let code = ReturnCode::completed(ty.kind(), total);
2400
2401                    concurrent_state.set_event(
2402                        read_handle_rep,
2403                        match read_ty {
2404                            TableIndex::Future(ty) => Event::FutureRead {
2405                                code,
2406                                pending: Some((ty, read_handle)),
2407                            },
2408                            TableIndex::Stream(ty) => Event::StreamRead {
2409                                code,
2410                                pending: Some((ty, read_handle)),
2411                            },
2412                        },
2413                    )?;
2414                }
2415
2416                if read_buffer_remaining {
2417                    let transmit = concurrent_state.get_mut(transmit_id)?;
2418                    transmit.read = ReadState::GuestReady {
2419                        ty: read_ty,
2420                        flat_abi: read_flat_abi,
2421                        options: read_options,
2422                        address: read_address + (count * item_size),
2423                        count: read_count - count,
2424                        handle: read_handle,
2425                    };
2426                }
2427
2428                if write_complete {
2429                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
2430                } else {
2431                    set_guest_ready(concurrent_state)?;
2432                    ReturnCode::Blocked
2433                }
2434            }
2435
2436            ReadState::HostReady { accept } => {
2437                if let TableIndex::Future(_) = ty {
2438                    transmit.done = true;
2439                }
2440
2441                let types = self.id().get(store.0).component().types().clone();
2442                let lift =
2443                    &mut LiftContext::new(store.0.store_opaque_mut(), &options, &types, self);
2444                accept(Writer::Guest {
2445                    lift,
2446                    ty: payload(ty, &types),
2447                    address,
2448                    count,
2449                })?
2450            }
2451
2452            ReadState::Open => {
2453                set_guest_ready(concurrent_state)?;
2454                ReturnCode::Blocked
2455            }
2456
2457            ReadState::Dropped => {
2458                if let TableIndex::Future(_) = ty {
2459                    transmit.done = true;
2460                }
2461
2462                ReturnCode::Dropped(0)
2463            }
2464        };
2465
2466        if result != ReturnCode::Blocked {
2467            let state = self.concurrent_state_mut(store.0);
2468            *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write {
2469                done: matches!(
2470                    (result, ty),
2471                    (ReturnCode::Dropped(_), TableIndex::Stream(_))
2472                ),
2473            };
2474        }
2475
2476        Ok(result)
2477    }
2478
2479    /// Read from the specified stream or future from the guest.
2480    ///
2481    /// SAFETY: `memory` and `realloc` must be valid pointers to their
2482    /// respective guest entities.
2483    pub(super) unsafe fn guest_read<T: 'static>(
2484        self,
2485        mut store: StoreContextMut<T>,
2486        memory: *mut VMMemoryDefinition,
2487        realloc: *mut VMFuncRef,
2488        string_encoding: u8,
2489        async_: bool,
2490        ty: TableIndex,
2491        flat_abi: Option<FlatAbi>,
2492        handle: u32,
2493        address: u32,
2494        count: u32,
2495    ) -> Result<ReturnCode> {
2496        if !async_ {
2497            bail!("synchronous stream and future reads not yet supported");
2498        }
2499
2500        let address = usize::try_from(address).unwrap();
2501        // SAFETY: Per this function's contract, `memory` and `realloc` must be
2502        // valid.
2503        let options = unsafe {
2504            Options::new(
2505                store.0.store_opaque().id(),
2506                NonNull::new(memory),
2507                NonNull::new(realloc),
2508                StringEncoding::from_u8(string_encoding).unwrap(),
2509                true,
2510                None,
2511            )
2512        };
2513        let concurrent_state = self.concurrent_state_mut(store.0);
2514        let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
2515        let StreamFutureState::Read { done } = *state else {
2516            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
2517        };
2518
2519        if done {
2520            bail!("cannot read from stream after being notified that the writable end dropped");
2521        }
2522
2523        *state = StreamFutureState::Busy;
2524        let transmit_handle = TableId::<TransmitHandle>::new(rep);
2525        let transmit_id = concurrent_state.get(transmit_handle)?.state;
2526        let transmit = concurrent_state.get_mut(transmit_id)?;
2527        log::trace!(
2528            "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
2529            transmit.write
2530        );
2531
2532        if transmit.done {
2533            bail!("cannot read from future after previous read succeeded");
2534        }
2535
2536        let new_state = if let WriteState::Dropped = &transmit.write {
2537            WriteState::Dropped
2538        } else {
2539            WriteState::Open
2540        };
2541
2542        let set_guest_ready = |me: &mut ConcurrentState| {
2543            let transmit = me.get_mut(transmit_id)?;
2544            assert!(matches!(&transmit.read, ReadState::Open));
2545            transmit.read = ReadState::GuestReady {
2546                ty,
2547                flat_abi,
2548                options,
2549                address,
2550                count: usize::try_from(count).unwrap(),
2551                handle,
2552            };
2553            Ok::<_, crate::Error>(())
2554        };
2555
2556        let result = match mem::replace(&mut transmit.write, new_state) {
2557            WriteState::GuestReady {
2558                ty: write_ty,
2559                flat_abi: write_flat_abi,
2560                options: write_options,
2561                address: write_address,
2562                count: write_count,
2563                handle: write_handle,
2564                post_write,
2565            } => {
2566                assert_eq!(flat_abi, write_flat_abi);
2567
2568                if let TableIndex::Future(_) = ty {
2569                    transmit.done = true;
2570                }
2571
2572                let write_handle_rep = transmit.write_handle.rep();
2573
2574                // See the comment in `guest_write` for the
2575                // `ReadState::GuestReady` case concerning zero-length reads and
2576                // writes.
2577
2578                let count = usize::try_from(count).unwrap();
2579
2580                let write_complete = write_count == 0 || count > 0;
2581                let read_complete = write_count > 0;
2582                let write_buffer_remaining = count < write_count;
2583
2584                let count = count.min(write_count);
2585
2586                self.copy(
2587                    store.as_context_mut(),
2588                    flat_abi,
2589                    write_ty,
2590                    &write_options,
2591                    write_address,
2592                    ty,
2593                    &options,
2594                    address,
2595                    count,
2596                    rep,
2597                )?;
2598
2599                let instance = self.id().get_mut(store.0);
2600                let types = instance.component().types();
2601                let item_size = payload(ty, types)
2602                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
2603                    .unwrap_or(0);
2604                let concurrent_state = instance.concurrent_state_mut();
2605                let pending = if let PostWrite::Drop = post_write {
2606                    concurrent_state.get_mut(transmit_id)?.write = WriteState::Dropped;
2607                    false
2608                } else {
2609                    true
2610                };
2611
2612                if write_complete {
2613                    let count = u32::try_from(count).unwrap();
2614                    let total = if let Some(Event::StreamWrite {
2615                        code: ReturnCode::Completed(old_total),
2616                        ..
2617                    }) = concurrent_state.take_event(write_handle_rep)?
2618                    {
2619                        count + old_total
2620                    } else {
2621                        count
2622                    };
2623
2624                    let code = ReturnCode::completed(ty.kind(), total);
2625
2626                    concurrent_state.set_event(
2627                        write_handle_rep,
2628                        match write_ty {
2629                            TableIndex::Future(ty) => Event::FutureWrite {
2630                                code,
2631                                pending: pending.then_some((ty, write_handle)),
2632                            },
2633                            TableIndex::Stream(ty) => Event::StreamWrite {
2634                                code,
2635                                pending: pending.then_some((ty, write_handle)),
2636                            },
2637                        },
2638                    )?;
2639                }
2640
2641                if write_buffer_remaining {
2642                    let transmit = concurrent_state.get_mut(transmit_id)?;
2643                    transmit.write = WriteState::GuestReady {
2644                        ty: write_ty,
2645                        flat_abi: write_flat_abi,
2646                        options: write_options,
2647                        address: write_address + (count * item_size),
2648                        count: write_count - count,
2649                        handle: write_handle,
2650                        post_write,
2651                    };
2652                }
2653
2654                if read_complete {
2655                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
2656                } else {
2657                    set_guest_ready(concurrent_state)?;
2658                    ReturnCode::Blocked
2659                }
2660            }
2661
2662            WriteState::HostReady { accept, post_write } => {
2663                if let TableIndex::Future(_) = ty {
2664                    transmit.done = true;
2665                }
2666
2667                let code = accept(
2668                    store.0.traitobj_mut(),
2669                    self,
2670                    Reader::Guest {
2671                        options: &options,
2672                        ty,
2673                        address,
2674                        count: count.try_into().unwrap(),
2675                    },
2676                )?;
2677
2678                if let PostWrite::Drop = post_write {
2679                    self.concurrent_state_mut(store.0)
2680                        .get_mut(transmit_id)?
2681                        .write = WriteState::Dropped;
2682                }
2683
2684                code
2685            }
2686
2687            WriteState::Open => {
2688                set_guest_ready(concurrent_state)?;
2689                ReturnCode::Blocked
2690            }
2691
2692            WriteState::Dropped => ReturnCode::Dropped(0),
2693        };
2694
2695        if result != ReturnCode::Blocked {
2696            let state = self.concurrent_state_mut(store.0);
2697            *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read {
2698                done: matches!(
2699                    (result, ty),
2700                    (ReturnCode::Dropped(_), TableIndex::Stream(_))
2701                ),
2702            };
2703        }
2704
2705        Ok(result)
2706    }
2707
2708    /// Drop the readable end of the specified stream or future from the guest.
2709    fn guest_drop_readable(
2710        self,
2711        store: &mut dyn VMStore,
2712        ty: TableIndex,
2713        reader: u32,
2714    ) -> Result<()> {
2715        let concurrent_state = self.concurrent_state_mut(store);
2716        let (rep, state) = concurrent_state.state_table(ty).remove_by_index(reader)?;
2717        let (state, kind) = match state {
2718            WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
2719            WaitableState::Future(_, state) => (state, TransmitKind::Future),
2720            _ => {
2721                bail!("invalid stream or future handle");
2722            }
2723        };
2724        match state {
2725            StreamFutureState::Read { .. } => {}
2726            StreamFutureState::Write { .. } => {
2727                bail!("passed write end to `{{stream|future}}.drop-readable`")
2728            }
2729            StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
2730        }
2731        let id = TableId::<TransmitHandle>::new(rep);
2732        let rep = concurrent_state.get(id)?.state.rep();
2733        log::trace!("guest_drop_readable: drop reader {id:?}");
2734        self.host_drop_reader(store, rep, kind)
2735    }
2736
2737    /// Create a new error context for the given component.
2738    ///
2739    /// SAFETY: `memory` and `realloc` must be valid pointers to their
2740    /// respective guest entities.
2741    pub(crate) unsafe fn error_context_new(
2742        self,
2743        store: &mut StoreOpaque,
2744        memory: *mut VMMemoryDefinition,
2745        realloc: *mut VMFuncRef,
2746        string_encoding: u8,
2747        ty: TypeComponentLocalErrorContextTableIndex,
2748        debug_msg_address: u32,
2749        debug_msg_len: u32,
2750    ) -> Result<u32> {
2751        // SAFETY: Per this function's contract, `memory` and `realloc` must be
2752        // valid.
2753        let options = unsafe {
2754            Options::new(
2755                store.id(),
2756                NonNull::new(memory),
2757                NonNull::new(realloc),
2758                StringEncoding::from_u8(string_encoding).ok_or_else(|| {
2759                    anyhow::anyhow!("failed to convert u8 string encoding [{string_encoding}]")
2760                })?,
2761                false,
2762                None,
2763            )
2764        };
2765        let types = self.id().get(store).component().types().clone();
2766        let lift_ctx = &mut LiftContext::new(store, &options, &types, self);
2767        //  Read string from guest memory
2768        let address = usize::try_from(debug_msg_address)?;
2769        let len = usize::try_from(debug_msg_len)?;
2770        lift_ctx
2771            .memory()
2772            .get(address..)
2773            .and_then(|b| b.get(..len))
2774            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
2775        let message = WasmStr::new(address, len, lift_ctx)?;
2776
2777        // Create a new ErrorContext that is tracked along with other concurrent state
2778        let err_ctx = ErrorContextState {
2779            debug_msg: message
2780                .to_str_from_memory(options.memory(store))?
2781                .to_string(),
2782        };
2783        let state = self.concurrent_state_mut(store);
2784        let table_id = state.push(err_ctx)?;
2785        let global_ref_count_idx =
2786            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
2787
2788        // Add to the global error context ref counts
2789        let _ = state
2790            .global_error_context_ref_counts
2791            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
2792
2793        // Error context are tracked both locally (to a single component instance) and globally
2794        // the counts for both must stay in sync.
2795        //
2796        // Here we reflect the newly created global concurrent error context state into the
2797        // component instance's locally tracked count, along with the appropriate key into the global
2798        // ref tracking data structures to enable later lookup
2799        let local_tbl = &mut state.error_context_tables[ty];
2800
2801        assert!(
2802            !local_tbl.has_handle(table_id.rep()),
2803            "newly created error context state already tracked by component"
2804        );
2805        let local_idx = local_tbl.insert(table_id.rep(), LocalErrorContextRefCount(1))?;
2806
2807        Ok(local_idx)
2808    }
2809
2810    /// Retrieve the debug message from the specified error context.
2811    ///
2812    /// SAFETY: `memory` and `realloc` must be valid pointers to their
2813    /// respective guest entities.
2814    pub(super) unsafe fn error_context_debug_message<T>(
2815        self,
2816        store: StoreContextMut<T>,
2817        memory: *mut VMMemoryDefinition,
2818        realloc: *mut VMFuncRef,
2819        string_encoding: u8,
2820        ty: TypeComponentLocalErrorContextTableIndex,
2821        err_ctx_handle: u32,
2822        debug_msg_address: u32,
2823    ) -> Result<()> {
2824        // Retrieve the error context and internal debug message
2825        let id = store.0.store_opaque().id();
2826        let state = self.concurrent_state_mut(store.0);
2827        let (state_table_id_rep, _) = state
2828            .error_context_tables
2829            .get_mut(ty)
2830            .context("error context table index present in (sub)component lookup during debug_msg")?
2831            .get_mut_by_index(err_ctx_handle)?;
2832
2833        // Get the state associated with the error context
2834        let ErrorContextState { debug_msg } =
2835            state.get_mut(TableId::<ErrorContextState>::new(state_table_id_rep))?;
2836        let debug_msg = debug_msg.clone();
2837
2838        // SAFETY: Per this function's contract, `memory` and `realloc` are
2839        // valid.
2840        let options = unsafe {
2841            Options::new(
2842                id,
2843                NonNull::new(memory),
2844                NonNull::new(realloc),
2845                StringEncoding::from_u8(string_encoding).ok_or_else(|| {
2846                    anyhow::anyhow!("failed to convert u8 string encoding [{string_encoding}]")
2847                })?,
2848                false,
2849                None,
2850            )
2851        };
2852        let types = self.id().get(store.0).component().types().clone();
2853        let lower_cx = &mut LowerContext::new(store, &options, &types, self);
2854        let debug_msg_address = usize::try_from(debug_msg_address)?;
2855        // Lower the string into the component's memory
2856        let offset = lower_cx
2857            .as_slice_mut()
2858            .get(debug_msg_address..)
2859            .and_then(|b| b.get(..debug_msg.bytes().len()))
2860            .map(|_| debug_msg_address)
2861            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
2862        debug_msg
2863            .as_str()
2864            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
2865
2866        Ok(())
2867    }
2868
2869    /// Implements the `future.drop-readable` intrinsic.
2870    pub(crate) fn future_drop_readable(
2871        self,
2872        store: &mut dyn VMStore,
2873        ty: TypeFutureTableIndex,
2874        reader: u32,
2875    ) -> Result<()> {
2876        self.guest_drop_readable(store, TableIndex::Future(ty), reader)
2877    }
2878
2879    /// Implements the `stream.drop-readable` intrinsic.
2880    pub(crate) fn stream_drop_readable(
2881        self,
2882        store: &mut dyn VMStore,
2883        ty: TypeStreamTableIndex,
2884        reader: u32,
2885    ) -> Result<()> {
2886        self.guest_drop_readable(store, TableIndex::Stream(ty), reader)
2887    }
2888
2889    /// Retrieve the `TransmitState` rep for the specified `TransmitHandle` rep.
2890    fn get_state_rep(&self, rep: u32) -> Result<u32> {
2891        tls::get(|store| {
2892            let transmit_handle = TableId::<TransmitHandle>::new(rep);
2893            Ok(self
2894                .concurrent_state_mut(store)
2895                .get(transmit_handle)
2896                .with_context(|| format!("stream or future {transmit_handle:?} not found"))?
2897                .state
2898                .rep())
2899        })
2900    }
2901}
2902
2903/// Helper struct for running a closure on drop, e.g. for logging purposes.
2904struct RunOnDrop<F: FnOnce()>(Option<F>);
2905
2906impl<F: FnOnce()> RunOnDrop<F> {
2907    fn new(fun: F) -> Self {
2908        Self(Some(fun))
2909    }
2910
2911    fn cancel(mut self) {
2912        self.0 = None;
2913    }
2914}
2915
2916impl<F: FnOnce()> Drop for RunOnDrop<F> {
2917    fn drop(&mut self) {
2918        if let Some(fun) = self.0.take() {
2919            fun();
2920        }
2921    }
2922}
2923
2924impl ConcurrentState {
2925    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
2926        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
2927    }
2928
2929    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
2930        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
2931    }
2932
2933    /// Set or update the event for the specified waitable.
2934    ///
2935    /// If there is already an event set for this waitable, we assert that it is
2936    /// of the same variant as the new one and reuse the `ReturnCode` count and
2937    /// the `pending` field if applicable.
2938    // TODO: This is a bit awkward due to how
2939    // `Event::{Stream,Future}{Write,Read}` and
2940    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
2941    // Consider updating those representations in a way that allows this
2942    // function to be simplified.
2943    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
2944        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
2945
2946        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
2947            let (ReturnCode::Completed(count)
2948            | ReturnCode::Dropped(count)
2949            | ReturnCode::Cancelled(count)) = old
2950            else {
2951                unreachable!()
2952            };
2953
2954            match new {
2955                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
2956                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
2957                _ => unreachable!(),
2958            }
2959        }
2960
2961        let event = match (waitable.take_event(self)?, event) {
2962            (None, _) => event,
2963            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
2964            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
2965            (
2966                Some(Event::StreamWrite {
2967                    code: old_code,
2968                    pending: old_pending,
2969                }),
2970                Event::StreamWrite { code, pending },
2971            ) => Event::StreamWrite {
2972                code: update_code(old_code, code),
2973                pending: old_pending.or(pending),
2974            },
2975            (
2976                Some(Event::StreamRead {
2977                    code: old_code,
2978                    pending: old_pending,
2979                }),
2980                Event::StreamRead { code, pending },
2981            ) => Event::StreamRead {
2982                code: update_code(old_code, code),
2983                pending: old_pending.or(pending),
2984            },
2985            _ => unreachable!(),
2986        };
2987
2988        waitable.set_event(self, Some(event))
2989    }
2990
2991    fn get_mut_by_index(
2992        &mut self,
2993        ty: TableIndex,
2994        index: u32,
2995    ) -> Result<(u32, &mut StreamFutureState)> {
2996        get_mut_by_index_from(self.state_table(ty), ty, index)
2997    }
2998
2999    /// Allocate a new future or stream, including the `TransmitState` and the
3000    /// `TransmitHandle`s corresponding to the read and write ends.
3001    fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3002        let state_id = self.push(TransmitState::default())?;
3003
3004        let write = self.push(TransmitHandle::new(state_id))?;
3005        let read = self.push(TransmitHandle::new(state_id))?;
3006
3007        let state = self.get_mut(state_id)?;
3008        state.write_handle = write;
3009        state.read_handle = read;
3010
3011        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3012
3013        Ok((write, read))
3014    }
3015
3016    /// Delete the specified future or stream, including the read and write ends.
3017    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
3018        let state = self.delete(state_id)?;
3019        self.delete(state.write_handle)?;
3020        self.delete(state.read_handle)?;
3021
3022        log::trace!(
3023            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
3024            state.write_handle,
3025            state.read_handle,
3026        );
3027
3028        Ok(())
3029    }
3030
3031    fn state_table(&mut self, ty: TableIndex) -> &mut StateTable<WaitableState> {
3032        let runtime_instance = match ty {
3033            TableIndex::Stream(ty) => self.component.types()[ty].instance,
3034            TableIndex::Future(ty) => self.component.types()[ty].instance,
3035        };
3036        &mut self.waitable_tables[runtime_instance]
3037    }
3038
3039    /// Allocate a new future or stream and grant ownership of both the read and
3040    /// write ends to the (sub-)component instance to which the specified
3041    /// `TableIndex` belongs.
3042    fn guest_new(&mut self, ty: TableIndex) -> Result<ResourcePair> {
3043        let (write, read) = self.new_transmit()?;
3044        let read = self.state_table(ty).insert(
3045            read.rep(),
3046            waitable_state(ty, StreamFutureState::Read { done: false }),
3047        )?;
3048        let write = self.state_table(ty).insert(
3049            write.rep(),
3050            waitable_state(ty, StreamFutureState::Write { done: false }),
3051        )?;
3052        Ok(ResourcePair { write, read })
3053    }
3054
3055    /// Cancel a pending stream or future write from the host.
3056    ///
3057    /// # Arguments
3058    ///
3059    /// * `rep` - The `TransmitState` rep for the stream or future.
3060    fn host_cancel_write(&mut self, rep: u32) -> Result<ReturnCode> {
3061        let transmit_id = TableId::<TransmitState>::new(rep);
3062        let transmit = self.get_mut(transmit_id)?;
3063        log::trace!(
3064            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3065            transmit.read,
3066            transmit.write
3067        );
3068
3069        let code = if let Some(event) =
3070            Waitable::Transmit(transmit.write_handle).take_event(self)?
3071        {
3072            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3073                unreachable!();
3074            };
3075            match (code, event) {
3076                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3077                    ReturnCode::Cancelled(count)
3078                }
3079                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3080                _ => unreachable!(),
3081            }
3082        } else {
3083            ReturnCode::Cancelled(0)
3084        };
3085
3086        let transmit = self.get_mut(transmit_id)?;
3087
3088        match &transmit.write {
3089            WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
3090                transmit.write = WriteState::Open;
3091            }
3092
3093            WriteState::Open | WriteState::Dropped => {}
3094        }
3095
3096        log::trace!("cancelled write {transmit_id:?}");
3097
3098        Ok(code)
3099    }
3100
3101    /// Cancel a pending stream or future read from the host.
3102    ///
3103    /// # Arguments
3104    ///
3105    /// * `rep` - The `TransmitState` rep for the stream or future.
3106    fn host_cancel_read(&mut self, rep: u32) -> Result<ReturnCode> {
3107        let transmit_id = TableId::<TransmitState>::new(rep);
3108        let transmit = self.get_mut(transmit_id)?;
3109        log::trace!(
3110            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3111            transmit.read,
3112            transmit.write
3113        );
3114
3115        let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
3116            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3117                unreachable!();
3118            };
3119            match (code, event) {
3120                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3121                    ReturnCode::Cancelled(count)
3122                }
3123                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3124                _ => unreachable!(),
3125            }
3126        } else {
3127            ReturnCode::Cancelled(0)
3128        };
3129
3130        let transmit = self.get_mut(transmit_id)?;
3131
3132        match &transmit.read {
3133            ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
3134                transmit.read = ReadState::Open;
3135            }
3136
3137            ReadState::Open | ReadState::Dropped => {}
3138        }
3139
3140        log::trace!("cancelled read {transmit_id:?}");
3141
3142        Ok(code)
3143    }
3144
3145    /// Drop the write end of a stream or future read from the host.
3146    ///
3147    /// # Arguments
3148    ///
3149    /// * `transmit_rep` - The `TransmitState` rep for the stream or future.
3150    fn host_drop_writer(&mut self, transmit_rep: u32, kind: TransmitKind) -> Result<()> {
3151        let transmit_id = TableId::<TransmitState>::new(transmit_rep);
3152        let transmit = self
3153            .get_mut(transmit_id)
3154            .with_context(|| format!("error closing writer {transmit_rep}"))?;
3155        log::trace!(
3156            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
3157            transmit.read,
3158            transmit.write
3159        );
3160
3161        transmit.writer_watcher = None;
3162
3163        // Existing queued transmits must be updated with information for the impending writer closure
3164        match &mut transmit.write {
3165            WriteState::GuestReady { post_write, .. } => {
3166                *post_write = PostWrite::Drop;
3167            }
3168            WriteState::HostReady { post_write, .. } => {
3169                *post_write = PostWrite::Drop;
3170            }
3171            v @ WriteState::Open => {
3172                if let (TransmitKind::Future, false) = (
3173                    kind,
3174                    transmit.done || matches!(transmit.read, ReadState::Dropped),
3175                ) {
3176                    bail!("cannot drop future write end without first writing a value")
3177                }
3178
3179                *v = WriteState::Dropped;
3180            }
3181            WriteState::Dropped => unreachable!("write state is already dropped"),
3182        }
3183
3184        // If the existing read state is dropped, then there's nothing to read
3185        // and we can keep it that way.
3186        //
3187        // If the read state was any other state, then we must set the new state to open
3188        // to indicate that there *is* data to be read
3189        let new_state = if let ReadState::Dropped = &transmit.read {
3190            ReadState::Dropped
3191        } else {
3192            ReadState::Open
3193        };
3194
3195        let read_handle = transmit.read_handle;
3196
3197        // Swap in the new read state
3198        match mem::replace(&mut transmit.read, new_state) {
3199            // If the guest was ready to read, then we cannot drop the reader (or writer)
3200            // we must deliver the event, and update the state associated with the handle to
3201            // represent that a read must be performed
3202            ReadState::GuestReady { ty, handle, .. } => {
3203                // Ensure the final read of the guest is queued, with appropriate closure indicator
3204                self.update_event(
3205                    read_handle.rep(),
3206                    match ty {
3207                        TableIndex::Future(ty) => Event::FutureRead {
3208                            code: ReturnCode::Dropped(0),
3209                            pending: Some((ty, handle)),
3210                        },
3211                        TableIndex::Stream(ty) => Event::StreamRead {
3212                            code: ReturnCode::Dropped(0),
3213                            pending: Some((ty, handle)),
3214                        },
3215                    },
3216                )?;
3217            }
3218
3219            // If the host was ready to read, and the writer end is being dropped (host->host write?)
3220            // signal to the reader that we've reached the end of the stream
3221            ReadState::HostReady { accept } => {
3222                accept(Writer::End)?;
3223            }
3224
3225            // If the read state is open, then there are no registered readers of the stream/future
3226            ReadState::Open => {
3227                self.update_event(
3228                    read_handle.rep(),
3229                    match kind {
3230                        TransmitKind::Future => Event::FutureRead {
3231                            code: ReturnCode::Dropped(0),
3232                            pending: None,
3233                        },
3234                        TransmitKind::Stream => Event::StreamRead {
3235                            code: ReturnCode::Dropped(0),
3236                            pending: None,
3237                        },
3238                    },
3239                )?;
3240            }
3241
3242            // If the read state was already dropped, then we can remove the transmit state completely
3243            // (both writer and reader have been dropped)
3244            ReadState::Dropped => {
3245                log::trace!("host_drop_writer delete {transmit_rep}");
3246                self.delete_transmit(transmit_id)?;
3247            }
3248        }
3249        Ok(())
3250    }
3251
3252    /// Cancel a pending write for the specified stream or future from the guest.
3253    fn guest_cancel_write(
3254        &mut self,
3255        ty: TableIndex,
3256        writer: u32,
3257        _async_: bool,
3258    ) -> Result<ReturnCode> {
3259        let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3260            self.state_table(ty).get_mut_by_index(writer)?
3261        else {
3262            bail!("invalid stream or future handle");
3263        };
3264        let id = TableId::<TransmitHandle>::new(rep);
3265        log::trace!("guest cancel write {id:?} (handle {writer})");
3266        match state {
3267            StreamFutureState::Write { .. } => {
3268                bail!("stream or future write cancelled when no write is pending")
3269            }
3270            StreamFutureState::Read { .. } => {
3271                bail!("passed read end to `{{stream|future}}.cancel-write`")
3272            }
3273            StreamFutureState::Busy => {
3274                *state = StreamFutureState::Write { done: false };
3275            }
3276        }
3277        let rep = self.get(id)?.state.rep();
3278        self.host_cancel_write(rep)
3279    }
3280
3281    /// Cancel a pending read for the specified stream or future from the guest.
3282    fn guest_cancel_read(
3283        &mut self,
3284        ty: TableIndex,
3285        reader: u32,
3286        _async_: bool,
3287    ) -> Result<ReturnCode> {
3288        let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3289            self.state_table(ty).get_mut_by_index(reader)?
3290        else {
3291            bail!("invalid stream or future handle");
3292        };
3293        let id = TableId::<TransmitHandle>::new(rep);
3294        log::trace!("guest cancel read {id:?} (handle {reader})");
3295        match state {
3296            StreamFutureState::Read { .. } => {
3297                bail!("stream or future read cancelled when no read is pending")
3298            }
3299            StreamFutureState::Write { .. } => {
3300                bail!("passed write end to `{{stream|future}}.cancel-read`")
3301            }
3302            StreamFutureState::Busy => {
3303                *state = StreamFutureState::Read { done: false };
3304            }
3305        }
3306        let rep = self.get(id)?.state.rep();
3307        self.host_cancel_read(rep)
3308    }
3309
3310    /// Drop the writable end of the specified stream or future from the guest.
3311    fn guest_drop_writable(&mut self, ty: TableIndex, writer: u32) -> Result<()> {
3312        let (transmit_rep, state) = self
3313            .state_table(ty)
3314            .remove_by_index(writer)
3315            .context("failed to find writer")?;
3316        let (state, kind) = match state {
3317            WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
3318            WaitableState::Future(_, state) => (state, TransmitKind::Future),
3319            _ => {
3320                bail!("invalid stream or future handle");
3321            }
3322        };
3323        match state {
3324            StreamFutureState::Write { .. } => {}
3325            StreamFutureState::Read { .. } => {
3326                bail!("passed read end to `{{stream|future}}.drop-writable`")
3327            }
3328            StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
3329        }
3330
3331        let id = TableId::<TransmitHandle>::new(transmit_rep);
3332        let transmit_rep = self.get(id)?.state.rep();
3333        log::trace!("guest_drop_writable: drop writer {id:?}");
3334        self.host_drop_writer(transmit_rep, kind)
3335    }
3336
3337    /// Drop the specified error context.
3338    pub(crate) fn error_context_drop(
3339        &mut self,
3340        ty: TypeComponentLocalErrorContextTableIndex,
3341        error_context: u32,
3342    ) -> Result<()> {
3343        let local_state_table = self
3344            .error_context_tables
3345            .get_mut(ty)
3346            .context("error context table index present in (sub)component table during drop")?;
3347
3348        // Reduce the local (sub)component ref count, removing tracking if necessary
3349        let (rep, local_ref_removed) = {
3350            let (rep, LocalErrorContextRefCount(local_ref_count)) =
3351                local_state_table.get_mut_by_index(error_context)?;
3352            assert!(*local_ref_count > 0);
3353            *local_ref_count -= 1;
3354            let mut local_ref_removed = false;
3355            if *local_ref_count == 0 {
3356                local_ref_removed = true;
3357                local_state_table
3358                    .remove_by_index(error_context)
3359                    .context("removing error context from component-local tracking")?;
3360            }
3361            (rep, local_ref_removed)
3362        };
3363
3364        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3365
3366        let GlobalErrorContextRefCount(global_ref_count) = self
3367            .global_error_context_ref_counts
3368            .get_mut(&global_ref_count_idx)
3369            .expect("retrieve concurrent state for error context during drop");
3370
3371        // Reduce the component-global ref count, removing tracking if necessary
3372        assert!(*global_ref_count >= 1);
3373        *global_ref_count -= 1;
3374        if *global_ref_count == 0 {
3375            assert!(local_ref_removed);
3376
3377            self.global_error_context_ref_counts
3378                .remove(&global_ref_count_idx);
3379
3380            self.delete(TableId::<ErrorContextState>::new(rep))
3381                .context("deleting component-global error context data")?;
3382        }
3383
3384        Ok(())
3385    }
3386
3387    /// Transfer ownership of the specified stream or future read end from one
3388    /// guest to another.
3389    fn guest_transfer<U: PartialEq + Eq + std::fmt::Debug>(
3390        &mut self,
3391        src_idx: u32,
3392        src: U,
3393        src_instance: RuntimeComponentInstanceIndex,
3394        dst: U,
3395        dst_instance: RuntimeComponentInstanceIndex,
3396        match_state: impl Fn(&mut WaitableState) -> Result<(U, &mut StreamFutureState)>,
3397        make_state: impl Fn(U, StreamFutureState) -> WaitableState,
3398    ) -> Result<u32> {
3399        let src_table = &mut self.waitable_tables[src_instance];
3400        let (_rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3401        let (src_ty, _) = match_state(src_state)?;
3402        if src_ty != src {
3403            bail!("invalid future handle");
3404        }
3405
3406        let src_table = &mut self.waitable_tables[src_instance];
3407        let (rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3408        let (_, src_state) = match_state(src_state)?;
3409
3410        match src_state {
3411            StreamFutureState::Read { done: true } => {
3412                bail!("cannot lift stream after being notified that the writable end dropped")
3413            }
3414            StreamFutureState::Read { done: false } => {
3415                src_table.remove_by_index(src_idx)?;
3416
3417                let dst_table = &mut self.waitable_tables[dst_instance];
3418                dst_table.insert(
3419                    rep,
3420                    make_state(dst, StreamFutureState::Read { done: false }),
3421                )
3422            }
3423            StreamFutureState::Write { .. } => {
3424                bail!("cannot transfer write end of stream or future")
3425            }
3426            StreamFutureState::Busy => bail!("cannot transfer busy stream or future"),
3427        }
3428    }
3429
3430    /// Implements the `future.new` intrinsic.
3431    pub(crate) fn future_new(&mut self, ty: TypeFutureTableIndex) -> Result<ResourcePair> {
3432        self.guest_new(TableIndex::Future(ty))
3433    }
3434
3435    /// Implements the `future.cancel-write` intrinsic.
3436    pub(crate) fn future_cancel_write(
3437        &mut self,
3438        ty: TypeFutureTableIndex,
3439        async_: bool,
3440        writer: u32,
3441    ) -> Result<u32> {
3442        self.guest_cancel_write(TableIndex::Future(ty), writer, async_)
3443            .map(|result| result.encode())
3444    }
3445
3446    /// Implements the `future.cancel-read` intrinsic.
3447    pub(crate) fn future_cancel_read(
3448        &mut self,
3449        ty: TypeFutureTableIndex,
3450        async_: bool,
3451        reader: u32,
3452    ) -> Result<u32> {
3453        self.guest_cancel_read(TableIndex::Future(ty), reader, async_)
3454            .map(|result| result.encode())
3455    }
3456
3457    /// Implements the `future.drop-writable` intrinsic.
3458    pub(crate) fn future_drop_writable(
3459        &mut self,
3460        ty: TypeFutureTableIndex,
3461        writer: u32,
3462    ) -> Result<()> {
3463        self.guest_drop_writable(TableIndex::Future(ty), writer)
3464    }
3465
3466    /// Implements the `stream.new` intrinsic.
3467    pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result<ResourcePair> {
3468        self.guest_new(TableIndex::Stream(ty))
3469    }
3470
3471    /// Implements the `stream.cancel-write` intrinsic.
3472    pub(crate) fn stream_cancel_write(
3473        &mut self,
3474        ty: TypeStreamTableIndex,
3475        async_: bool,
3476        writer: u32,
3477    ) -> Result<u32> {
3478        self.guest_cancel_write(TableIndex::Stream(ty), writer, async_)
3479            .map(|result| result.encode())
3480    }
3481
3482    /// Implements the `stream.cancel-read` intrinsic.
3483    pub(crate) fn stream_cancel_read(
3484        &mut self,
3485        ty: TypeStreamTableIndex,
3486        async_: bool,
3487        reader: u32,
3488    ) -> Result<u32> {
3489        self.guest_cancel_read(TableIndex::Stream(ty), reader, async_)
3490            .map(|result| result.encode())
3491    }
3492
3493    /// Implements the `stream.drop-writable` intrinsic.
3494    pub(crate) fn stream_drop_writable(
3495        &mut self,
3496        ty: TypeStreamTableIndex,
3497        writer: u32,
3498    ) -> Result<()> {
3499        self.guest_drop_writable(TableIndex::Stream(ty), writer)
3500    }
3501
3502    /// Transfer ownership of the specified future read end from one guest to
3503    /// another.
3504    pub(crate) fn future_transfer(
3505        &mut self,
3506        src_idx: u32,
3507        src: TypeFutureTableIndex,
3508        dst: TypeFutureTableIndex,
3509    ) -> Result<u32> {
3510        self.guest_transfer(
3511            src_idx,
3512            src,
3513            self.component.types()[src].instance,
3514            dst,
3515            self.component.types()[dst].instance,
3516            |state| {
3517                if let WaitableState::Future(ty, state) = state {
3518                    Ok((*ty, state))
3519                } else {
3520                    Err(anyhow!("invalid future handle"))
3521                }
3522            },
3523            WaitableState::Future,
3524        )
3525    }
3526
3527    /// Transfer ownership of the specified stream read end from one guest to
3528    /// another.
3529    pub(crate) fn stream_transfer(
3530        &mut self,
3531        src_idx: u32,
3532        src: TypeStreamTableIndex,
3533        dst: TypeStreamTableIndex,
3534    ) -> Result<u32> {
3535        self.guest_transfer(
3536            src_idx,
3537            src,
3538            self.component.types()[src].instance,
3539            dst,
3540            self.component.types()[dst].instance,
3541            |state| {
3542                if let WaitableState::Stream(ty, state) = state {
3543                    Ok((*ty, state))
3544                } else {
3545                    Err(anyhow!("invalid stream handle"))
3546                }
3547            },
3548            WaitableState::Stream,
3549        )
3550    }
3551
3552    /// Copy the specified error context from one component to another.
3553    pub(crate) fn error_context_transfer(
3554        &mut self,
3555        src_idx: u32,
3556        src: TypeComponentLocalErrorContextTableIndex,
3557        dst: TypeComponentLocalErrorContextTableIndex,
3558    ) -> Result<u32> {
3559        let (rep, _) = {
3560            let rep = self
3561                .error_context_tables
3562                .get_mut(src)
3563                .context("error context table index present in (sub)component lookup")?
3564                .get_mut_by_index(src_idx)?;
3565            rep
3566        };
3567        let dst = self
3568            .error_context_tables
3569            .get_mut(dst)
3570            .context("error context table index present in (sub)component lookup")?;
3571
3572        // Update the component local for the destination
3573        let updated_count = if let Some((dst_idx, count)) = dst.get_mut_by_rep(rep) {
3574            (*count).0 += 1;
3575            dst_idx
3576        } else {
3577            dst.insert(rep, LocalErrorContextRefCount(1))?
3578        };
3579
3580        // Update the global (cross-subcomponent) count for error contexts
3581        // as the new component has essentially created a new reference that will
3582        // be dropped/handled independently
3583        let global_ref_count = self
3584            .global_error_context_ref_counts
3585            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3586            .context("global ref count present for existing (sub)component error context")?;
3587        global_ref_count.0 += 1;
3588
3589        Ok(updated_count)
3590    }
3591}
3592
3593pub(crate) struct ResourcePair {
3594    pub(crate) write: u32,
3595    pub(crate) read: u32,
3596}