wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem};
4use crate::component::func::{self, LiftContext, LowerContext, Options};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::VMStore;
10use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context, Result, anyhow, bail};
13use buffers::Extender;
14use buffers::UntypedWriteBuffer;
15use futures::channel::oneshot;
16use std::boxed::Box;
17use std::fmt;
18use std::future;
19use std::iter;
20use std::marker::PhantomData;
21use std::mem::{self, MaybeUninit};
22use std::pin::Pin;
23use std::string::{String, ToString};
24use std::sync::{Arc, Mutex};
25use std::task::{Poll, Waker};
26use std::vec::Vec;
27use wasmtime_environ::component::{
28    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex,
29    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
30    TypeFutureTableIndex, TypeStreamTableIndex,
31};
32
33pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
34
35mod buffers;
36
/// Distinguishes a `stream` from a `future` in functions that handle both
/// kinds of channel.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The operation concerns a `stream`.
    Stream,
    /// The operation concerns a `future`.
    Future,
}
44
/// Represents `{stream,future}.{read,write}` results.
///
/// Every variant except `Blocked` carries a `u32` payload which
/// [`ReturnCode::encode`] shifts into the upper 28 bits of the value handed
/// to the guest; the low four bits hold the variant's tag.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// Encoded as the sentinel `0xffff_ffff`.
    Blocked,
    /// Encoded with tag `0`; the payload is the item count (forced to zero
    /// for futures by `ReturnCode::completed`).
    Completed(u32),
    /// Encoded with tag `1`.
    Dropped(u32),
    /// Encoded with tag `2`.
    Cancelled(u32),
}
53
54impl ReturnCode {
55    /// Pack `self` into a single 32-bit integer that may be returned to the
56    /// guest.
57    ///
58    /// This corresponds to `pack_copy_result` in the Component Model spec.
59    pub fn encode(&self) -> u32 {
60        const BLOCKED: u32 = 0xffff_ffff;
61        const COMPLETED: u32 = 0x0;
62        const DROPPED: u32 = 0x1;
63        const CANCELLED: u32 = 0x2;
64        match self {
65            ReturnCode::Blocked => BLOCKED,
66            ReturnCode::Completed(n) => {
67                debug_assert!(*n < (1 << 28));
68                (n << 4) | COMPLETED
69            }
70            ReturnCode::Dropped(n) => {
71                debug_assert!(*n < (1 << 28));
72                (n << 4) | DROPPED
73            }
74            ReturnCode::Cancelled(n) => {
75                debug_assert!(*n < (1 << 28));
76                (n << 4) | CANCELLED
77            }
78        }
79    }
80
81    /// Returns `Self::Completed` with the specified count (or zero if
82    /// `matches!(kind, TransmitKind::Future)`)
83    fn completed(kind: TransmitKind, count: u32) -> Self {
84        Self::Completed(if let TransmitKind::Future = kind {
85            0
86        } else {
87            count
88        })
89    }
90}
91
/// Represents a stream or future type index.
///
/// This is useful as a parameter type for functions which operate on either a
/// future or a stream.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Index of a `stream` type.
    Stream(TypeStreamTableIndex),
    /// Index of a `future` type.
    Future(TypeFutureTableIndex),
}
101
102impl TransmitIndex {
103    pub fn kind(&self) -> TransmitKind {
104        match self {
105            TransmitIndex::Stream(_) => TransmitKind::Stream,
106            TransmitIndex::Future(_) => TransmitKind::Future,
107        }
108    }
109}
110
/// Action to take once a pending write has been delivered.
enum PostWrite {
    /// Keep the channel open so further writes can be performed.
    Continue,
    /// Drop the channel after the write.
    Drop,
}
118
/// Represents the result of a host-initiated stream or future read or write.
///
/// `B` is the buffer type supplied by the caller; it is handed back so the
/// caller can inspect what was consumed or produced.
struct HostResult<B> {
    /// The buffer provided when reading or writing.
    buffer: B,
    /// Whether the other end of the stream or future has been dropped.
    dropped: bool,
}
126
127/// Retrieve the payload type of the specified stream or future, or `None` if it
128/// has no payload type.
129fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
130    match ty {
131        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
132        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
133    }
134}
135
136/// Retrieve the host rep and state for the specified guest-visible waitable
137/// handle.
138fn get_mut_by_index_from(
139    handle_table: &mut HandleTable,
140    ty: TransmitIndex,
141    index: u32,
142) -> Result<(u32, &mut TransmitLocalState)> {
143    match ty {
144        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
145        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
146    }
147}
148
149/// Complete a write initiated by a host-owned future or stream by matching it
150/// with the specified `Reader`.
151fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
152    mut store: StoreContextMut<U>,
153    instance: Instance,
154    reader: Reader,
155    mut buffer: B,
156    kind: TransmitKind,
157) -> Result<(HostResult<B>, ReturnCode)> {
158    Ok(match reader {
159        Reader::Guest {
160            options,
161            ty,
162            address,
163            count,
164        } => {
165            let types = instance.id().get(store.0).component().types().clone();
166            let count = buffer.remaining().len().min(count);
167
168            let lower = &mut if T::MAY_REQUIRE_REALLOC {
169                LowerContext::new
170            } else {
171                LowerContext::new_without_realloc
172            }(store.as_context_mut(), options, &types, instance);
173
174            if address % usize::try_from(T::ALIGN32)? != 0 {
175                bail!("read pointer not aligned");
176            }
177            lower
178                .as_slice_mut()
179                .get_mut(address..)
180                .and_then(|b| b.get_mut(..T::SIZE32 * count))
181                .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
182
183            if let Some(ty) = payload(ty, &types) {
184                T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
185            }
186
187            buffer.skip(count);
188            (
189                HostResult {
190                    buffer,
191                    dropped: false,
192                },
193                ReturnCode::completed(kind, count.try_into().unwrap()),
194            )
195        }
196        Reader::Host { accept } => {
197            let count = buffer.remaining().len();
198            let mut untyped = UntypedWriteBuffer::new(&mut buffer);
199            let count = accept(&mut untyped, count);
200            (
201                HostResult {
202                    buffer,
203                    dropped: false,
204                },
205                ReturnCode::completed(kind, count.try_into().unwrap()),
206            )
207        }
208        Reader::End => (
209            HostResult {
210                buffer,
211                dropped: true,
212            },
213            ReturnCode::Dropped(0),
214        ),
215    })
216}
217
218/// Complete a read initiated by a host-owned future or stream by matching it with the
219/// specified `Writer`.
220fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
221    writer: Writer,
222    mut buffer: B,
223    kind: TransmitKind,
224) -> Result<(HostResult<B>, ReturnCode)> {
225    Ok(match writer {
226        Writer::Guest {
227            lift,
228            ty,
229            address,
230            count,
231        } => {
232            let count = count.min(buffer.remaining_capacity());
233            if T::IS_RUST_UNIT_TYPE {
234                // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
235                // zero-sized type, so `MaybeUninit::uninit().assume_init()`
236                // is a valid way to populate the zero-sized buffer.
237                buffer.extend(
238                    iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() })
239                        .take(count),
240                )
241            } else {
242                let ty = ty.unwrap();
243                if address % usize::try_from(T::ALIGN32)? != 0 {
244                    bail!("write pointer not aligned");
245                }
246                lift.memory()
247                    .get(address..)
248                    .and_then(|b| b.get(..T::SIZE32 * count))
249                    .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
250
251                let list = &WasmList::new(address, count, lift, ty)?;
252                T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))?
253            }
254            (
255                HostResult {
256                    buffer,
257                    dropped: false,
258                },
259                ReturnCode::completed(kind, count.try_into().unwrap()),
260            )
261        }
262        Writer::Host {
263            buffer: input,
264            count,
265        } => {
266            let count = count.min(buffer.remaining_capacity());
267            buffer.move_from(input.get_mut::<T>(), count);
268            (
269                HostResult {
270                    buffer,
271                    dropped: false,
272                },
273                ReturnCode::completed(kind, count.try_into().unwrap()),
274            )
275        }
276        Writer::End => (
277            HostResult {
278                buffer,
279                dropped: true,
280            },
281            ReturnCode::Dropped(0),
282        ),
283    })
284}
285
286/// Return a `Future` which will resolve once the reader end corresponding to
287/// the specified writer end of a future or stream is dropped.
288async fn watch_reader(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
289    future::poll_fn(|cx| {
290        accessor
291            .as_accessor()
292            .with(|mut access| {
293                let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
294                let state_id = concurrent_state.get(id)?.state;
295                let state = concurrent_state.get_mut(state_id)?;
296                anyhow::Ok(if matches!(&state.read, ReadState::Dropped) {
297                    Poll::Ready(())
298                } else {
299                    state.reader_watcher = Some(cx.waker().clone());
300                    Poll::Pending
301                })
302            })
303            .unwrap_or(Poll::Ready(()))
304    })
305    .await
306}
307
308/// Return a `Future` which will resolve once the writer end corresponding to
309/// the specified reader end of a future or stream is dropped.
310async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
311    future::poll_fn(|cx| {
312        accessor
313            .as_accessor()
314            .with(|mut access| {
315                let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
316                let state_id = concurrent_state.get(id)?.state;
317                let state = concurrent_state.get_mut(state_id)?;
318                anyhow::Ok(
319                    if matches!(
320                        &state.write,
321                        WriteState::Dropped
322                            | WriteState::GuestReady {
323                                post_write: PostWrite::Drop,
324                                ..
325                            }
326                            | WriteState::HostReady {
327                                post_write: PostWrite::Drop,
328                                ..
329                            }
330                    ) {
331                        Poll::Ready(())
332                    } else {
333                        state.writer_watcher = Some(cx.waker().clone());
334                        Poll::Pending
335                    },
336                )
337            })
338            .unwrap_or(Poll::Ready(()))
339    })
340    .await
341}
342
/// Represents the state associated with an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context.
    pub(crate) debug_msg: String,
}
349
/// Represents the size and alignment for a "flat" Component Model type,
/// i.e. one containing no pointers or handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the type (presumably in bytes -- confirm against users).
    pub(super) size: u32,
    /// Alignment of the type (presumably in bytes -- confirm against users).
    pub(super) align: u32,
}
357
/// Represents the writable end of a Component Model `future`.
///
/// Note that `FutureWriter` instances must be disposed of using either `write`
/// or `close`; otherwise the in-store representation will leak and the reader
/// end will hang indefinitely.  Consider using [`GuardedFutureWriter`] to
/// ensure that disposal happens automatically.
pub struct FutureWriter<T> {
    /// Produces the value that `close` writes when no explicit value is given.
    default: fn() -> T,
    /// Table id of the transmit handle backing this writer.
    id: TableId<TransmitHandle>,
    /// Instance whose concurrent state holds `id`.
    instance: Instance,
}
369
370impl<T> FutureWriter<T> {
371    fn new(default: fn() -> T, id: TableId<TransmitHandle>, instance: Instance) -> Self {
372        Self {
373            default,
374            id,
375            instance,
376        }
377    }
378
379    /// Write the specified value to this `future`.
380    ///
381    /// The returned `Future` will yield `true` if the read end accepted the
382    /// value; otherwise it will return `false`, meaning the read end was dropped
383    /// before the value could be delivered.
384    ///
385    /// # Panics
386    ///
387    /// Panics if the store that the [`Accessor`] is derived from does not own
388    /// this future.
389    pub async fn write(self, accessor: impl AsAccessor, value: T) -> bool
390    where
391        T: func::Lower + Send + Sync + 'static,
392    {
393        self.guard(accessor).write(value).await
394    }
395
396    /// Mut-ref signature instead of by-value signature for
397    /// `GuardedFutureWriter` to more easily call.
398    async fn write_(&mut self, accessor: impl AsAccessor, value: T) -> bool
399    where
400        T: func::Lower + Send + Sync + 'static,
401    {
402        let accessor = accessor.as_accessor();
403
404        let result = self
405            .instance
406            .host_write_async(accessor, self.id, Some(value), TransmitKind::Future)
407            .await;
408
409        match result {
410            Ok(HostResult { dropped, .. }) => !dropped,
411            Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
412        }
413    }
414
415    /// Wait for the read end of this `future` is dropped.
416    ///
417    /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or
418    /// from within a host function for example.
419    ///
420    /// # Panics
421    ///
422    /// Panics if the store that the [`Accessor`] is derived from does not own
423    /// this future.
424    pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
425        watch_reader(accessor, self.instance, self.id).await
426    }
427
428    /// Close this `FutureWriter`, writing the default value.
429    ///
430    /// # Panics
431    ///
432    /// Panics if the store that the [`Accessor`] is derived from does not own
433    /// this future. Usage of this future after calling `close` will also cause
434    /// a panic.
435    pub fn close(&mut self, mut store: impl AsContextMut)
436    where
437        T: func::Lower + Send + Sync + 'static,
438    {
439        let id = mem::replace(&mut self.id, TableId::new(0));
440        let default = self.default;
441        self.instance
442            .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default())))
443            .unwrap();
444    }
445
446    /// Convenience method around [`Self::close`].
447    pub fn close_with(&mut self, accessor: impl AsAccessor)
448    where
449        T: func::Lower + Send + Sync + 'static,
450    {
451        accessor.as_accessor().with(|access| self.close(access))
452    }
453
454    /// Returns a [`GuardedFutureWriter`] which will auto-close this future on
455    /// drop and clean it up from the store.
456    ///
457    /// Note that the `accessor` provided must own this future and is
458    /// additionally transferred to the `GuardedFutureWriter` return value.
459    pub fn guard<A>(self, accessor: A) -> GuardedFutureWriter<T, A>
460    where
461        T: func::Lower + Send + Sync + 'static,
462        A: AsAccessor,
463    {
464        GuardedFutureWriter::new(accessor, self)
465    }
466}
467
/// A [`FutureWriter`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureWriter`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureWriter::new`] or
/// [`FutureWriter::guard`].
pub struct GuardedFutureWriter<T, A>
where
    T: func::Lower + Send + Sync + 'static,
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureWriter`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    writer: Option<FutureWriter<T>>,
    // Accessor used to reach the store when writing, watching, or closing.
    accessor: A,
}
484
485impl<T, A> GuardedFutureWriter<T, A>
486where
487    T: func::Lower + Send + Sync + 'static,
488    A: AsAccessor,
489{
490    /// Create a new `GuardedFutureWriter` with the specified `accessor` and
491    /// `writer`.
492    pub fn new(accessor: A, writer: FutureWriter<T>) -> Self {
493        Self {
494            writer: Some(writer),
495            accessor,
496        }
497    }
498
499    /// Wrapper for [`FutureWriter::write`].
500    pub async fn write(mut self, value: T) -> bool
501    where
502        T: func::Lower + Send + Sync + 'static,
503    {
504        self.writer
505            .as_mut()
506            .unwrap()
507            .write_(&self.accessor, value)
508            .await
509    }
510
511    /// Wrapper for [`FutureWriter::watch_reader`]
512    pub async fn watch_reader(&mut self) {
513        self.writer
514            .as_mut()
515            .unwrap()
516            .watch_reader(&self.accessor)
517            .await
518    }
519
520    /// Extracts the underlying [`FutureWriter`] from this guard, returning it
521    /// back.
522    pub fn into_future(self) -> FutureWriter<T> {
523        self.into()
524    }
525}
526
527impl<T, A> From<GuardedFutureWriter<T, A>> for FutureWriter<T>
528where
529    T: func::Lower + Send + Sync + 'static,
530    A: AsAccessor,
531{
532    fn from(mut guard: GuardedFutureWriter<T, A>) -> Self {
533        guard.writer.take().unwrap()
534    }
535}
536
537impl<T, A> Drop for GuardedFutureWriter<T, A>
538where
539    T: func::Lower + Send + Sync + 'static,
540    A: AsAccessor,
541{
542    fn drop(&mut self) {
543        if let Some(writer) = &mut self.writer {
544            writer.close_with(&self.accessor)
545        }
546    }
547}
548
/// Represents the readable end of a Component Model `future`.
///
/// Note that `FutureReader` instances must be disposed of using either `read`
/// or `close`; otherwise the in-store representation will leak and the writer
/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
/// ensure that disposal happens automatically.
pub struct FutureReader<T> {
    /// Instance whose concurrent state holds `id`.
    instance: Instance,
    /// Table id of the transmit handle backing this reader.
    id: TableId<TransmitHandle>,
    /// Ties the reader to its payload type without storing a `T`.
    _phantom: PhantomData<T>,
}
560
561impl<T> FutureReader<T> {
562    fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
563        Self {
564            instance,
565            id,
566            _phantom: PhantomData,
567        }
568    }
569
570    /// Read the value from this `future`.
571    ///
572    /// The returned `Future` will yield `Err` if the guest has trapped
573    /// before it could produce a result.
574    ///
575    /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or
576    /// from within a host function for example.
577    ///
578    /// # Panics
579    ///
580    /// Panics if the store that the [`Accessor`] is derived from does not own
581    /// this future.
582    pub async fn read(self, accessor: impl AsAccessor) -> Option<T>
583    where
584        T: func::Lift + Send + 'static,
585    {
586        self.guard(accessor).read().await
587    }
588
589    async fn read_(&mut self, accessor: impl AsAccessor) -> Option<T>
590    where
591        T: func::Lift + Send + 'static,
592    {
593        let accessor = accessor.as_accessor();
594
595        let result = self
596            .instance
597            .host_read_async(accessor, self.id, None, TransmitKind::Future)
598            .await;
599
600        if let Ok(HostResult {
601            mut buffer,
602            dropped: false,
603        }) = result
604        {
605            buffer.take()
606        } else {
607            None
608        }
609    }
610
611    /// Wait for the write end of this `future` to be dropped.
612    ///
613    /// The [`Accessor`] provided can be acquired from
614    /// [`Instance::run_concurrent`] or from within a host function for example.
615    ///
616    /// # Panics
617    ///
618    /// Panics if the store that the [`Accessor`] is derived from does not own
619    /// this future.
620    pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
621        watch_writer(accessor, self.instance, self.id).await;
622    }
623
624    /// Convert this `FutureReader` into a [`Val`].
625    // See TODO comment for `FutureAny`; this is prone to handle leakage.
626    pub fn into_val(self) -> Val {
627        Val::Future(FutureAny(self.id.rep()))
628    }
629
630    /// Attempt to convert the specified [`Val`] to a `FutureReader`.
631    pub fn from_val(
632        mut store: impl AsContextMut<Data: Send>,
633        instance: Instance,
634        value: &Val,
635    ) -> Result<Self> {
636        let Val::Future(FutureAny(rep)) = value else {
637            bail!("expected `future`; got `{}`", value.desc());
638        };
639        let store = store.as_context_mut();
640        let id = TableId::<TransmitHandle>::new(*rep);
641        instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present
642        Ok(Self::new(id, instance))
643    }
644
645    /// Transfer ownership of the read end of a future from a guest to the host.
646    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
647        match ty {
648            InterfaceType::Future(src) => {
649                let handle_table = cx
650                    .instance_mut()
651                    .table_for_transmit(TransmitIndex::Future(src));
652                let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
653                if is_done {
654                    bail!("cannot lift future after being notified that the writable end dropped");
655                }
656                let id = TableId::<TransmitHandle>::new(rep);
657                let concurrent_state = cx.instance_mut().concurrent_state_mut();
658                let future = concurrent_state.get_mut(id)?;
659                future.common.handle = None;
660                let state = future.state;
661
662                if concurrent_state.get(state)?.done {
663                    bail!("cannot lift future after previous read succeeded");
664                }
665
666                Ok(Self::new(id, cx.instance_handle()))
667            }
668            _ => func::bad_type_info(),
669        }
670    }
671
672    /// Close this `FutureReader`, writing the default value.
673    ///
674    /// # Panics
675    ///
676    /// Panics if the store that the [`Accessor`] is derived from does not own
677    /// this future. Usage of this future after calling `close` will also cause
678    /// a panic.
679    pub fn close(&mut self, mut store: impl AsContextMut) {
680        // `self` should never be used again, but leave an invalid handle there just in case.
681        let id = mem::replace(&mut self.id, TableId::new(0));
682        self.instance
683            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Future)
684            .unwrap();
685    }
686
687    /// Convenience method around [`Self::close`].
688    pub fn close_with(&mut self, accessor: impl AsAccessor) {
689        accessor.as_accessor().with(|access| self.close(access))
690    }
691
692    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
693    /// drop and clean it up from the store.
694    ///
695    /// Note that the `accessor` provided must own this future and is
696    /// additionally transferred to the `GuardedFutureReader` return value.
697    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
698    where
699        A: AsAccessor,
700    {
701        GuardedFutureReader::new(accessor, self)
702    }
703}
704
705impl<T> fmt::Debug for FutureReader<T> {
706    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707        f.debug_struct("FutureReader")
708            .field("id", &self.id)
709            .field("instance", &self.instance)
710            .finish()
711    }
712}
713
714/// Transfer ownership of the read end of a future from the host to a guest.
715pub(crate) fn lower_future_to_index<U>(
716    rep: u32,
717    cx: &mut LowerContext<'_, U>,
718    ty: InterfaceType,
719) -> Result<u32> {
720    match ty {
721        InterfaceType::Future(dst) => {
722            let concurrent_state = cx.instance_mut().concurrent_state_mut();
723            let id = TableId::<TransmitHandle>::new(rep);
724            let state = concurrent_state.get(id)?.state;
725            let rep = concurrent_state.get(state)?.read_handle.rep();
726
727            let handle = cx
728                .instance_mut()
729                .table_for_transmit(TransmitIndex::Future(dst))
730                .future_insert_read(dst, rep)?;
731
732            cx.instance_mut()
733                .concurrent_state_mut()
734                .get_mut(id)?
735                .common
736                .handle = Some(handle);
737
738            Ok(handle)
739        }
740        _ => func::bad_type_info(),
741    }
742}
743
744// SAFETY: This relies on the `ComponentType` implementation for `u32` being
745// safe and correct since we lift and lower future handles as `u32`s.
746unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
747    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
748
749    type Lower = <u32 as func::ComponentType>::Lower;
750
751    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
752        match ty {
753            InterfaceType::Future(_) => Ok(()),
754            other => bail!("expected `future`, found `{}`", func::desc(other)),
755        }
756    }
757}
758
759// SAFETY: See the comment on the `ComponentType` `impl` for this type.
760unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
761    fn linear_lower_to_flat<U>(
762        &self,
763        cx: &mut LowerContext<'_, U>,
764        ty: InterfaceType,
765        dst: &mut MaybeUninit<Self::Lower>,
766    ) -> Result<()> {
767        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
768            cx,
769            InterfaceType::U32,
770            dst,
771        )
772    }
773
774    fn linear_lower_to_memory<U>(
775        &self,
776        cx: &mut LowerContext<'_, U>,
777        ty: InterfaceType,
778        offset: usize,
779    ) -> Result<()> {
780        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
781            cx,
782            InterfaceType::U32,
783            offset,
784        )
785    }
786}
787
788// SAFETY: See the comment on the `ComponentType` `impl` for this type.
789unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
790    fn linear_lift_from_flat(
791        cx: &mut LiftContext<'_>,
792        ty: InterfaceType,
793        src: &Self::Lower,
794    ) -> Result<Self> {
795        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
796        Self::lift_from_index(cx, ty, index)
797    }
798
799    fn linear_lift_from_memory(
800        cx: &mut LiftContext<'_>,
801        ty: InterfaceType,
802        bytes: &[u8],
803    ) -> Result<Self> {
804        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
805        Self::lift_from_index(cx, ty, index)
806    }
807}
808
/// A [`FutureReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureReader::new`] or
/// [`FutureReader::guard`].
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<FutureReader<T>>,
    // Accessor used to reach the store when reading, watching, or closing.
    accessor: A,
}
824
825impl<T, A> GuardedFutureReader<T, A>
826where
827    A: AsAccessor,
828{
829    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
830    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
831        Self {
832            reader: Some(reader),
833            accessor,
834        }
835    }
836
837    /// Wrapper for [`FutureReader::read`].
838    pub async fn read(mut self) -> Option<T>
839    where
840        T: func::Lift + Send + 'static,
841    {
842        self.reader.as_mut().unwrap().read_(&self.accessor).await
843    }
844
845    /// Wrapper for [`FutureReader::watch_writer`].
846    pub async fn watch_writer(&mut self) {
847        self.reader
848            .as_mut()
849            .unwrap()
850            .watch_writer(&self.accessor)
851            .await
852    }
853
854    /// Extracts the underlying [`FutureReader`] from this guard, returning it
855    /// back.
856    pub fn into_future(self) -> FutureReader<T> {
857        self.into()
858    }
859}
860
861impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
862where
863    A: AsAccessor,
864{
865    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
866        guard.reader.take().unwrap()
867    }
868}
869
870impl<T, A> Drop for GuardedFutureReader<T, A>
871where
872    A: AsAccessor,
873{
874    fn drop(&mut self) {
875        if let Some(reader) = &mut self.reader {
876            reader.close_with(&self.accessor)
877        }
878    }
879}
880
/// Represents the writable end of a Component Model `stream`.
///
/// Note that `StreamWriter` instances must be disposed of using `close`;
/// otherwise the in-store representation will leak and the reader end will hang
/// indefinitely.  Consider using [`GuardedStreamWriter`] to ensure that
/// disposal happens automatically.
pub struct StreamWriter<T> {
    /// Instance whose concurrent state holds `id`.
    instance: Instance,
    /// Table id of the transmit handle backing this writer.
    id: TableId<TransmitHandle>,
    /// Set once a write reports that the other end has been dropped.
    closed: bool,
    /// Ties the writer to its item type without storing a `T`.
    _phantom: PhantomData<T>,
}
893
894impl<T> StreamWriter<T> {
895    fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
896        Self {
897            instance,
898            id,
899            closed: false,
900            _phantom: PhantomData,
901        }
902    }
903
904    /// Returns whether this stream is "closed" meaning that the other end of
905    /// the stream has been dropped.
906    pub fn is_closed(&self) -> bool {
907        self.closed
908    }
909
910    /// Write the specified items to the `stream`.
911    ///
912    /// Note that this will only write as many items as the reader accepts
913    /// during its current or next read.  Use `write_all` to loop until the
914    /// buffer is drained or the read end is dropped.
915    ///
916    /// The returned `Future` will yield the input buffer back,
917    /// possibly consuming a subset of the items or nothing depending on the
918    /// number of items the reader accepted.
919    ///
920    /// The [`is_closed`](Self::is_closed) method can be used to determine
921    /// whether the stream was learned to be closed after this operation completes.
922    ///
923    /// # Panics
924    ///
925    /// Panics if the store that the [`Accessor`] is derived from does not own
926    /// this future.
927    pub async fn write<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
928    where
929        T: func::Lower + 'static,
930        B: WriteBuffer<T>,
931    {
932        let result = self
933            .instance
934            .host_write_async(
935                accessor.as_accessor(),
936                self.id,
937                buffer,
938                TransmitKind::Stream,
939            )
940            .await;
941
942        match result {
943            Ok(HostResult { buffer, dropped }) => {
944                if self.closed {
945                    debug_assert!(dropped);
946                }
947                self.closed = dropped;
948                buffer
949            }
950            Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
951        }
952    }
953
954    /// Write the specified values until either the buffer is drained or the
955    /// read end is dropped.
956    ///
957    /// The buffer is returned back to the caller and may still contain items
958    /// within it if the other end of this stream was dropped. Use the
959    /// [`is_closed`](Self::is_closed) method to determine if the other end is
960    /// dropped.
961    ///
962    /// # Panics
963    ///
964    /// Panics if the store that the [`Accessor`] is derived from does not own
965    /// this future.
966    pub async fn write_all<B>(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B
967    where
968        T: func::Lower + 'static,
969        B: WriteBuffer<T>,
970    {
971        let accessor = accessor.as_accessor();
972        while !self.is_closed() && buffer.remaining().len() > 0 {
973            buffer = self.write(accessor, buffer).await;
974        }
975        buffer
976    }
977
978    /// Wait for the read end of this `stream` to be dropped.
979    ///
980    /// # Panics
981    ///
982    /// Panics if the store that the [`Accessor`] is derived from does not own
983    /// this future.
984    pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
985        watch_reader(accessor, self.instance, self.id).await
986    }
987
988    /// Close this `StreamWriter`, writing the default value.
989    ///
990    /// # Panics
991    ///
992    /// Panics if the store that the [`Accessor`] is derived from does not own
993    /// this future. Usage of this future after calling `close` will also cause
994    /// a panic.
995    pub fn close(&mut self, mut store: impl AsContextMut) {
996        // `self` should never be used again, but leave an invalid handle there just in case.
997        let id = mem::replace(&mut self.id, TableId::new(0));
998        self.instance
999            .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>)
1000            .unwrap()
1001    }
1002
1003    /// Convenience method around [`Self::close`].
1004    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1005        accessor.as_accessor().with(|access| self.close(access))
1006    }
1007
1008    /// Returns a [`GuardedStreamWriter`] which will auto-close this stream on
1009    /// drop and clean it up from the store.
1010    ///
1011    /// Note that the `accessor` provided must own this future and is
1012    /// additionally transferred to the `GuardedStreamWriter` return value.
1013    pub fn guard<A>(self, accessor: A) -> GuardedStreamWriter<T, A>
1014    where
1015        A: AsAccessor,
1016    {
1017        GuardedStreamWriter::new(accessor, self)
1018    }
1019}
1020
/// A [`StreamWriter`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`StreamWriter`] that ensures it is closed
/// when dropped. This can be created through [`GuardedStreamWriter::new`] or
/// [`StreamWriter::guard`].
pub struct GuardedStreamWriter<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `StreamWriter`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    writer: Option<StreamWriter<T>>,
    // Accessor handed to every wrapped call and used by the destructor to
    // close `writer`.
    accessor: A,
}
1036
1037impl<T, A> GuardedStreamWriter<T, A>
1038where
1039    A: AsAccessor,
1040{
1041    /// Create a new `GuardedStreamWriter` with the specified `accessor` and `writer`.
1042    pub fn new(accessor: A, writer: StreamWriter<T>) -> Self {
1043        Self {
1044            writer: Some(writer),
1045            accessor,
1046        }
1047    }
1048
1049    /// Wrapper for [`StreamWriter::is_closed`].
1050    pub fn is_closed(&self) -> bool {
1051        self.writer.as_ref().unwrap().is_closed()
1052    }
1053
1054    /// Wrapper for [`StreamWriter::write`].
1055    pub async fn write<B>(&mut self, buffer: B) -> B
1056    where
1057        T: func::Lower + 'static,
1058        B: WriteBuffer<T>,
1059    {
1060        self.writer
1061            .as_mut()
1062            .unwrap()
1063            .write(&self.accessor, buffer)
1064            .await
1065    }
1066
1067    /// Wrapper for [`StreamWriter::write_all`].
1068    pub async fn write_all<B>(&mut self, buffer: B) -> B
1069    where
1070        T: func::Lower + 'static,
1071        B: WriteBuffer<T>,
1072    {
1073        self.writer
1074            .as_mut()
1075            .unwrap()
1076            .write_all(&self.accessor, buffer)
1077            .await
1078    }
1079
1080    /// Wrapper for [`StreamWriter::watch_reader`].
1081    pub async fn watch_reader(&mut self) {
1082        self.writer
1083            .as_mut()
1084            .unwrap()
1085            .watch_reader(&self.accessor)
1086            .await
1087    }
1088
1089    /// Extracts the underlying [`StreamWriter`] from this guard, returning it
1090    /// back.
1091    pub fn into_stream(self) -> StreamWriter<T> {
1092        self.into()
1093    }
1094}
1095
1096impl<T, A> From<GuardedStreamWriter<T, A>> for StreamWriter<T>
1097where
1098    A: AsAccessor,
1099{
1100    fn from(mut guard: GuardedStreamWriter<T, A>) -> Self {
1101        guard.writer.take().unwrap()
1102    }
1103}
1104
1105impl<T, A> Drop for GuardedStreamWriter<T, A>
1106where
1107    A: AsAccessor,
1108{
1109    fn drop(&mut self) {
1110        if let Some(writer) = &mut self.writer {
1111            writer.close_with(&self.accessor)
1112        }
1113    }
1114}
1115
/// Represents the readable end of a Component Model `stream`.
///
/// Note that `StreamReader` instances must be disposed of using `close`;
/// otherwise the in-store representation will leak and the writer end will hang
/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
/// disposal happens automatically.
pub struct StreamReader<T> {
    // Instance whose concurrent state holds this stream.
    instance: Instance,
    // Table handle for the read end; `close` replaces it with
    // `TableId::new(0)`.
    id: TableId<TransmitHandle>,
    // Updated after each `read` with whether the write end was reported as
    // dropped; see `is_closed`.
    closed: bool,
    // Ties the reader to its item type `T` without storing a `T`.
    _phantom: PhantomData<T>,
}
1128
1129impl<T> StreamReader<T> {
1130    fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1131        Self {
1132            instance,
1133            id,
1134            closed: false,
1135            _phantom: PhantomData,
1136        }
1137    }
1138
1139    /// Returns whether this stream is "closed" meaning that the other end of
1140    /// the stream has been dropped.
1141    pub fn is_closed(&self) -> bool {
1142        self.closed
1143    }
1144
1145    /// Read values from this `stream`.
1146    ///
1147    /// The returned `Future` will yield a `(Some(_), _)` if the read completed
1148    /// (possibly with zero items if the write was empty).  It will return
1149    /// `(None, _)` if the read failed due to the closure of the write end. In
1150    /// either case, the returned buffer will be the same one passed as a
1151    /// parameter, with zero or more items added.
1152    ///
1153    /// # Panics
1154    ///
1155    /// Panics if the store that the [`Accessor`] is derived from does not own
1156    /// this future.
1157    pub async fn read<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
1158    where
1159        T: func::Lift + 'static,
1160        B: ReadBuffer<T> + Send + 'static,
1161    {
1162        let result = self
1163            .instance
1164            .host_read_async(
1165                accessor.as_accessor(),
1166                self.id,
1167                buffer,
1168                TransmitKind::Stream,
1169            )
1170            .await;
1171
1172        match result {
1173            Ok(HostResult { buffer, dropped }) => {
1174                if self.closed {
1175                    debug_assert!(dropped);
1176                }
1177                self.closed = dropped;
1178                buffer
1179            }
1180            Err(_) => {
1181                todo!("guarantee buffer recovery if `host_read` fails")
1182            }
1183        }
1184    }
1185
1186    /// Wait until the write end of this `stream` is dropped.
1187    ///
1188    /// # Panics
1189    ///
1190    /// Panics if the store that the [`Accessor`] is derived from does not own
1191    /// this future.
1192    pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
1193        watch_writer(accessor, self.instance, self.id).await
1194    }
1195
1196    /// Convert this `StreamReader` into a [`Val`].
1197    // See TODO comment for `StreamAny`; this is prone to handle leakage.
1198    pub fn into_val(self) -> Val {
1199        Val::Stream(StreamAny(self.id.rep()))
1200    }
1201
1202    /// Attempt to convert the specified [`Val`] to a `StreamReader`.
1203    pub fn from_val(
1204        mut store: impl AsContextMut<Data: Send>,
1205        instance: Instance,
1206        value: &Val,
1207    ) -> Result<Self> {
1208        let Val::Stream(StreamAny(rep)) = value else {
1209            bail!("expected `stream`; got `{}`", value.desc());
1210        };
1211        let store = store.as_context_mut();
1212        let id = TableId::<TransmitHandle>::new(*rep);
1213        instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present
1214        Ok(Self::new(id, instance))
1215    }
1216
1217    /// Transfer ownership of the read end of a stream from a guest to the host.
1218    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1219        match ty {
1220            InterfaceType::Stream(src) => {
1221                let handle_table = cx
1222                    .instance_mut()
1223                    .table_for_transmit(TransmitIndex::Stream(src));
1224                let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1225                if is_done {
1226                    bail!("cannot lift stream after being notified that the writable end dropped");
1227                }
1228                let id = TableId::<TransmitHandle>::new(rep);
1229                cx.instance_mut()
1230                    .concurrent_state_mut()
1231                    .get_mut(id)?
1232                    .common
1233                    .handle = None;
1234                Ok(Self::new(id, cx.instance_handle()))
1235            }
1236            _ => func::bad_type_info(),
1237        }
1238    }
1239
1240    /// Close this `StreamReader`, writing the default value.
1241    ///
1242    /// # Panics
1243    ///
1244    /// Panics if the store that the [`Accessor`] is derived from does not own
1245    /// this future. Usage of this future after calling `close` will also cause
1246    /// a panic.
1247    pub fn close(&mut self, mut store: impl AsContextMut) {
1248        // `self` should never be used again, but leave an invalid handle there just in case.
1249        let id = mem::replace(&mut self.id, TableId::new(0));
1250        self.instance
1251            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Stream)
1252            .unwrap()
1253    }
1254
1255    /// Convenience method around [`Self::close`].
1256    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1257        accessor.as_accessor().with(|access| self.close(access))
1258    }
1259
1260    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1261    /// drop and clean it up from the store.
1262    ///
1263    /// Note that the `accessor` provided must own this future and is
1264    /// additionally transferred to the `GuardedStreamReader` return value.
1265    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1266    where
1267        A: AsAccessor,
1268    {
1269        GuardedStreamReader::new(accessor, self)
1270    }
1271}
1272
1273impl<T> fmt::Debug for StreamReader<T> {
1274    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1275        f.debug_struct("StreamReader")
1276            .field("id", &self.id)
1277            .field("instance", &self.instance)
1278            .finish()
1279    }
1280}
1281
1282/// Transfer ownership of the read end of a stream from the host to a guest.
1283pub(crate) fn lower_stream_to_index<U>(
1284    rep: u32,
1285    cx: &mut LowerContext<'_, U>,
1286    ty: InterfaceType,
1287) -> Result<u32> {
1288    match ty {
1289        InterfaceType::Stream(dst) => {
1290            let concurrent_state = cx.instance_mut().concurrent_state_mut();
1291            let id = TableId::<TransmitHandle>::new(rep);
1292            let state = concurrent_state.get(id)?.state;
1293            let rep = concurrent_state.get(state)?.read_handle.rep();
1294
1295            let handle = cx
1296                .instance_mut()
1297                .table_for_transmit(TransmitIndex::Stream(dst))
1298                .stream_insert_read(dst, rep)?;
1299
1300            cx.instance_mut()
1301                .concurrent_state_mut()
1302                .get_mut(id)?
1303                .common
1304                .handle = Some(handle);
1305
1306            Ok(handle)
1307        }
1308        _ => func::bad_type_info(),
1309    }
1310}
1311
1312// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1313// safe and correct since we lift and lower stream handles as `u32`s.
1314unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1315    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1316
1317    type Lower = <u32 as func::ComponentType>::Lower;
1318
1319    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1320        match ty {
1321            InterfaceType::Stream(_) => Ok(()),
1322            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1323        }
1324    }
1325}
1326
1327// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1328unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1329    fn linear_lower_to_flat<U>(
1330        &self,
1331        cx: &mut LowerContext<'_, U>,
1332        ty: InterfaceType,
1333        dst: &mut MaybeUninit<Self::Lower>,
1334    ) -> Result<()> {
1335        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1336            cx,
1337            InterfaceType::U32,
1338            dst,
1339        )
1340    }
1341
1342    fn linear_lower_to_memory<U>(
1343        &self,
1344        cx: &mut LowerContext<'_, U>,
1345        ty: InterfaceType,
1346        offset: usize,
1347    ) -> Result<()> {
1348        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1349            cx,
1350            InterfaceType::U32,
1351            offset,
1352        )
1353    }
1354}
1355
1356// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1357unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1358    fn linear_lift_from_flat(
1359        cx: &mut LiftContext<'_>,
1360        ty: InterfaceType,
1361        src: &Self::Lower,
1362    ) -> Result<Self> {
1363        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1364        Self::lift_from_index(cx, ty, index)
1365    }
1366
1367    fn linear_lift_from_memory(
1368        cx: &mut LiftContext<'_>,
1369        ty: InterfaceType,
1370        bytes: &[u8],
1371    ) -> Result<Self> {
1372        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1373        Self::lift_from_index(cx, ty, index)
1374    }
1375}
1376
/// A [`StreamReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedStreamReader::new`] or
/// [`StreamReader::guard`].
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `StreamReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<StreamReader<T>>,
    // Accessor handed to every wrapped call and used by the destructor to
    // close `reader`.
    accessor: A,
}
1392
1393impl<T, A> GuardedStreamReader<T, A>
1394where
1395    A: AsAccessor,
1396{
1397    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1398    /// `reader`.
1399    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1400        Self {
1401            reader: Some(reader),
1402            accessor,
1403        }
1404    }
1405
1406    /// Wrapper for `StreamReader::is_closed`
1407    pub fn is_closed(&self) -> bool {
1408        self.reader.as_ref().unwrap().is_closed()
1409    }
1410
1411    /// Wrapper for `StreamReader::read`.
1412    pub async fn read<B>(&mut self, buffer: B) -> B
1413    where
1414        T: func::Lift + 'static,
1415        B: ReadBuffer<T> + Send + 'static,
1416    {
1417        self.reader
1418            .as_mut()
1419            .unwrap()
1420            .read(&self.accessor, buffer)
1421            .await
1422    }
1423
1424    /// Wrapper for `StreamReader::watch_writer`.
1425    pub async fn watch_writer(&mut self) {
1426        self.reader
1427            .as_mut()
1428            .unwrap()
1429            .watch_writer(&self.accessor)
1430            .await
1431    }
1432
1433    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1434    /// back.
1435    pub fn into_stream(self) -> StreamReader<T> {
1436        self.into()
1437    }
1438}
1439
1440impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1441where
1442    A: AsAccessor,
1443{
1444    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1445        guard.reader.take().unwrap()
1446    }
1447}
1448
1449impl<T, A> Drop for GuardedStreamReader<T, A>
1450where
1451    A: AsAccessor,
1452{
1453    fn drop(&mut self) {
1454        if let Some(reader) = &mut self.reader {
1455            reader.close_with(&self.accessor)
1456        }
1457    }
1458}
1459
/// Represents a Component Model `error-context`.
pub struct ErrorContext {
    // Host-side representation of the error context; this is the value
    // carried by `ErrorContextAny` and stored in guest error-context tables.
    rep: u32,
}
1464
1465impl ErrorContext {
1466    pub(crate) fn new(rep: u32) -> Self {
1467        Self { rep }
1468    }
1469
1470    /// Convert this `ErrorContext` into a [`Val`].
1471    pub fn into_val(self) -> Val {
1472        Val::ErrorContext(ErrorContextAny(self.rep))
1473    }
1474
1475    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1476    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1477        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1478            bail!("expected `error-context`; got `{}`", value.desc());
1479        };
1480        Ok(Self::new(*rep))
1481    }
1482
1483    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1484        match ty {
1485            InterfaceType::ErrorContext(src) => {
1486                let rep = cx
1487                    .instance_mut()
1488                    .table_for_error_context(src)
1489                    .error_context_rep(index)?;
1490
1491                Ok(Self { rep })
1492            }
1493            _ => func::bad_type_info(),
1494        }
1495    }
1496}
1497
1498pub(crate) fn lower_error_context_to_index<U>(
1499    rep: u32,
1500    cx: &mut LowerContext<'_, U>,
1501    ty: InterfaceType,
1502) -> Result<u32> {
1503    match ty {
1504        InterfaceType::ErrorContext(dst) => {
1505            let tbl = cx.instance_mut().table_for_error_context(dst);
1506            tbl.error_context_insert(rep)
1507        }
1508        _ => func::bad_type_info(),
1509    }
1510}
1511// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1512// safe and correct since we lift and lower future handles as `u32`s.
1513unsafe impl func::ComponentType for ErrorContext {
1514    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1515
1516    type Lower = <u32 as func::ComponentType>::Lower;
1517
1518    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1519        match ty {
1520            InterfaceType::ErrorContext(_) => Ok(()),
1521            other => bail!("expected `error`, found `{}`", func::desc(other)),
1522        }
1523    }
1524}
1525
1526// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1527unsafe impl func::Lower for ErrorContext {
1528    fn linear_lower_to_flat<T>(
1529        &self,
1530        cx: &mut LowerContext<'_, T>,
1531        ty: InterfaceType,
1532        dst: &mut MaybeUninit<Self::Lower>,
1533    ) -> Result<()> {
1534        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1535            cx,
1536            InterfaceType::U32,
1537            dst,
1538        )
1539    }
1540
1541    fn linear_lower_to_memory<T>(
1542        &self,
1543        cx: &mut LowerContext<'_, T>,
1544        ty: InterfaceType,
1545        offset: usize,
1546    ) -> Result<()> {
1547        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1548            cx,
1549            InterfaceType::U32,
1550            offset,
1551        )
1552    }
1553}
1554
1555// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1556unsafe impl func::Lift for ErrorContext {
1557    fn linear_lift_from_flat(
1558        cx: &mut LiftContext<'_>,
1559        ty: InterfaceType,
1560        src: &Self::Lower,
1561    ) -> Result<Self> {
1562        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1563        Self::lift_from_index(cx, ty, index)
1564    }
1565
1566    fn linear_lift_from_memory(
1567        cx: &mut LiftContext<'_>,
1568        ty: InterfaceType,
1569        bytes: &[u8],
1570    ) -> Result<Self> {
1571        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1572        Self::lift_from_index(cx, ty, index)
1573    }
1574}
1575
/// Represents the read or write end of a stream or future.
pub(super) struct TransmitHandle {
    /// Common waitable state. Its `handle` field is set to the guest handle
    /// index when a stream read end is lowered to a guest and cleared again
    /// when it is lifted back to the host.
    pub(super) common: WaitableCommon,
    /// See `TransmitState`
    state: TableId<TransmitState>,
}
1582
1583impl TransmitHandle {
1584    fn new(state: TableId<TransmitState>) -> Self {
1585        Self {
1586            common: WaitableCommon::default(),
1587            state,
1588        }
1589    }
1590}
1591
1592impl TableDebug for TransmitHandle {
1593    fn type_name() -> &'static str {
1594        "TransmitHandle"
1595    }
1596}
1597
/// Represents the state of a stream or future.
///
/// Both ends of a stream or future refer to this state through the `state`
/// field of their `TransmitHandle`.
struct TransmitState {
    /// The write end of the stream or future.
    write_handle: TableId<TransmitHandle>,
    /// The read end of the stream or future.
    read_handle: TableId<TransmitHandle>,
    /// See `WriteState`
    write: WriteState,
    /// See `ReadState`
    read: ReadState,
    /// The `Waker`, if any, to be woken when the write end of the stream or
    /// future is dropped.
    ///
    /// This will signal to the host-owned read end that the write end has been
    /// dropped.
    writer_watcher: Option<Waker>,
    /// Like `writer_watcher`, but for the reverse direction.
    reader_watcher: Option<Waker>,
    /// Whether no further values may be transmitted via this stream or
    /// future; starts out `false` and is set once a future's value has been
    /// written.
    done: bool,
}
1619
1620impl Default for TransmitState {
1621    fn default() -> Self {
1622        Self {
1623            write_handle: TableId::new(0),
1624            read_handle: TableId::new(0),
1625            read: ReadState::Open,
1626            write: WriteState::Open,
1627            reader_watcher: None,
1628            writer_watcher: None,
1629            done: false,
1630        }
1631    }
1632}
1633
1634impl TableDebug for TransmitState {
1635    fn type_name() -> &'static str {
1636        "TransmitState"
1637    }
1638}
1639
/// Represents the state of the write end of a stream or future.
enum WriteState {
    /// The write end is open, but no write is pending.
    Open,
    /// The write end is owned by a guest task and a write is pending.
    // NOTE(review): `address`/`count` presumably locate the guest buffer being
    // written and `handle` is presumably the guest's handle index; confirm
    // against the code that constructs this variant.
    GuestReady {
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: Options,
        address: usize,
        count: usize,
        handle: u32,
        post_write: PostWrite,
    },
    /// The write end is owned by a host task and a write is pending.
    HostReady {
        // One-shot callback that receives the `Reader` to deliver the pending
        // host write to and yields the resulting `ReturnCode`.
        accept:
            Box<dyn FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode> + Send + Sync>,
        post_write: PostWrite,
    },
    /// The write end has been dropped.
    Dropped,
}
1663
1664impl fmt::Debug for WriteState {
1665    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1666        match self {
1667            Self::Open => f.debug_tuple("Open").finish(),
1668            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1669            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1670            Self::Dropped => f.debug_tuple("Dropped").finish(),
1671        }
1672    }
1673}
1674
/// Represents the state of the read end of a stream or future.
enum ReadState {
    /// The read end is open, but no read is pending.
    Open,
    /// The read end is owned by a guest task and a read is pending.
    ///
    /// When the host writes while the read end is in this state, these fields
    /// are passed on to the write path as the guest reader (`options`, `ty`,
    /// `address`, `count`), and `handle` is reported back in the read event.
    GuestReady {
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: Options,
        address: usize,
        count: usize,
        handle: u32,
    },
    /// The read end is owned by a host task and a read is pending.
    HostReady {
        // One-shot callback that receives the `Writer` to read from and
        // yields the resulting `ReturnCode`.
        accept: Box<dyn FnOnce(Writer) -> Result<ReturnCode> + Send + Sync>,
    },
    /// The read end has been dropped.
    Dropped,
}
1695
1696impl fmt::Debug for ReadState {
1697    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1698        match self {
1699            Self::Open => f.debug_tuple("Open").finish(),
1700            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1701            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1702            Self::Dropped => f.debug_tuple("Dropped").finish(),
1703        }
1704    }
1705}
1706
/// Parameter type to pass to a `ReadState::HostReady` closure.
///
/// See also `accept_writer`.
enum Writer<'a> {
    /// The write end is owned by a guest task.
    // NOTE(review): `address` and `count` presumably describe the guest
    // buffer holding the items to read, and `ty` the item type (`None`
    // presumably meaning there is no payload) -- confirm in `accept_writer`.
    Guest {
        lift: &'a mut LiftContext<'a>,
        ty: Option<InterfaceType>,
        address: usize,
        count: usize,
    },
    /// The write end is owned by the host.
    // NOTE(review): `count` is presumably the number of items available in
    // `buffer` -- confirm in `accept_writer`.
    Host {
        buffer: &'a mut UntypedWriteBuffer<'a>,
        count: usize,
    },
    /// The write end has been dropped.
    End,
}
1726
/// Parameter type to pass to a `WriteState::HostReady` closure.
///
/// See also `accept_reader`.
enum Reader<'a> {
    /// The read end is owned by a guest task.
    ///
    /// `host_write` builds this variant from the fields of a pending
    /// `ReadState::GuestReady`.
    Guest {
        options: &'a Options,
        ty: TransmitIndex,
        address: usize,
        count: usize,
    },
    /// The read end is owned by the host.
    // NOTE(review): `accept` presumably consumes items from the given write
    // buffer (the `usize` argument presumably being the number offered) and
    // returns how many were taken -- confirm in `accept_reader`.
    Host {
        accept: Box<dyn FnOnce(&mut UntypedWriteBuffer, usize) -> usize + 'a>,
    },
    /// The read end has been dropped.
    End,
}
1745
1746impl Instance {
1747    /// Create a new Component Model `future` as pair of writable and readable ends,
1748    /// the latter of which may be passed to guest code.
1749    ///
1750    /// `default` is a callback to be used if the writable end of the future is
1751    /// closed without having written a value.  You may supply e.g. `||
1752    /// unreachable!()` if you're sure that won't happen.
1753    pub fn future<T: func::Lower + func::Lift + Send + Sync + 'static>(
1754        self,
1755        mut store: impl AsContextMut,
1756        default: fn() -> T,
1757    ) -> Result<(FutureWriter<T>, FutureReader<T>)> {
1758        let (write, read) = self
1759            .concurrent_state_mut(store.as_context_mut().0)
1760            .new_transmit()?;
1761
1762        Ok((
1763            FutureWriter::new(default, write, self),
1764            FutureReader::new(read, self),
1765        ))
1766    }
1767
1768    /// Create a new Component Model `stream` as pair of writable and readable ends,
1769    /// the latter of which may be passed to guest code.
1770    pub fn stream<T: func::Lower + func::Lift + Send + 'static>(
1771        self,
1772        mut store: impl AsContextMut,
1773    ) -> Result<(StreamWriter<T>, StreamReader<T>)> {
1774        let (write, read) = self
1775            .concurrent_state_mut(store.as_context_mut().0)
1776            .new_transmit()?;
1777
1778        Ok((
1779            StreamWriter::new(write, self),
1780            StreamReader::new(read, self),
1781        ))
1782    }
1783
1784    /// Write to the specified stream or future from the host.
1785    fn host_write<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U>(
1786        self,
1787        mut store: StoreContextMut<U>,
1788        id: TableId<TransmitHandle>,
1789        mut buffer: B,
1790        kind: TransmitKind,
1791        post_write: PostWrite,
1792    ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
1793        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
1794        let transmit = self
1795            .concurrent_state_mut(store.0)
1796            .get_mut(transmit_id)
1797            .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
1798        log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read);
1799
1800        let new_state = if let ReadState::Dropped = &transmit.read {
1801            ReadState::Dropped
1802        } else {
1803            ReadState::Open
1804        };
1805
1806        if matches!(post_write, PostWrite::Drop) && !matches!(transmit.read, ReadState::Open) {
1807            transmit.write = WriteState::Dropped;
1808        }
1809
1810        Ok(match mem::replace(&mut transmit.read, new_state) {
1811            ReadState::Open => {
1812                assert!(matches!(&transmit.write, WriteState::Open));
1813
1814                let token = StoreToken::new(store.as_context_mut());
1815                let (tx, rx) = oneshot::channel();
1816                let state = WriteState::HostReady {
1817                    accept: Box::new(move |store, instance, reader| {
1818                        let (result, code) = accept_reader::<T, B, U>(
1819                            token.as_context_mut(store),
1820                            instance,
1821                            reader,
1822                            buffer,
1823                            kind,
1824                        )?;
1825                        _ = tx.send(result);
1826                        Ok(code)
1827                    }),
1828                    post_write,
1829                };
1830                self.concurrent_state_mut(store.0)
1831                    .get_mut(transmit_id)?
1832                    .write = state;
1833
1834                Err(rx)
1835            }
1836
1837            ReadState::GuestReady {
1838                ty,
1839                flat_abi: _,
1840                options,
1841                address,
1842                count,
1843                handle,
1844                ..
1845            } => {
1846                if let TransmitKind::Future = kind {
1847                    transmit.done = true;
1848                }
1849
1850                let read_handle = transmit.read_handle;
1851                let accept = move |mut store: StoreContextMut<U>| {
1852                    let (result, code) = accept_reader::<T, B, U>(
1853                        store.as_context_mut(),
1854                        self,
1855                        Reader::Guest {
1856                            options: &options,
1857                            ty,
1858                            address,
1859                            count,
1860                        },
1861                        buffer,
1862                        kind,
1863                    )?;
1864
1865                    self.concurrent_state_mut(store.0).set_event(
1866                        read_handle.rep(),
1867                        match ty {
1868                            TransmitIndex::Future(ty) => Event::FutureRead {
1869                                code,
1870                                pending: Some((ty, handle)),
1871                            },
1872                            TransmitIndex::Stream(ty) => Event::StreamRead {
1873                                code,
1874                                pending: Some((ty, handle)),
1875                            },
1876                        },
1877                    )?;
1878
1879                    anyhow::Ok(result)
1880                };
1881
1882                if T::MAY_REQUIRE_REALLOC {
1883                    // For payloads which may require a realloc call, use a
1884                    // oneshot::channel and background task.  This is necessary
1885                    // because calling the guest while there are host embedder
1886                    // frames on the stack is unsound.
1887                    let (tx, rx) = oneshot::channel();
1888                    let token = StoreToken::new(store.as_context_mut());
1889                    self.concurrent_state_mut(store.0).push_high_priority(
1890                        WorkItem::WorkerFunction(Mutex::new(Box::new(move |store, _| {
1891                            _ = tx.send(accept(token.as_context_mut(store))?);
1892                            Ok(())
1893                        }))),
1894                    );
1895                    Err(rx)
1896                } else {
1897                    // Optimize flat payloads (i.e. those which do not require
1898                    // calling the guest's realloc function) by lowering
1899                    // directly instead of using a oneshot::channel and
1900                    // background task.
1901                    Ok(accept(store)?)
1902                }
1903            }
1904
1905            ReadState::HostReady { accept } => {
1906                let count = buffer.remaining().len();
1907                let mut untyped = UntypedWriteBuffer::new(&mut buffer);
1908                let code = accept(Writer::Host {
1909                    buffer: &mut untyped,
1910                    count,
1911                })?;
1912                let (ReturnCode::Completed(_) | ReturnCode::Dropped(_)) = code else {
1913                    unreachable!()
1914                };
1915
1916                Ok(HostResult {
1917                    buffer,
1918                    dropped: false,
1919                })
1920            }
1921
1922            ReadState::Dropped => Ok(HostResult {
1923                buffer,
1924                dropped: true,
1925            }),
1926        })
1927    }
1928
1929    /// Async wrapper around `Self::host_write`.
1930    async fn host_write_async<T: func::Lower + Send + 'static, B: WriteBuffer<T>>(
1931        self,
1932        accessor: impl AsAccessor,
1933        id: TableId<TransmitHandle>,
1934        buffer: B,
1935        kind: TransmitKind,
1936    ) -> Result<HostResult<B>> {
1937        match accessor.as_accessor().with(move |mut access| {
1938            self.host_write(
1939                access.as_context_mut(),
1940                id,
1941                buffer,
1942                kind,
1943                PostWrite::Continue,
1944            )
1945        })? {
1946            Ok(result) => Ok(result),
1947            Err(rx) => Ok(rx.await?),
1948        }
1949    }
1950
1951    /// Read from the specified stream or future from the host.
1952    fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
1953        self,
1954        store: StoreContextMut<U>,
1955        id: TableId<TransmitHandle>,
1956        mut buffer: B,
1957        kind: TransmitKind,
1958    ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
1959        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
1960        let transmit = self
1961            .concurrent_state_mut(store.0)
1962            .get_mut(transmit_id)
1963            .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
1964        log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write);
1965
1966        let new_state = if let WriteState::Dropped = &transmit.write {
1967            WriteState::Dropped
1968        } else {
1969            WriteState::Open
1970        };
1971
1972        Ok(match mem::replace(&mut transmit.write, new_state) {
1973            WriteState::Open => {
1974                assert!(matches!(&transmit.read, ReadState::Open));
1975
1976                let (tx, rx) = oneshot::channel();
1977                transmit.read = ReadState::HostReady {
1978                    accept: Box::new(move |writer| {
1979                        let (result, code) = accept_writer::<T, B, U>(writer, buffer, kind)?;
1980                        _ = tx.send(result);
1981                        Ok(code)
1982                    }),
1983                };
1984
1985                Err(rx)
1986            }
1987
1988            WriteState::GuestReady {
1989                ty,
1990                flat_abi: _,
1991                options,
1992                address,
1993                count,
1994                handle,
1995                post_write,
1996                ..
1997            } => {
1998                if let TransmitIndex::Future(_) = ty {
1999                    transmit.done = true;
2000                }
2001
2002                let write_handle = transmit.write_handle;
2003                let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
2004                let (result, code) = accept_writer::<T, B, U>(
2005                    Writer::Guest {
2006                        ty: payload(ty, lift.types),
2007                        lift,
2008                        address,
2009                        count,
2010                    },
2011                    buffer,
2012                    kind,
2013                )?;
2014
2015                let state = self.concurrent_state_mut(store.0);
2016                let pending = if let PostWrite::Drop = post_write {
2017                    state.get_mut(transmit_id)?.write = WriteState::Dropped;
2018                    false
2019                } else {
2020                    true
2021                };
2022
2023                state.set_event(
2024                    write_handle.rep(),
2025                    match ty {
2026                        TransmitIndex::Future(ty) => Event::FutureWrite {
2027                            code,
2028                            pending: pending.then_some((ty, handle)),
2029                        },
2030                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2031                            code,
2032                            pending: pending.then_some((ty, handle)),
2033                        },
2034                    },
2035                )?;
2036
2037                Ok(result)
2038            }
2039
2040            WriteState::HostReady { accept, post_write } => {
2041                accept(
2042                    store.0,
2043                    self,
2044                    Reader::Host {
2045                        accept: Box::new(|input, count| {
2046                            let count = count.min(buffer.remaining_capacity());
2047                            buffer.move_from(input.get_mut::<T>(), count);
2048                            count
2049                        }),
2050                    },
2051                )?;
2052
2053                if let PostWrite::Drop = post_write {
2054                    self.concurrent_state_mut(store.0)
2055                        .get_mut(transmit_id)?
2056                        .write = WriteState::Dropped;
2057                }
2058
2059                Ok(HostResult {
2060                    buffer,
2061                    dropped: false,
2062                })
2063            }
2064
2065            WriteState::Dropped => Ok(HostResult {
2066                buffer,
2067                dropped: true,
2068            }),
2069        })
2070    }
2071
2072    /// Async wrapper around `Self::host_read`.
2073    async fn host_read_async<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
2074        self,
2075        accessor: impl AsAccessor,
2076        id: TableId<TransmitHandle>,
2077        buffer: B,
2078        kind: TransmitKind,
2079    ) -> Result<HostResult<B>> {
2080        match accessor
2081            .as_accessor()
2082            .with(move |mut access| self.host_read(access.as_context_mut(), id, buffer, kind))?
2083        {
2084            Ok(result) => Ok(result),
2085            Err(rx) => Ok(rx.await?),
2086        }
2087    }
2088
2089    /// Drop the read end of a stream or future read from the host.
2090    fn host_drop_reader(
2091        self,
2092        store: &mut dyn VMStore,
2093        id: TableId<TransmitHandle>,
2094        kind: TransmitKind,
2095    ) -> Result<()> {
2096        let transmit_id = self.concurrent_state_mut(store).get(id)?.state;
2097        let state = self.concurrent_state_mut(store);
2098        let transmit = state
2099            .get_mut(transmit_id)
2100            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2101        log::trace!(
2102            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2103            transmit.read,
2104            transmit.write
2105        );
2106
2107        transmit.read = ReadState::Dropped;
2108        if let Some(waker) = transmit.reader_watcher.take() {
2109            waker.wake();
2110        }
2111
2112        // If the write end is already dropped, it should stay dropped,
2113        // otherwise, it should be opened.
2114        let new_state = if let WriteState::Dropped = &transmit.write {
2115            WriteState::Dropped
2116        } else {
2117            WriteState::Open
2118        };
2119
2120        let write_handle = transmit.write_handle;
2121
2122        match mem::replace(&mut transmit.write, new_state) {
2123            // If a guest is waiting to write, notify it that the read end has
2124            // been dropped.
2125            WriteState::GuestReady {
2126                ty,
2127                handle,
2128                post_write,
2129                ..
2130            } => {
2131                if let PostWrite::Drop = post_write {
2132                    state.delete_transmit(transmit_id)?;
2133                } else {
2134                    state.update_event(
2135                        write_handle.rep(),
2136                        match ty {
2137                            TransmitIndex::Future(ty) => Event::FutureWrite {
2138                                code: ReturnCode::Dropped(0),
2139                                pending: Some((ty, handle)),
2140                            },
2141                            TransmitIndex::Stream(ty) => Event::StreamWrite {
2142                                code: ReturnCode::Dropped(0),
2143                                pending: Some((ty, handle)),
2144                            },
2145                        },
2146                    )?;
2147                };
2148            }
2149
2150            WriteState::HostReady { accept, .. } => {
2151                accept(store, self, Reader::End)?;
2152            }
2153
2154            WriteState::Open => {
2155                state.update_event(
2156                    write_handle.rep(),
2157                    match kind {
2158                        TransmitKind::Future => Event::FutureWrite {
2159                            code: ReturnCode::Dropped(0),
2160                            pending: None,
2161                        },
2162                        TransmitKind::Stream => Event::StreamWrite {
2163                            code: ReturnCode::Dropped(0),
2164                            pending: None,
2165                        },
2166                    },
2167                )?;
2168            }
2169
2170            WriteState::Dropped => {
2171                log::trace!("host_drop_reader delete {transmit_id:?}");
2172                state.delete_transmit(transmit_id)?;
2173            }
2174        }
2175        Ok(())
2176    }
2177
2178    /// Drop the write end of a stream or future read from the host.
2179    fn host_drop_writer<T: func::Lower + Send + 'static, U>(
2180        self,
2181        mut store: StoreContextMut<U>,
2182        id: TableId<TransmitHandle>,
2183        default: Option<&dyn Fn() -> Result<T>>,
2184    ) -> Result<()> {
2185        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
2186        let transmit = self
2187            .concurrent_state_mut(store.0)
2188            .get_mut(transmit_id)
2189            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2190        log::trace!(
2191            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2192            transmit.read,
2193            transmit.write
2194        );
2195
2196        if let Some(waker) = transmit.writer_watcher.take() {
2197            waker.wake();
2198        }
2199
2200        // Existing queued transmits must be updated with information for the impending writer closure
2201        match &mut transmit.write {
2202            WriteState::GuestReady { .. } => {
2203                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2204            }
2205            WriteState::HostReady { post_write, .. } => {
2206                *post_write = PostWrite::Drop;
2207            }
2208            v @ WriteState::Open => {
2209                if let (Some(default), false) = (
2210                    default,
2211                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2212                ) {
2213                    // This is a future, and we haven't written a value yet --
2214                    // write the default value.
2215                    _ = self.host_write(
2216                        store.as_context_mut(),
2217                        id,
2218                        Some(default()?),
2219                        TransmitKind::Future,
2220                        PostWrite::Drop,
2221                    )?;
2222                } else {
2223                    *v = WriteState::Dropped;
2224                }
2225            }
2226            WriteState::Dropped => unreachable!("write state is already dropped"),
2227        }
2228
2229        let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2230
2231        // If the existing read state is dropped, then there's nothing to read
2232        // and we can keep it that way.
2233        //
2234        // If the read state was any other state, then we must set the new state to open
2235        // to indicate that there *is* data to be read
2236        let new_state = if let ReadState::Dropped = &transmit.read {
2237            ReadState::Dropped
2238        } else {
2239            ReadState::Open
2240        };
2241
2242        let read_handle = transmit.read_handle;
2243
2244        // Swap in the new read state
2245        match mem::replace(&mut transmit.read, new_state) {
2246            // If the guest was ready to read, then we cannot drop the reader (or writer);
2247            // we must deliver the event, and update the state associated with the handle to
2248            // represent that a read must be performed
2249            ReadState::GuestReady { ty, handle, .. } => {
2250                // Ensure the final read of the guest is queued, with appropriate closure indicator
2251                self.concurrent_state_mut(store.0).update_event(
2252                    read_handle.rep(),
2253                    match ty {
2254                        TransmitIndex::Future(ty) => Event::FutureRead {
2255                            code: ReturnCode::Dropped(0),
2256                            pending: Some((ty, handle)),
2257                        },
2258                        TransmitIndex::Stream(ty) => Event::StreamRead {
2259                            code: ReturnCode::Dropped(0),
2260                            pending: Some((ty, handle)),
2261                        },
2262                    },
2263                )?;
2264            }
2265
2266            // If the host was ready to read, and the writer end is being dropped (host->host write?)
2267            // signal to the reader that we've reached the end of the stream
2268            ReadState::HostReady { accept } => {
2269                accept(Writer::End)?;
2270            }
2271
2272            // If the read state is open, then there are no registered readers of the stream/future
2273            ReadState::Open => {
2274                self.concurrent_state_mut(store.0).update_event(
2275                    read_handle.rep(),
2276                    match default {
2277                        Some(_) => Event::FutureRead {
2278                            code: ReturnCode::Dropped(0),
2279                            pending: None,
2280                        },
2281                        None => Event::StreamRead {
2282                            code: ReturnCode::Dropped(0),
2283                            pending: None,
2284                        },
2285                    },
2286                )?;
2287            }
2288
2289            // If the read state was already dropped, then we can remove the transmit state completely
2290            // (both writer and reader have been dropped)
2291            ReadState::Dropped => {
2292                log::trace!("host_drop_writer delete {transmit_id:?}");
2293                self.concurrent_state_mut(store.0)
2294                    .delete_transmit(transmit_id)?;
2295            }
2296        }
2297        Ok(())
2298    }
2299
2300    /// Drop the writable end of the specified stream or future from the guest.
2301    pub(super) fn guest_drop_writable<T>(
2302        self,
2303        store: StoreContextMut<T>,
2304        ty: TransmitIndex,
2305        writer: u32,
2306    ) -> Result<()> {
2307        let table = self.id().get_mut(store.0).table_for_transmit(ty);
2308        let transmit_rep = match ty {
2309            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2310            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2311        };
2312
2313        let id = TableId::<TransmitHandle>::new(transmit_rep);
2314        log::trace!("guest_drop_writable: drop writer {id:?}");
2315        match ty {
2316            TransmitIndex::Stream(_) => {
2317                self.host_drop_writer(store, id, None::<&dyn Fn() -> Result<()>>)
2318            }
2319            TransmitIndex::Future(_) => self.host_drop_writer(
2320                store,
2321                id,
2322                Some(&|| {
2323                    Err::<(), _>(anyhow!(
2324                        "cannot drop future write end without first writing a value"
2325                    ))
2326                }),
2327            ),
2328        }
2329    }
2330
2331    /// Copy `count` items from `read_address` to `write_address` for the
2332    /// specified stream or future.
2333    fn copy<T: 'static>(
2334        self,
2335        mut store: StoreContextMut<T>,
2336        flat_abi: Option<FlatAbi>,
2337        write_ty: TransmitIndex,
2338        write_options: &Options,
2339        write_address: usize,
2340        read_ty: TransmitIndex,
2341        read_options: &Options,
2342        read_address: usize,
2343        count: usize,
2344        rep: u32,
2345    ) -> Result<()> {
2346        let types = self.id().get(store.0).component().types().clone();
2347        match (write_ty, read_ty) {
2348            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
2349                assert_eq!(count, 1);
2350
2351                let val = types[types[write_ty].ty]
2352                    .payload
2353                    .map(|ty| {
2354                        let abi = types.canonical_abi(&ty);
2355                        // FIXME: needs to read an i64 for memory64
2356                        if write_address % usize::try_from(abi.align32)? != 0 {
2357                            bail!("write pointer not aligned");
2358                        }
2359
2360                        let lift =
2361                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2362                        let bytes = lift
2363                            .memory()
2364                            .get(write_address..)
2365                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2366                            .ok_or_else(|| {
2367                                anyhow::anyhow!("write pointer out of bounds of memory")
2368                            })?;
2369
2370                        Val::load(lift, ty, bytes)
2371                    })
2372                    .transpose()?;
2373
2374                if let Some(val) = val {
2375                    let lower =
2376                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2377                    let ty = types[types[read_ty].ty].payload.unwrap();
2378                    let ptr = func::validate_inbounds_dynamic(
2379                        types.canonical_abi(&ty),
2380                        lower.as_slice_mut(),
2381                        &ValRaw::u32(read_address.try_into().unwrap()),
2382                    )?;
2383                    val.store(lower, ty, ptr)?;
2384                }
2385            }
2386            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
2387                if let Some(flat_abi) = flat_abi {
2388                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
2389                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2390                    if length_in_bytes > 0 {
2391                        if write_address % usize::try_from(flat_abi.align)? != 0 {
2392                            bail!("write pointer not aligned");
2393                        }
2394                        if read_address % usize::try_from(flat_abi.align)? != 0 {
2395                            bail!("read pointer not aligned");
2396                        }
2397
2398                        let store_opaque = store.0.store_opaque_mut();
2399
2400                        {
2401                            let src = write_options
2402                                .memory(store_opaque)
2403                                .get(write_address..)
2404                                .and_then(|b| b.get(..length_in_bytes))
2405                                .ok_or_else(|| {
2406                                    anyhow::anyhow!("write pointer out of bounds of memory")
2407                                })?
2408                                .as_ptr();
2409                            let dst = read_options
2410                                .memory_mut(store_opaque)
2411                                .get_mut(read_address..)
2412                                .and_then(|b| b.get_mut(..length_in_bytes))
2413                                .ok_or_else(|| {
2414                                    anyhow::anyhow!("read pointer out of bounds of memory")
2415                                })?
2416                                .as_mut_ptr();
2417                            // SAFETY: Both `src` and `dst` have been validated
2418                            // above.
2419                            unsafe { src.copy_to(dst, length_in_bytes) };
2420                        }
2421                    }
2422                } else {
2423                    let store_opaque = store.0.store_opaque_mut();
2424                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
2425                    let ty = types[types[write_ty].ty].payload.unwrap();
2426                    let abi = lift.types.canonical_abi(&ty);
2427                    let size = usize::try_from(abi.size32).unwrap();
2428                    if write_address % usize::try_from(abi.align32)? != 0 {
2429                        bail!("write pointer not aligned");
2430                    }
2431                    let bytes = lift
2432                        .memory()
2433                        .get(write_address..)
2434                        .and_then(|b| b.get(..size * count))
2435                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2436
2437                    let values = (0..count)
2438                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2439                        .collect::<Result<Vec<_>>>()?;
2440
2441                    let id = TableId::<TransmitHandle>::new(rep);
2442                    log::trace!("copy values {values:?} for {id:?}");
2443
2444                    let lower =
2445                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2446                    let ty = types[types[read_ty].ty].payload.unwrap();
2447                    let abi = lower.types.canonical_abi(&ty);
2448                    if read_address % usize::try_from(abi.align32)? != 0 {
2449                        bail!("read pointer not aligned");
2450                    }
2451                    let size = usize::try_from(abi.size32).unwrap();
2452                    lower
2453                        .as_slice_mut()
2454                        .get_mut(read_address..)
2455                        .and_then(|b| b.get_mut(..size * count))
2456                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2457                    let mut ptr = read_address;
2458                    for value in values {
2459                        value.store(lower, ty, ptr)?;
2460                        ptr += size
2461                    }
2462                }
2463            }
2464            _ => unreachable!(),
2465        }
2466
2467        Ok(())
2468    }
2469
2470    /// Write to the specified stream or future from the guest.
2471    pub(super) fn guest_write<T: 'static>(
2472        self,
2473        mut store: StoreContextMut<T>,
2474        ty: TransmitIndex,
2475        options: OptionsIndex,
2476        flat_abi: Option<FlatAbi>,
2477        handle: u32,
2478        address: u32,
2479        count: u32,
2480    ) -> Result<ReturnCode> {
2481        let address = usize::try_from(address).unwrap();
2482        let count = usize::try_from(count).unwrap();
2483        let options = Options::new_index(store.0, self, options);
2484        if !options.async_() {
2485            bail!("synchronous stream and future writes not yet supported");
2486        }
2487        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
2488        let TransmitLocalState::Write { done } = *state else {
2489            bail!(
2490                "invalid handle {handle}; expected `Write`; got {:?}",
2491                *state
2492            );
2493        };
2494
2495        if done {
2496            bail!("cannot write to stream after being notified that the readable end dropped");
2497        }
2498
2499        *state = TransmitLocalState::Busy;
2500        let transmit_handle = TableId::<TransmitHandle>::new(rep);
2501        let concurrent_state = self.concurrent_state_mut(store.0);
2502        let transmit_id = concurrent_state.get(transmit_handle)?.state;
2503        let transmit = concurrent_state.get_mut(transmit_id)?;
2504        log::trace!(
2505            "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
2506            transmit.read
2507        );
2508
2509        if transmit.done {
2510            bail!("cannot write to future after previous write succeeded or readable end dropped");
2511        }
2512
2513        let new_state = if let ReadState::Dropped = &transmit.read {
2514            ReadState::Dropped
2515        } else {
2516            ReadState::Open
2517        };
2518
2519        let set_guest_ready = |me: &mut ConcurrentState| {
2520            let transmit = me.get_mut(transmit_id)?;
2521            assert!(matches!(&transmit.write, WriteState::Open));
2522            transmit.write = WriteState::GuestReady {
2523                ty,
2524                flat_abi,
2525                options,
2526                address,
2527                count,
2528                handle,
2529                post_write: PostWrite::Continue,
2530            };
2531            Ok::<_, crate::Error>(())
2532        };
2533
2534        let result = match mem::replace(&mut transmit.read, new_state) {
2535            ReadState::GuestReady {
2536                ty: read_ty,
2537                flat_abi: read_flat_abi,
2538                options: read_options,
2539                address: read_address,
2540                count: read_count,
2541                handle: read_handle,
2542            } => {
2543                assert_eq!(flat_abi, read_flat_abi);
2544
2545                if let TransmitIndex::Future(_) = ty {
2546                    transmit.done = true;
2547                }
2548
2549                // Note that zero-length reads and writes are handling specially
2550                // by the spec to allow each end to signal readiness to the
2551                // other.  Quoting the spec:
2552                //
2553                // ```
2554                // The meaning of a read or write when the length is 0 is that
2555                // the caller is querying the "readiness" of the other
2556                // side. When a 0-length read/write rendezvous with a
2557                // non-0-length read/write, only the 0-length read/write
2558                // completes; the non-0-length read/write is kept pending (and
2559                // ready for a subsequent rendezvous).
2560                //
2561                // In the corner case where a 0-length read and write
2562                // rendezvous, only the writer is notified of readiness. To
2563                // avoid livelock, the Canonical ABI requires that a writer must
2564                // (eventually) follow a completed 0-length write with a
2565                // non-0-length write that is allowed to block (allowing the
2566                // reader end to run and rendezvous with its own non-0-length
2567                // read).
2568                // ```
2569
2570                let write_complete = count == 0 || read_count > 0;
2571                let read_complete = count > 0;
2572                let read_buffer_remaining = count < read_count;
2573
2574                let read_handle_rep = transmit.read_handle.rep();
2575
2576                let count = count.min(read_count);
2577
2578                self.copy(
2579                    store.as_context_mut(),
2580                    flat_abi,
2581                    ty,
2582                    &options,
2583                    address,
2584                    read_ty,
2585                    &read_options,
2586                    read_address,
2587                    count,
2588                    rep,
2589                )?;
2590
2591                let instance = self.id().get_mut(store.0);
2592                let types = instance.component().types();
2593                let item_size = payload(ty, types)
2594                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
2595                    .unwrap_or(0);
2596                let concurrent_state = instance.concurrent_state_mut();
2597                if read_complete {
2598                    let count = u32::try_from(count).unwrap();
2599                    let total = if let Some(Event::StreamRead {
2600                        code: ReturnCode::Completed(old_total),
2601                        ..
2602                    }) = concurrent_state.take_event(read_handle_rep)?
2603                    {
2604                        count + old_total
2605                    } else {
2606                        count
2607                    };
2608
2609                    let code = ReturnCode::completed(ty.kind(), total);
2610
2611                    concurrent_state.set_event(
2612                        read_handle_rep,
2613                        match read_ty {
2614                            TransmitIndex::Future(ty) => Event::FutureRead {
2615                                code,
2616                                pending: Some((ty, read_handle)),
2617                            },
2618                            TransmitIndex::Stream(ty) => Event::StreamRead {
2619                                code,
2620                                pending: Some((ty, read_handle)),
2621                            },
2622                        },
2623                    )?;
2624                }
2625
2626                if read_buffer_remaining {
2627                    let transmit = concurrent_state.get_mut(transmit_id)?;
2628                    transmit.read = ReadState::GuestReady {
2629                        ty: read_ty,
2630                        flat_abi: read_flat_abi,
2631                        options: read_options,
2632                        address: read_address + (count * item_size),
2633                        count: read_count - count,
2634                        handle: read_handle,
2635                    };
2636                }
2637
2638                if write_complete {
2639                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
2640                } else {
2641                    set_guest_ready(concurrent_state)?;
2642                    ReturnCode::Blocked
2643                }
2644            }
2645
2646            ReadState::HostReady { accept } => {
2647                if let TransmitIndex::Future(_) = ty {
2648                    transmit.done = true;
2649                }
2650
2651                let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
2652                accept(Writer::Guest {
2653                    ty: payload(ty, lift.types),
2654                    lift,
2655                    address,
2656                    count,
2657                })?
2658            }
2659
2660            ReadState::Open => {
2661                set_guest_ready(concurrent_state)?;
2662                ReturnCode::Blocked
2663            }
2664
2665            ReadState::Dropped => {
2666                if let TransmitIndex::Future(_) = ty {
2667                    transmit.done = true;
2668                }
2669
2670                ReturnCode::Dropped(0)
2671            }
2672        };
2673
2674        if result != ReturnCode::Blocked {
2675            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
2676                TransmitLocalState::Write {
2677                    done: matches!(
2678                        (result, ty),
2679                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
2680                    ),
2681                };
2682        }
2683
2684        Ok(result)
2685    }
2686
    /// Read from the specified stream or future from the guest.
    ///
    /// `ty` and `handle` identify the readable end in the guest's handle
    /// table, `address` is the start of the guest's destination buffer and
    /// `count` is the number of items the guest is prepared to receive.
    ///
    /// The outcome depends on the current state of the write end:
    ///
    /// * `WriteState::GuestReady`: items are copied from the pending guest
    ///   write into this read's buffer and an event is recorded for the
    ///   writer if its write completed.
    /// * `WriteState::HostReady`: the host-provided `accept` callback is
    ///   invoked and its return code is used as the result.
    /// * `WriteState::Open`: nothing is pending, so the read is parked as
    ///   `ReadState::GuestReady` and `ReturnCode::Blocked` is returned.
    /// * `WriteState::Dropped`: `ReturnCode::Dropped(0)` is returned.
    ///
    /// # Errors
    ///
    /// Fails if the options are not async, if `handle` is not currently in
    /// the `Read` state, if the handle was already marked `done`, or if the
    /// underlying transmit is already marked `done`. Errors from table
    /// lookups, the copy, and the `accept` callback are propagated.
    ///
    /// # Panics
    ///
    /// Panics if `flat_abi` differs from the one recorded by a pending guest
    /// write, or if one of the `usize`/`u32` conversions fails.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let options = Options::new_index(store.0, self, options);
        if !options.async_() {
            bail!("synchronous stream and future reads not yet supported");
        }
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Read { done } = *state else {
            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        // Mark the local handle as busy for the duration of the read; it is
        // reset to `Read` at the bottom of this function unless the read
        // ends up blocked.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = self.concurrent_state_mut(store.0);
        let transmit_id = concurrent_state.get(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // The write state left behind once the current one is taken out
        // below: `Dropped` is sticky, anything else reverts to `Open`.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Parks this read so that a later write can rendezvous with it. The
        // read side is asserted to be `Open` at that point.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.read, ReadState::Open));
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count: usize::try_from(count).unwrap(),
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let result = match mem::replace(&mut transmit.write, new_state) {
            // A guest write is already pending: copy directly from its
            // buffer into ours.
            WriteState::GuestReady {
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
                post_write,
            } => {
                assert_eq!(flat_abi, write_flat_abi);

                // For a future, the transmit is marked done as soon as a
                // read meets a pending write.
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // See the comment in `guest_write` for the
                // `ReadState::GuestReady` case concerning zero-length reads and
                // writes.

                let count = usize::try_from(count).unwrap();

                // The pending write completes if it was zero-length or if
                // this read is non-zero-length; this read completes only if
                // the writer offered at least one item.
                let write_complete = write_count == 0 || count > 0;
                let read_complete = write_count > 0;
                let write_buffer_remaining = count < write_count;

                // Number of items actually transferred.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_ty,
                    &write_options,
                    write_address,
                    ty,
                    &options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                // Size of one payload item, or zero when the type has no
                // payload.
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();
                // If the writer asked for the write end to be dropped after
                // this write, record that now; `pending` is then `false` so
                // the event below carries no handle.
                let pending = if let PostWrite::Drop = post_write {
                    concurrent_state.get_mut(transmit_id)?.write = WriteState::Dropped;
                    false
                } else {
                    true
                };

                if write_complete {
                    let count = u32::try_from(count).unwrap();
                    // Fold in the count of any `StreamWrite` completion event
                    // that is still queued for the writer.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.set_event(
                        write_handle_rep,
                        match write_ty {
                            TransmitIndex::Future(ty) => Event::FutureWrite {
                                code,
                                pending: pending.then_some((ty, write_handle)),
                            },
                            TransmitIndex::Stream(ty) => Event::StreamWrite {
                                code,
                                pending: pending.then_some((ty, write_handle)),
                            },
                        },
                    )?;
                }

                // The writer still has items left: re-park the write with its
                // buffer advanced past the items just consumed.
                if write_buffer_remaining {
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count * item_size),
                        count: write_count - count,
                        handle: write_handle,
                        post_write,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    // The writer only probed for readiness (zero-length
                    // write), so this read stays pending.
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host write is pending: hand it a description of the guest
            // buffer and let it produce the return code.
            WriteState::HostReady { accept, post_write } => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let code = accept(
                    store.0,
                    self,
                    Reader::Guest {
                        options: &options,
                        ty,
                        address,
                        count: count.try_into().unwrap(),
                    },
                )?;

                if let PostWrite::Drop = post_write {
                    self.concurrent_state_mut(store.0)
                        .get_mut(transmit_id)?
                        .write = WriteState::Dropped;
                }

                code
            }

            // No write is pending: park this read until one arrives.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            WriteState::Dropped => ReturnCode::Dropped(0),
        };

        // Unless the read is still pending, release the `Busy` marker set
        // above. `done` is set only when a stream read observed that the
        // write end was dropped.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        Ok(result)
    }
2897
2898    /// Drop the readable end of the specified stream or future from the guest.
2899    fn guest_drop_readable(
2900        self,
2901        store: &mut dyn VMStore,
2902        ty: TransmitIndex,
2903        reader: u32,
2904    ) -> Result<()> {
2905        let table = self.id().get_mut(store).table_for_transmit(ty);
2906        let (rep, _is_done) = match ty {
2907            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
2908            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
2909        };
2910        let kind = match ty {
2911            TransmitIndex::Stream(_) => TransmitKind::Stream,
2912            TransmitIndex::Future(_) => TransmitKind::Future,
2913        };
2914        let id = TableId::<TransmitHandle>::new(rep);
2915        log::trace!("guest_drop_readable: drop reader {id:?}");
2916        self.host_drop_reader(store, id, kind)
2917    }
2918
2919    /// Create a new error context for the given component.
2920    pub(crate) fn error_context_new(
2921        self,
2922        store: &mut StoreOpaque,
2923        ty: TypeComponentLocalErrorContextTableIndex,
2924        options: OptionsIndex,
2925        debug_msg_address: u32,
2926        debug_msg_len: u32,
2927    ) -> Result<u32> {
2928        let options = Options::new_index(store, self, options);
2929        let lift_ctx = &mut LiftContext::new(store, &options, self);
2930        //  Read string from guest memory
2931        let address = usize::try_from(debug_msg_address)?;
2932        let len = usize::try_from(debug_msg_len)?;
2933        lift_ctx
2934            .memory()
2935            .get(address..)
2936            .and_then(|b| b.get(..len))
2937            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
2938        let message = WasmStr::new(address, len, lift_ctx)?;
2939
2940        // Create a new ErrorContext that is tracked along with other concurrent state
2941        let err_ctx = ErrorContextState {
2942            debug_msg: message
2943                .to_str_from_memory(options.memory(store))?
2944                .to_string(),
2945        };
2946        let state = self.concurrent_state_mut(store);
2947        let table_id = state.push(err_ctx)?;
2948        let global_ref_count_idx =
2949            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
2950
2951        // Add to the global error context ref counts
2952        let _ = state
2953            .global_error_context_ref_counts
2954            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
2955
2956        // Error context are tracked both locally (to a single component instance) and globally
2957        // the counts for both must stay in sync.
2958        //
2959        // Here we reflect the newly created global concurrent error context state into the
2960        // component instance's locally tracked count, along with the appropriate key into the global
2961        // ref tracking data structures to enable later lookup
2962        let local_idx = self
2963            .id()
2964            .get_mut(store)
2965            .table_for_error_context(ty)
2966            .error_context_insert(table_id.rep())?;
2967
2968        Ok(local_idx)
2969    }
2970
2971    /// Retrieve the debug message from the specified error context.
2972    pub(super) fn error_context_debug_message<T>(
2973        self,
2974        store: StoreContextMut<T>,
2975        ty: TypeComponentLocalErrorContextTableIndex,
2976        options: OptionsIndex,
2977        err_ctx_handle: u32,
2978        debug_msg_address: u32,
2979    ) -> Result<()> {
2980        // Retrieve the error context and internal debug message
2981        let handle_table_id_rep = self
2982            .id()
2983            .get_mut(store.0)
2984            .table_for_error_context(ty)
2985            .error_context_rep(err_ctx_handle)?;
2986
2987        let state = self.concurrent_state_mut(store.0);
2988        // Get the state associated with the error context
2989        let ErrorContextState { debug_msg } =
2990            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
2991        let debug_msg = debug_msg.clone();
2992
2993        let options = Options::new_index(store.0, self, options);
2994        let types = self.id().get(store.0).component().types().clone();
2995        let lower_cx = &mut LowerContext::new(store, &options, &types, self);
2996        let debug_msg_address = usize::try_from(debug_msg_address)?;
2997        // Lower the string into the component's memory
2998        let offset = lower_cx
2999            .as_slice_mut()
3000            .get(debug_msg_address..)
3001            .and_then(|b| b.get(..debug_msg.bytes().len()))
3002            .map(|_| debug_msg_address)
3003            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3004        debug_msg
3005            .as_str()
3006            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3007
3008        Ok(())
3009    }
3010
3011    /// Implements the `future.drop-readable` intrinsic.
3012    pub(crate) fn future_drop_readable(
3013        self,
3014        store: &mut dyn VMStore,
3015        ty: TypeFutureTableIndex,
3016        reader: u32,
3017    ) -> Result<()> {
3018        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3019    }
3020
3021    /// Implements the `stream.drop-readable` intrinsic.
3022    pub(crate) fn stream_drop_readable(
3023        self,
3024        store: &mut dyn VMStore,
3025        ty: TypeStreamTableIndex,
3026        reader: u32,
3027    ) -> Result<()> {
3028        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3029    }
3030}
3031
3032impl ComponentInstance {
3033    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
3034        let (tables, types) = self.guest_tables();
3035        let runtime_instance = match ty {
3036            TransmitIndex::Stream(ty) => types[ty].instance,
3037            TransmitIndex::Future(ty) => types[ty].instance,
3038        };
3039        &mut tables[runtime_instance]
3040    }
3041
3042    fn table_for_error_context(
3043        self: Pin<&mut Self>,
3044        ty: TypeComponentLocalErrorContextTableIndex,
3045    ) -> &mut HandleTable {
3046        let (tables, types) = self.guest_tables();
3047        let runtime_instance = types[ty].instance;
3048        &mut tables[runtime_instance]
3049    }
3050
3051    fn get_mut_by_index(
3052        self: Pin<&mut Self>,
3053        ty: TransmitIndex,
3054        index: u32,
3055    ) -> Result<(u32, &mut TransmitLocalState)> {
3056        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
3057    }
3058
3059    /// Allocate a new future or stream and grant ownership of both the read and
3060    /// write ends to the (sub-)component instance to which the specified
3061    /// `TransmitIndex` belongs.
3062    fn guest_new(mut self: Pin<&mut Self>, ty: TransmitIndex) -> Result<ResourcePair> {
3063        let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?;
3064
3065        let table = self.as_mut().table_for_transmit(ty);
3066        let (read_handle, write_handle) = match ty {
3067            TransmitIndex::Future(ty) => (
3068                table.future_insert_read(ty, read.rep())?,
3069                table.future_insert_write(ty, write.rep())?,
3070            ),
3071            TransmitIndex::Stream(ty) => (
3072                table.stream_insert_read(ty, read.rep())?,
3073                table.stream_insert_write(ty, write.rep())?,
3074            ),
3075        };
3076
3077        let state = self.as_mut().concurrent_state_mut();
3078        state.get_mut(read)?.common.handle = Some(read_handle);
3079        state.get_mut(write)?.common.handle = Some(write_handle);
3080
3081        Ok(ResourcePair {
3082            write: write_handle,
3083            read: read_handle,
3084        })
3085    }
3086
3087    /// Cancel a pending write for the specified stream or future from the guest.
3088    fn guest_cancel_write(
3089        mut self: Pin<&mut Self>,
3090        ty: TransmitIndex,
3091        writer: u32,
3092        _async_: bool,
3093    ) -> Result<ReturnCode> {
3094        let (rep, state) = get_mut_by_index_from(self.as_mut().table_for_transmit(ty), ty, writer)?;
3095        let id = TableId::<TransmitHandle>::new(rep);
3096        log::trace!("guest cancel write {id:?} (handle {writer})");
3097        match state {
3098            TransmitLocalState::Write { .. } => {
3099                bail!("stream or future write cancelled when no write is pending")
3100            }
3101            TransmitLocalState::Read { .. } => {
3102                bail!("passed read end to `{{stream|future}}.cancel-write`")
3103            }
3104            TransmitLocalState::Busy => {
3105                *state = TransmitLocalState::Write { done: false };
3106            }
3107        }
3108        let state = self.concurrent_state_mut();
3109        let rep = state.get(id)?.state.rep();
3110        state.host_cancel_write(rep)
3111    }
3112
3113    /// Cancel a pending read for the specified stream or future from the guest.
3114    fn guest_cancel_read(
3115        mut self: Pin<&mut Self>,
3116        ty: TransmitIndex,
3117        reader: u32,
3118        _async_: bool,
3119    ) -> Result<ReturnCode> {
3120        let (rep, state) = get_mut_by_index_from(self.as_mut().table_for_transmit(ty), ty, reader)?;
3121        let id = TableId::<TransmitHandle>::new(rep);
3122        log::trace!("guest cancel read {id:?} (handle {reader})");
3123        match state {
3124            TransmitLocalState::Read { .. } => {
3125                bail!("stream or future read cancelled when no read is pending")
3126            }
3127            TransmitLocalState::Write { .. } => {
3128                bail!("passed write end to `{{stream|future}}.cancel-read`")
3129            }
3130            TransmitLocalState::Busy => {
3131                *state = TransmitLocalState::Read { done: false };
3132            }
3133        }
3134        let state = self.concurrent_state_mut();
3135        let rep = state.get(id)?.state.rep();
3136        state.host_cancel_read(rep)
3137    }
3138
3139    /// Drop the specified error context.
3140    pub(crate) fn error_context_drop(
3141        mut self: Pin<&mut Self>,
3142        ty: TypeComponentLocalErrorContextTableIndex,
3143        error_context: u32,
3144    ) -> Result<()> {
3145        let local_handle_table = self.as_mut().table_for_error_context(ty);
3146
3147        let rep = local_handle_table.error_context_drop(error_context)?;
3148
3149        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3150
3151        let state = self.concurrent_state_mut();
3152        let GlobalErrorContextRefCount(global_ref_count) = state
3153            .global_error_context_ref_counts
3154            .get_mut(&global_ref_count_idx)
3155            .expect("retrieve concurrent state for error context during drop");
3156
3157        // Reduce the component-global ref count, removing tracking if necessary
3158        assert!(*global_ref_count >= 1);
3159        *global_ref_count -= 1;
3160        if *global_ref_count == 0 {
3161            state
3162                .global_error_context_ref_counts
3163                .remove(&global_ref_count_idx);
3164
3165            state
3166                .delete(TableId::<ErrorContextState>::new(rep))
3167                .context("deleting component-global error context data")?;
3168        }
3169
3170        Ok(())
3171    }
3172
3173    /// Transfer ownership of the specified stream or future read end from one
3174    /// guest to another.
3175    fn guest_transfer(
3176        mut self: Pin<&mut Self>,
3177        src_idx: u32,
3178        src: TransmitIndex,
3179        dst: TransmitIndex,
3180    ) -> Result<u32> {
3181        let src_table = self.as_mut().table_for_transmit(src);
3182        let (rep, is_done) = match src {
3183            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3184            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3185        };
3186        if is_done {
3187            bail!("cannot lift after being notified that the writable end dropped");
3188        }
3189        let dst_table = self.as_mut().table_for_transmit(dst);
3190        let handle = match dst {
3191            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3192            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3193        }?;
3194        self.concurrent_state_mut()
3195            .get_mut(TableId::<TransmitHandle>::new(rep))?
3196            .common
3197            .handle = Some(handle);
3198        Ok(handle)
3199    }
3200
3201    /// Implements the `future.new` intrinsic.
3202    pub(crate) fn future_new(
3203        self: Pin<&mut Self>,
3204        ty: TypeFutureTableIndex,
3205    ) -> Result<ResourcePair> {
3206        self.guest_new(TransmitIndex::Future(ty))
3207    }
3208
3209    /// Implements the `future.cancel-write` intrinsic.
3210    pub(crate) fn future_cancel_write(
3211        self: Pin<&mut Self>,
3212        ty: TypeFutureTableIndex,
3213        async_: bool,
3214        writer: u32,
3215    ) -> Result<u32> {
3216        self.guest_cancel_write(TransmitIndex::Future(ty), writer, async_)
3217            .map(|result| result.encode())
3218    }
3219
3220    /// Implements the `future.cancel-read` intrinsic.
3221    pub(crate) fn future_cancel_read(
3222        self: Pin<&mut Self>,
3223        ty: TypeFutureTableIndex,
3224        async_: bool,
3225        reader: u32,
3226    ) -> Result<u32> {
3227        self.guest_cancel_read(TransmitIndex::Future(ty), reader, async_)
3228            .map(|result| result.encode())
3229    }
3230
3231    /// Implements the `stream.new` intrinsic.
3232    pub(crate) fn stream_new(
3233        self: Pin<&mut Self>,
3234        ty: TypeStreamTableIndex,
3235    ) -> Result<ResourcePair> {
3236        self.guest_new(TransmitIndex::Stream(ty))
3237    }
3238
3239    /// Implements the `stream.cancel-write` intrinsic.
3240    pub(crate) fn stream_cancel_write(
3241        self: Pin<&mut Self>,
3242        ty: TypeStreamTableIndex,
3243        async_: bool,
3244        writer: u32,
3245    ) -> Result<u32> {
3246        self.guest_cancel_write(TransmitIndex::Stream(ty), writer, async_)
3247            .map(|result| result.encode())
3248    }
3249
3250    /// Implements the `stream.cancel-read` intrinsic.
3251    pub(crate) fn stream_cancel_read(
3252        self: Pin<&mut Self>,
3253        ty: TypeStreamTableIndex,
3254        async_: bool,
3255        reader: u32,
3256    ) -> Result<u32> {
3257        self.guest_cancel_read(TransmitIndex::Stream(ty), reader, async_)
3258            .map(|result| result.encode())
3259    }
3260
3261    /// Transfer ownership of the specified future read end from one guest to
3262    /// another.
3263    pub(crate) fn future_transfer(
3264        self: Pin<&mut Self>,
3265        src_idx: u32,
3266        src: TypeFutureTableIndex,
3267        dst: TypeFutureTableIndex,
3268    ) -> Result<u32> {
3269        self.guest_transfer(
3270            src_idx,
3271            TransmitIndex::Future(src),
3272            TransmitIndex::Future(dst),
3273        )
3274    }
3275
3276    /// Transfer ownership of the specified stream read end from one guest to
3277    /// another.
3278    pub(crate) fn stream_transfer(
3279        self: Pin<&mut Self>,
3280        src_idx: u32,
3281        src: TypeStreamTableIndex,
3282        dst: TypeStreamTableIndex,
3283    ) -> Result<u32> {
3284        self.guest_transfer(
3285            src_idx,
3286            TransmitIndex::Stream(src),
3287            TransmitIndex::Stream(dst),
3288        )
3289    }
3290
3291    /// Copy the specified error context from one component to another.
3292    pub(crate) fn error_context_transfer(
3293        mut self: Pin<&mut Self>,
3294        src_idx: u32,
3295        src: TypeComponentLocalErrorContextTableIndex,
3296        dst: TypeComponentLocalErrorContextTableIndex,
3297    ) -> Result<u32> {
3298        let rep = self
3299            .as_mut()
3300            .table_for_error_context(src)
3301            .error_context_rep(src_idx)?;
3302        let dst_idx = self
3303            .as_mut()
3304            .table_for_error_context(dst)
3305            .error_context_insert(rep)?;
3306
3307        // Update the global (cross-subcomponent) count for error contexts
3308        // as the new component has essentially created a new reference that will
3309        // be dropped/handled independently
3310        let global_ref_count = self
3311            .concurrent_state_mut()
3312            .global_error_context_ref_counts
3313            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3314            .context("global ref count present for existing (sub)component error context")?;
3315        global_ref_count.0 += 1;
3316
3317        Ok(dst_idx)
3318    }
3319}
3320
3321impl ConcurrentState {
3322    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
3323        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
3324    }
3325
3326    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3327        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
3328    }
3329
3330    /// Set or update the event for the specified waitable.
3331    ///
3332    /// If there is already an event set for this waitable, we assert that it is
3333    /// of the same variant as the new one and reuse the `ReturnCode` count and
3334    /// the `pending` field if applicable.
3335    // TODO: This is a bit awkward due to how
3336    // `Event::{Stream,Future}{Write,Read}` and
3337    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
3338    // Consider updating those representations in a way that allows this
3339    // function to be simplified.
3340    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3341        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
3342
3343        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
3344            let (ReturnCode::Completed(count)
3345            | ReturnCode::Dropped(count)
3346            | ReturnCode::Cancelled(count)) = old
3347            else {
3348                unreachable!()
3349            };
3350
3351            match new {
3352                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
3353                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
3354                _ => unreachable!(),
3355            }
3356        }
3357
3358        let event = match (waitable.take_event(self)?, event) {
3359            (None, _) => event,
3360            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
3361            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
3362            (
3363                Some(Event::StreamWrite {
3364                    code: old_code,
3365                    pending: old_pending,
3366                }),
3367                Event::StreamWrite { code, pending },
3368            ) => Event::StreamWrite {
3369                code: update_code(old_code, code),
3370                pending: old_pending.or(pending),
3371            },
3372            (
3373                Some(Event::StreamRead {
3374                    code: old_code,
3375                    pending: old_pending,
3376                }),
3377                Event::StreamRead { code, pending },
3378            ) => Event::StreamRead {
3379                code: update_code(old_code, code),
3380                pending: old_pending.or(pending),
3381            },
3382            _ => unreachable!(),
3383        };
3384
3385        waitable.set_event(self, Some(event))
3386    }
3387
3388    /// Allocate a new future or stream, including the `TransmitState` and the
3389    /// `TransmitHandle`s corresponding to the read and write ends.
3390    fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3391        let state_id = self.push(TransmitState::default())?;
3392
3393        let write = self.push(TransmitHandle::new(state_id))?;
3394        let read = self.push(TransmitHandle::new(state_id))?;
3395
3396        let state = self.get_mut(state_id)?;
3397        state.write_handle = write;
3398        state.read_handle = read;
3399
3400        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3401
3402        Ok((write, read))
3403    }
3404
3405    /// Delete the specified future or stream, including the read and write ends.
3406    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
3407        let state = self.delete(state_id)?;
3408        self.delete(state.write_handle)?;
3409        self.delete(state.read_handle)?;
3410
3411        log::trace!(
3412            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
3413            state.write_handle,
3414            state.read_handle,
3415        );
3416
3417        Ok(())
3418    }
3419
3420    /// Cancel a pending stream or future write from the host.
3421    ///
3422    /// # Arguments
3423    ///
3424    /// * `rep` - The `TransmitState` rep for the stream or future.
3425    fn host_cancel_write(&mut self, rep: u32) -> Result<ReturnCode> {
3426        let transmit_id = TableId::<TransmitState>::new(rep);
3427        let transmit = self.get_mut(transmit_id)?;
3428        log::trace!(
3429            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3430            transmit.read,
3431            transmit.write
3432        );
3433
3434        let code = if let Some(event) =
3435            Waitable::Transmit(transmit.write_handle).take_event(self)?
3436        {
3437            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3438                unreachable!();
3439            };
3440            match (code, event) {
3441                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3442                    ReturnCode::Cancelled(count)
3443                }
3444                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3445                _ => unreachable!(),
3446            }
3447        } else {
3448            ReturnCode::Cancelled(0)
3449        };
3450
3451        let transmit = self.get_mut(transmit_id)?;
3452
3453        match &transmit.write {
3454            WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
3455                transmit.write = WriteState::Open;
3456            }
3457
3458            WriteState::Open | WriteState::Dropped => {}
3459        }
3460
3461        log::trace!("cancelled write {transmit_id:?}");
3462
3463        Ok(code)
3464    }
3465
3466    /// Cancel a pending stream or future read from the host.
3467    ///
3468    /// # Arguments
3469    ///
3470    /// * `rep` - The `TransmitState` rep for the stream or future.
3471    fn host_cancel_read(&mut self, rep: u32) -> Result<ReturnCode> {
3472        let transmit_id = TableId::<TransmitState>::new(rep);
3473        let transmit = self.get_mut(transmit_id)?;
3474        log::trace!(
3475            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3476            transmit.read,
3477            transmit.write
3478        );
3479
3480        let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
3481            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3482                unreachable!();
3483            };
3484            match (code, event) {
3485                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3486                    ReturnCode::Cancelled(count)
3487                }
3488                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3489                _ => unreachable!(),
3490            }
3491        } else {
3492            ReturnCode::Cancelled(0)
3493        };
3494
3495        let transmit = self.get_mut(transmit_id)?;
3496
3497        match &transmit.read {
3498            ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
3499                transmit.read = ReadState::Open;
3500            }
3501
3502            ReadState::Open | ReadState::Dropped => {}
3503        }
3504
3505        log::trace!("cancelled read {transmit_id:?}");
3506
3507        Ok(code)
3508    }
3509}
3510
/// A pair of `u32` values, one for the write end and one for the read end.
///
/// NOTE(review): presumably these identify the two ends of a newly created
/// stream or future (e.g. guest-visible handle indices) -- confirm against the
/// code that constructs this type.
pub(crate) struct ResourcePair {
    /// Value associated with the write end.
    pub(crate) write: u32,
    /// Value associated with the read end.
    pub(crate) read: u32,
}