wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{
3    Event, GlobalErrorContextRefCount, LocalErrorContextRefCount, StateTable, Waitable,
4    WaitableCommon, WaitableState,
5};
6use crate::component::concurrent::{ConcurrentState, WorkItem};
7use crate::component::func::{self, LiftContext, LowerContext, Options};
8use crate::component::matching::InstanceType;
9use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
10use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
11use crate::store::{StoreOpaque, StoreToken};
12use crate::vm::VMStore;
13use crate::{AsContextMut, StoreContextMut, ValRaw};
14use anyhow::{Context, Result, anyhow, bail};
15use buffers::Extender;
16use buffers::UntypedWriteBuffer;
17use futures::channel::oneshot;
18use std::boxed::Box;
19use std::fmt;
20use std::future;
21use std::iter;
22use std::marker::PhantomData;
23use std::mem::{self, MaybeUninit};
24use std::string::{String, ToString};
25use std::sync::{Arc, Mutex};
26use std::task::{Poll, Waker};
27use std::vec::Vec;
28use wasmtime_environ::component::{
29    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
30    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
31    TypeFutureTableIndex, TypeStreamTableIndex,
32};
33
34pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
35
36mod buffers;
37
/// Tells apart the two kinds of transmit (stream vs. future) in code paths
/// that serve both.
#[derive(Debug, Clone, Copy)]
enum TransmitKind {
    /// The operation concerns a `stream`.
    Stream,
    /// The operation concerns a `future`.
    Future,
}
45
/// Outcome of a `{stream,future}.{read,write}` operation.
///
/// Every variant except `Blocked` carries a `u32` count which
/// [`ReturnCode::encode`] packs above a 4-bit status code.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ReturnCode {
    /// The operation could not complete yet.
    Blocked,
    /// The operation completed, with the given count.
    Completed(u32),
    /// The other end was dropped, with the given count.
    Dropped(u32),
    /// The operation was cancelled, with the given count.
    Cancelled(u32),
}
54
55impl ReturnCode {
56    /// Pack `self` into a single 32-bit integer that may be returned to the
57    /// guest.
58    ///
59    /// This corresponds to `pack_copy_result` in the Component Model spec.
60    pub fn encode(&self) -> u32 {
61        const BLOCKED: u32 = 0xffff_ffff;
62        const COMPLETED: u32 = 0x0;
63        const DROPPED: u32 = 0x1;
64        const CANCELLED: u32 = 0x2;
65        match self {
66            ReturnCode::Blocked => BLOCKED,
67            ReturnCode::Completed(n) => {
68                debug_assert!(*n < (1 << 28));
69                (n << 4) | COMPLETED
70            }
71            ReturnCode::Dropped(n) => {
72                debug_assert!(*n < (1 << 28));
73                (n << 4) | DROPPED
74            }
75            ReturnCode::Cancelled(n) => {
76                debug_assert!(*n < (1 << 28));
77                (n << 4) | CANCELLED
78            }
79        }
80    }
81
82    /// Returns `Self::Completed` with the specified count (or zero if
83    /// `matches!(kind, TransmitKind::Future)`)
84    fn completed(kind: TransmitKind, count: u32) -> Self {
85        Self::Completed(if let TransmitKind::Future = kind {
86            0
87        } else {
88            count
89        })
90    }
91}
92
/// Represents a stream or future type index.
///
/// This is useful as a parameter type for functions which operate on either a
/// future or a stream.
#[derive(Copy, Clone, Debug)]
pub(super) enum TableIndex {
    /// Type index of a `stream` table.
    Stream(TypeStreamTableIndex),
    /// Type index of a `future` table.
    Future(TypeFutureTableIndex),
}
102
103impl TableIndex {
104    fn kind(&self) -> TransmitKind {
105        match self {
106            TableIndex::Stream(_) => TransmitKind::Stream,
107            TableIndex::Future(_) => TransmitKind::Future,
108        }
109    }
110}
111
/// Action to take after writing.
///
/// Stored as the `post_write` field of the `GuestReady` and `HostReady` write
/// states.
enum PostWrite {
    /// Continue performing writes
    Continue,
    /// Drop the channel post-write
    Drop,
}
119
/// Represents the result of a host-initiated stream or future read or write.
///
/// `B` is the type of the buffer that was handed to the operation; it is
/// returned to the caller through this struct.
struct HostResult<B> {
    /// The buffer provided when reading or writing.
    buffer: B,
    /// Whether the other end of the stream or future has been dropped.
    dropped: bool,
}
127
128/// Retrieve the payload type of the specified stream or future, or `None` if it
129/// has no payload type.
130fn payload(ty: TableIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
131    match ty {
132        TableIndex::Future(ty) => types[types[ty].ty].payload,
133        TableIndex::Stream(ty) => types[types[ty].ty].payload,
134    }
135}
136
137/// Retrieve the host rep and state for the specified guest-visible waitable
138/// handle.
139fn get_mut_by_index_from(
140    state_table: &mut StateTable<WaitableState>,
141    ty: TableIndex,
142    index: u32,
143) -> Result<(u32, &mut StreamFutureState)> {
144    Ok(match ty {
145        TableIndex::Stream(ty) => {
146            let (rep, WaitableState::Stream(actual_ty, state)) =
147                state_table.get_mut_by_index(index)?
148            else {
149                bail!("invalid stream handle");
150            };
151            if *actual_ty != ty {
152                bail!("invalid stream handle");
153            }
154            (rep, state)
155        }
156        TableIndex::Future(ty) => {
157            let (rep, WaitableState::Future(actual_ty, state)) =
158                state_table.get_mut_by_index(index)?
159            else {
160                bail!("invalid future handle");
161            };
162            if *actual_ty != ty {
163                bail!("invalid future handle");
164            }
165            (rep, state)
166        }
167    })
168}
169
170/// Construct a `WaitableState` using the specified type and state.
171fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState {
172    match ty {
173        TableIndex::Stream(ty) => WaitableState::Stream(ty, state),
174        TableIndex::Future(ty) => WaitableState::Future(ty, state),
175    }
176}
177
178/// Complete a write initiated by a host-owned future or stream by matching it
179/// with the specified `Reader`.
180fn accept_reader<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
181    mut store: StoreContextMut<U>,
182    instance: Instance,
183    reader: Reader,
184    mut buffer: B,
185    kind: TransmitKind,
186) -> Result<(HostResult<B>, ReturnCode)> {
187    Ok(match reader {
188        Reader::Guest {
189            options,
190            ty,
191            address,
192            count,
193        } => {
194            let types = instance.id().get(store.0).component().types().clone();
195            let count = buffer.remaining().len().min(count);
196
197            let lower = &mut if T::MAY_REQUIRE_REALLOC {
198                LowerContext::new
199            } else {
200                LowerContext::new_without_realloc
201            }(store.as_context_mut(), options, &types, instance);
202
203            if address % usize::try_from(T::ALIGN32)? != 0 {
204                bail!("read pointer not aligned");
205            }
206            lower
207                .as_slice_mut()
208                .get_mut(address..)
209                .and_then(|b| b.get_mut(..T::SIZE32 * count))
210                .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
211
212            if let Some(ty) = payload(ty, &types) {
213                T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
214            }
215
216            buffer.skip(count);
217            (
218                HostResult {
219                    buffer,
220                    dropped: false,
221                },
222                ReturnCode::completed(kind, count.try_into().unwrap()),
223            )
224        }
225        Reader::Host { accept } => {
226            let count = buffer.remaining().len();
227            let mut untyped = UntypedWriteBuffer::new(&mut buffer);
228            let count = accept(&mut untyped, count);
229            (
230                HostResult {
231                    buffer,
232                    dropped: false,
233                },
234                ReturnCode::completed(kind, count.try_into().unwrap()),
235            )
236        }
237        Reader::End => (
238            HostResult {
239                buffer,
240                dropped: true,
241            },
242            ReturnCode::Dropped(0),
243        ),
244    })
245}
246
247/// Complete a read initiated by a host-owned future or stream by matching it with the
248/// specified `Writer`.
249fn accept_writer<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
250    writer: Writer,
251    mut buffer: B,
252    kind: TransmitKind,
253) -> Result<(HostResult<B>, ReturnCode)> {
254    Ok(match writer {
255        Writer::Guest {
256            lift,
257            ty,
258            address,
259            count,
260        } => {
261            let count = count.min(buffer.remaining_capacity());
262            if T::IS_RUST_UNIT_TYPE {
263                // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
264                // zero-sized type, so `MaybeUninit::uninit().assume_init()`
265                // is a valid way to populate the zero-sized buffer.
266                buffer.extend(
267                    iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() })
268                        .take(count),
269                )
270            } else {
271                let ty = ty.unwrap();
272                if address % usize::try_from(T::ALIGN32)? != 0 {
273                    bail!("write pointer not aligned");
274                }
275                lift.memory()
276                    .get(address..)
277                    .and_then(|b| b.get(..T::SIZE32 * count))
278                    .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
279
280                let list = &WasmList::new(address, count, lift, ty)?;
281                T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))?
282            }
283            (
284                HostResult {
285                    buffer,
286                    dropped: false,
287                },
288                ReturnCode::completed(kind, count.try_into().unwrap()),
289            )
290        }
291        Writer::Host {
292            buffer: input,
293            count,
294        } => {
295            let count = count.min(buffer.remaining_capacity());
296            buffer.move_from(input.get_mut::<T>(), count);
297            (
298                HostResult {
299                    buffer,
300                    dropped: false,
301                },
302                ReturnCode::completed(kind, count.try_into().unwrap()),
303            )
304        }
305        Writer::End => (
306            HostResult {
307                buffer,
308                dropped: true,
309            },
310            ReturnCode::Dropped(0),
311        ),
312    })
313}
314
315/// Return a `Future` which will resolve once the reader end corresponding to
316/// the specified writer end of a future or stream is dropped.
317async fn watch_reader(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
318    future::poll_fn(|cx| {
319        accessor
320            .as_accessor()
321            .with(|mut access| {
322                let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
323                let state_id = concurrent_state.get(id)?.state;
324                let state = concurrent_state.get_mut(state_id)?;
325                anyhow::Ok(if matches!(&state.read, ReadState::Dropped) {
326                    Poll::Ready(())
327                } else {
328                    state.reader_watcher = Some(cx.waker().clone());
329                    Poll::Pending
330                })
331            })
332            .unwrap_or(Poll::Ready(()))
333    })
334    .await
335}
336
337/// Return a `Future` which will resolve once the writer end corresponding to
338/// the specified reader end of a future or stream is dropped.
339async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId<TransmitHandle>) {
340    future::poll_fn(|cx| {
341        accessor
342            .as_accessor()
343            .with(|mut access| {
344                let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0);
345                let state_id = concurrent_state.get(id)?.state;
346                let state = concurrent_state.get_mut(state_id)?;
347                anyhow::Ok(
348                    if matches!(
349                        &state.write,
350                        WriteState::Dropped
351                            | WriteState::GuestReady {
352                                post_write: PostWrite::Drop,
353                                ..
354                            }
355                            | WriteState::HostReady {
356                                post_write: PostWrite::Drop,
357                                ..
358                            }
359                    ) {
360                        Poll::Ready(())
361                    } else {
362                        state.writer_watcher = Some(cx.waker().clone());
363                        Poll::Pending
364                    },
365                )
366            })
367            .unwrap_or(Poll::Ready(()))
368    })
369    .await
370}
371
/// Represents the state of a stream or future handle from the perspective of a
/// given component instance.
///
/// Handles in the `Read` state may be transferred to the host when lifted;
/// `Write` and `Busy` handles are rejected (see
/// `FutureReader::lift_from_index`).
#[derive(Debug, Eq, PartialEq)]
pub(super) enum StreamFutureState {
    /// The write end of the stream or future.
    Write {
        /// Whether the component instance has been notified that the stream or
        /// future is "done" (i.e. the other end has dropped, or, in the case of
        /// a future, a value has been transmitted).
        done: bool,
    },
    /// The read end of the stream or future.
    Read {
        /// Whether the component instance has been notified that the stream or
        /// future is "done" (i.e. the other end has dropped, or, in the case of
        /// a future, a value has been transmitted).
        done: bool,
    },
    /// A read or write is in progress.
    Busy,
}
393
/// Represents the state associated with an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context
    pub(crate) debug_msg: String,
}
400
/// Represents the size and alignment for a "flat" Component Model type,
/// i.e. one containing no pointers or handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the type (presumably in bytes -- confirm against users).
    pub(super) size: u32,
    /// Alignment of the type (presumably in bytes -- confirm against users).
    pub(super) align: u32,
}
408
/// Represents the writable end of a Component Model `future`.
///
/// Note that `FutureWriter` instances must be disposed of using either `write`
/// or `close`; otherwise the in-store representation will leak and the reader
/// end will hang indefinitely.  Consider using [`GuardedFutureWriter`] to
/// ensure that disposal happens automatically.
pub struct FutureWriter<T> {
    /// Produces the value handed to the store when the writer is closed via
    /// `close` instead of being written to.
    default: fn() -> T,
    /// Handle of this writer; `close` replaces it with `TableId::new(0)`.
    id: TableId<TransmitHandle>,
    /// The instance this writer's operations are issued through.
    instance: Instance,
}
420
421impl<T> FutureWriter<T> {
422    fn new(default: fn() -> T, id: TableId<TransmitHandle>, instance: Instance) -> Self {
423        Self {
424            default,
425            id,
426            instance,
427        }
428    }
429
430    /// Write the specified value to this `future`.
431    ///
432    /// The returned `Future` will yield `true` if the read end accepted the
433    /// value; otherwise it will return `false`, meaning the read end was dropped
434    /// before the value could be delivered.
435    ///
436    /// # Panics
437    ///
438    /// Panics if the store that the [`Accessor`] is derived from does not own
439    /// this future.
440    pub async fn write(self, accessor: impl AsAccessor, value: T) -> bool
441    where
442        T: func::Lower + Send + Sync + 'static,
443    {
444        self.guard(accessor).write(value).await
445    }
446
447    /// Mut-ref signature instead of by-value signature for
448    /// `GuardedFutureWriter` to more easily call.
449    async fn write_(&mut self, accessor: impl AsAccessor, value: T) -> bool
450    where
451        T: func::Lower + Send + Sync + 'static,
452    {
453        let accessor = accessor.as_accessor();
454
455        let result = self
456            .instance
457            .host_write_async(accessor, self.id, Some(value), TransmitKind::Future)
458            .await;
459
460        match result {
461            Ok(HostResult { dropped, .. }) => !dropped,
462            Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
463        }
464    }
465
466    /// Wait for the read end of this `future` is dropped.
467    ///
468    /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or
469    /// from within a host function for example.
470    ///
471    /// # Panics
472    ///
473    /// Panics if the store that the [`Accessor`] is derived from does not own
474    /// this future.
475    pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
476        watch_reader(accessor, self.instance, self.id).await
477    }
478
479    /// Close this `FutureWriter`, writing the default value.
480    ///
481    /// # Panics
482    ///
483    /// Panics if the store that the [`Accessor`] is derived from does not own
484    /// this future. Usage of this future after calling `close` will also cause
485    /// a panic.
486    pub fn close(&mut self, mut store: impl AsContextMut)
487    where
488        T: func::Lower + Send + Sync + 'static,
489    {
490        let id = mem::replace(&mut self.id, TableId::new(0));
491        let default = self.default;
492        self.instance
493            .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default())))
494            .unwrap();
495    }
496
497    /// Convenience method around [`Self::close`].
498    pub fn close_with(&mut self, accessor: impl AsAccessor)
499    where
500        T: func::Lower + Send + Sync + 'static,
501    {
502        accessor.as_accessor().with(|access| self.close(access))
503    }
504
505    /// Returns a [`GuardedFutureWriter`] which will auto-close this future on
506    /// drop and clean it up from the store.
507    ///
508    /// Note that the `accessor` provided must own this future and is
509    /// additionally transferred to the `GuardedFutureWriter` return value.
510    pub fn guard<A>(self, accessor: A) -> GuardedFutureWriter<T, A>
511    where
512        T: func::Lower + Send + Sync + 'static,
513        A: AsAccessor,
514    {
515        GuardedFutureWriter::new(accessor, self)
516    }
517}
518
/// A [`FutureWriter`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureWriter`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureWriter::new`] or
/// [`FutureWriter::guard`].
pub struct GuardedFutureWriter<T, A>
where
    T: func::Lower + Send + Sync + 'static,
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureWriter`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    writer: Option<FutureWriter<T>>,
    // Accessor used for every operation on `writer`, including closing it on
    // drop.
    accessor: A,
}
535
536impl<T, A> GuardedFutureWriter<T, A>
537where
538    T: func::Lower + Send + Sync + 'static,
539    A: AsAccessor,
540{
541    /// Create a new `GuardedFutureWriter` with the specified `accessor` and
542    /// `writer`.
543    pub fn new(accessor: A, writer: FutureWriter<T>) -> Self {
544        Self {
545            writer: Some(writer),
546            accessor,
547        }
548    }
549
550    /// Wrapper for [`FutureWriter::write`].
551    pub async fn write(mut self, value: T) -> bool
552    where
553        T: func::Lower + Send + Sync + 'static,
554    {
555        self.writer
556            .as_mut()
557            .unwrap()
558            .write_(&self.accessor, value)
559            .await
560    }
561
562    /// Wrapper for [`FutureWriter::watch_reader`]
563    pub async fn watch_reader(&mut self) {
564        self.writer
565            .as_mut()
566            .unwrap()
567            .watch_reader(&self.accessor)
568            .await
569    }
570
571    /// Extracts the underlying [`FutureWriter`] from this guard, returning it
572    /// back.
573    pub fn into_future(self) -> FutureWriter<T> {
574        self.into()
575    }
576}
577
578impl<T, A> From<GuardedFutureWriter<T, A>> for FutureWriter<T>
579where
580    T: func::Lower + Send + Sync + 'static,
581    A: AsAccessor,
582{
583    fn from(mut guard: GuardedFutureWriter<T, A>) -> Self {
584        guard.writer.take().unwrap()
585    }
586}
587
588impl<T, A> Drop for GuardedFutureWriter<T, A>
589where
590    T: func::Lower + Send + Sync + 'static,
591    A: AsAccessor,
592{
593    fn drop(&mut self) {
594        if let Some(writer) = &mut self.writer {
595            writer.close_with(&self.accessor)
596        }
597    }
598}
599
/// Represents the readable end of a Component Model `future`.
///
/// Note that `FutureReader` instances must be disposed of using either `read`
/// or `close`; otherwise the in-store representation will leak and the writer
/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
/// ensure that disposal happens automatically.
pub struct FutureReader<T> {
    /// The instance this reader's operations are issued through.
    instance: Instance,
    /// Handle of this reader; `close` replaces it with `TableId::new(0)`.
    id: TableId<TransmitHandle>,
    /// Ties the reader to its payload type `T` without storing a `T`.
    _phantom: PhantomData<T>,
}
611
612impl<T> FutureReader<T> {
613    fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
614        Self {
615            instance,
616            id,
617            _phantom: PhantomData,
618        }
619    }
620
621    /// Read the value from this `future`.
622    ///
623    /// The returned `Future` will yield `Err` if the guest has trapped
624    /// before it could produce a result.
625    ///
626    /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or
627    /// from within a host function for example.
628    ///
629    /// # Panics
630    ///
631    /// Panics if the store that the [`Accessor`] is derived from does not own
632    /// this future.
633    pub async fn read(self, accessor: impl AsAccessor) -> Option<T>
634    where
635        T: func::Lift + Send + 'static,
636    {
637        self.guard(accessor).read().await
638    }
639
640    async fn read_(&mut self, accessor: impl AsAccessor) -> Option<T>
641    where
642        T: func::Lift + Send + 'static,
643    {
644        let accessor = accessor.as_accessor();
645
646        let result = self
647            .instance
648            .host_read_async(accessor, self.id, None, TransmitKind::Future)
649            .await;
650
651        if let Ok(HostResult {
652            mut buffer,
653            dropped: false,
654        }) = result
655        {
656            buffer.take()
657        } else {
658            None
659        }
660    }
661
662    /// Wait for the write end of this `future` to be dropped.
663    ///
664    /// The [`Accessor`] provided can be acquired from
665    /// [`Instance::run_concurrent`] or from within a host function for example.
666    ///
667    /// # Panics
668    ///
669    /// Panics if the store that the [`Accessor`] is derived from does not own
670    /// this future.
671    pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
672        watch_writer(accessor, self.instance, self.id).await;
673    }
674
675    /// Convert this `FutureReader` into a [`Val`].
676    // See TODO comment for `FutureAny`; this is prone to handle leakage.
677    pub fn into_val(self) -> Val {
678        Val::Future(FutureAny(self.id.rep()))
679    }
680
681    /// Attempt to convert the specified [`Val`] to a `FutureReader`.
682    pub fn from_val(
683        mut store: impl AsContextMut<Data: Send>,
684        instance: Instance,
685        value: &Val,
686    ) -> Result<Self> {
687        let Val::Future(FutureAny(rep)) = value else {
688            bail!("expected `future`; got `{}`", value.desc());
689        };
690        let store = store.as_context_mut();
691        let id = TableId::<TransmitHandle>::new(*rep);
692        instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present
693        Ok(Self::new(id, instance))
694    }
695
696    /// Transfer ownership of the read end of a future from a guest to the host.
697    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
698        match ty {
699            InterfaceType::Future(src) => {
700                let state_table = cx
701                    .instance_mut()
702                    .concurrent_state_mut()
703                    .state_table(TableIndex::Future(src));
704                let (rep, state) =
705                    get_mut_by_index_from(state_table, TableIndex::Future(src), index)?;
706
707                match state {
708                    StreamFutureState::Read { .. } => {
709                        state_table.remove_by_index(index)?;
710                    }
711                    StreamFutureState::Write { .. } => bail!("cannot transfer write end of future"),
712                    StreamFutureState::Busy => bail!("cannot transfer busy future"),
713                }
714
715                let id = TableId::<TransmitHandle>::new(rep);
716                let concurrent_state = cx.instance_mut().concurrent_state_mut();
717                let state = concurrent_state.get(id)?.state;
718
719                if concurrent_state.get(state)?.done {
720                    bail!("cannot lift future after previous read succeeded");
721                }
722
723                Ok(Self::new(id, cx.instance_handle()))
724            }
725            _ => func::bad_type_info(),
726        }
727    }
728
729    /// Close this `FutureReader`, writing the default value.
730    ///
731    /// # Panics
732    ///
733    /// Panics if the store that the [`Accessor`] is derived from does not own
734    /// this future. Usage of this future after calling `close` will also cause
735    /// a panic.
736    pub fn close(&mut self, mut store: impl AsContextMut) {
737        // `self` should never be used again, but leave an invalid handle there just in case.
738        let id = mem::replace(&mut self.id, TableId::new(0));
739        self.instance
740            .host_drop_reader(
741                store.as_context_mut().0.traitobj_mut(),
742                id,
743                TransmitKind::Future,
744            )
745            .unwrap();
746    }
747
748    /// Convenience method around [`Self::close`].
749    pub fn close_with(&mut self, accessor: impl AsAccessor) {
750        accessor.as_accessor().with(|access| self.close(access))
751    }
752
753    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
754    /// drop and clean it up from the store.
755    ///
756    /// Note that the `accessor` provided must own this future and is
757    /// additionally transferred to the `GuardedFutureReader` return value.
758    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
759    where
760        A: AsAccessor,
761    {
762        GuardedFutureReader::new(accessor, self)
763    }
764}
765
766impl<T> fmt::Debug for FutureReader<T> {
767    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
768        f.debug_struct("FutureReader")
769            .field("id", &self.id)
770            .field("instance", &self.instance)
771            .finish()
772    }
773}
774
775/// Transfer ownership of the read end of a future from the host to a guest.
776pub(crate) fn lower_future_to_index<U>(
777    rep: u32,
778    cx: &mut LowerContext<'_, U>,
779    ty: InterfaceType,
780) -> Result<u32> {
781    match ty {
782        InterfaceType::Future(dst) => {
783            let concurrent_state = cx.instance_mut().concurrent_state_mut();
784            let state = concurrent_state
785                .get(TableId::<TransmitHandle>::new(rep))?
786                .state;
787            let rep = concurrent_state.get(state)?.read_handle.rep();
788
789            concurrent_state
790                .state_table(TableIndex::Future(dst))
791                .insert(
792                    rep,
793                    WaitableState::Future(dst, StreamFutureState::Read { done: false }),
794                )
795        }
796        _ => func::bad_type_info(),
797    }
798}
799
800// SAFETY: This relies on the `ComponentType` implementation for `u32` being
801// safe and correct since we lift and lower future handles as `u32`s.
802unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
803    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
804
805    type Lower = <u32 as func::ComponentType>::Lower;
806
807    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
808        match ty {
809            InterfaceType::Future(_) => Ok(()),
810            other => bail!("expected `future`, found `{}`", func::desc(other)),
811        }
812    }
813}
814
815// SAFETY: See the comment on the `ComponentType` `impl` for this type.
816unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
817    fn linear_lower_to_flat<U>(
818        &self,
819        cx: &mut LowerContext<'_, U>,
820        ty: InterfaceType,
821        dst: &mut MaybeUninit<Self::Lower>,
822    ) -> Result<()> {
823        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
824            cx,
825            InterfaceType::U32,
826            dst,
827        )
828    }
829
830    fn linear_lower_to_memory<U>(
831        &self,
832        cx: &mut LowerContext<'_, U>,
833        ty: InterfaceType,
834        offset: usize,
835    ) -> Result<()> {
836        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
837            cx,
838            InterfaceType::U32,
839            offset,
840        )
841    }
842}
843
844// SAFETY: See the comment on the `ComponentType` `impl` for this type.
845unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
846    fn linear_lift_from_flat(
847        cx: &mut LiftContext<'_>,
848        ty: InterfaceType,
849        src: &Self::Lower,
850    ) -> Result<Self> {
851        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
852        Self::lift_from_index(cx, ty, index)
853    }
854
855    fn linear_lift_from_memory(
856        cx: &mut LiftContext<'_>,
857        ty: InterfaceType,
858        bytes: &[u8],
859    ) -> Result<Self> {
860        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
861        Self::lift_from_index(cx, ty, index)
862    }
863}
864
/// A [`FutureReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureReader::new`] or
/// [`FutureReader::guard`].
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<FutureReader<T>>,
    // Accessor used for every operation on `reader`, including closing it on
    // drop.
    accessor: A,
}
880
881impl<T, A> GuardedFutureReader<T, A>
882where
883    A: AsAccessor,
884{
885    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
886    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
887        Self {
888            reader: Some(reader),
889            accessor,
890        }
891    }
892
893    /// Wrapper for [`FutureReader::read`].
894    pub async fn read(mut self) -> Option<T>
895    where
896        T: func::Lift + Send + 'static,
897    {
898        self.reader.as_mut().unwrap().read_(&self.accessor).await
899    }
900
901    /// Wrapper for [`FutureReader::watch_writer`].
902    pub async fn watch_writer(&mut self) {
903        self.reader
904            .as_mut()
905            .unwrap()
906            .watch_writer(&self.accessor)
907            .await
908    }
909
910    /// Extracts the underlying [`FutureReader`] from this guard, returning it
911    /// back.
912    pub fn into_future(self) -> FutureReader<T> {
913        self.into()
914    }
915}
916
917impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
918where
919    A: AsAccessor,
920{
921    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
922        guard.reader.take().unwrap()
923    }
924}
925
926impl<T, A> Drop for GuardedFutureReader<T, A>
927where
928    A: AsAccessor,
929{
930    fn drop(&mut self) {
931        if let Some(reader) = &mut self.reader {
932            reader.close_with(&self.accessor)
933        }
934    }
935}
936
937/// Represents the writable end of a Component Model `stream`.
938///
939/// Note that `StreamWriter` instances must be disposed of using `close`;
940/// otherwise the in-store representation will leak and the reader end will hang
941/// indefinitely.  Consider using [`GuardedStreamWriter`] to ensure that
942/// disposal happens automatically.
943pub struct StreamWriter<T> {
944    instance: Instance,
945    id: TableId<TransmitHandle>,
946    closed: bool,
947    _phantom: PhantomData<T>,
948}
949
950impl<T> StreamWriter<T> {
951    fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
952        Self {
953            instance,
954            id,
955            closed: false,
956            _phantom: PhantomData,
957        }
958    }
959
960    /// Returns whether this stream is "closed" meaning that the other end of
961    /// the stream has been dropped.
962    pub fn is_closed(&self) -> bool {
963        self.closed
964    }
965
966    /// Write the specified items to the `stream`.
967    ///
968    /// Note that this will only write as many items as the reader accepts
969    /// during its current or next read.  Use `write_all` to loop until the
970    /// buffer is drained or the read end is dropped.
971    ///
972    /// The returned `Future` will yield the input buffer back,
973    /// possibly consuming a subset of the items or nothing depending on the
974    /// number of items the reader accepted.
975    ///
976    /// The [`is_closed`](Self::is_closed) method can be used to determine
977    /// whether the stream was learned to be closed after this operation completes.
978    ///
979    /// # Panics
980    ///
981    /// Panics if the store that the [`Accessor`] is derived from does not own
982    /// this future.
983    pub async fn write<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
984    where
985        T: func::Lower + 'static,
986        B: WriteBuffer<T>,
987    {
988        let result = self
989            .instance
990            .host_write_async(
991                accessor.as_accessor(),
992                self.id,
993                buffer,
994                TransmitKind::Stream,
995            )
996            .await;
997
998        match result {
999            Ok(HostResult { buffer, dropped }) => {
1000                if self.closed {
1001                    debug_assert!(dropped);
1002                }
1003                self.closed = dropped;
1004                buffer
1005            }
1006            Err(_) => todo!("guarantee buffer recovery if `host_write` fails"),
1007        }
1008    }
1009
1010    /// Write the specified values until either the buffer is drained or the
1011    /// read end is dropped.
1012    ///
1013    /// The buffer is returned back to the caller and may still contain items
1014    /// within it if the other end of this stream was dropped. Use the
1015    /// [`is_closed`](Self::is_closed) method to determine if the other end is
1016    /// dropped.
1017    ///
1018    /// # Panics
1019    ///
1020    /// Panics if the store that the [`Accessor`] is derived from does not own
1021    /// this future.
1022    pub async fn write_all<B>(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B
1023    where
1024        T: func::Lower + 'static,
1025        B: WriteBuffer<T>,
1026    {
1027        let accessor = accessor.as_accessor();
1028        while !self.is_closed() && buffer.remaining().len() > 0 {
1029            buffer = self.write(accessor, buffer).await;
1030        }
1031        buffer
1032    }
1033
1034    /// Wait for the read end of this `stream` to be dropped.
1035    ///
1036    /// # Panics
1037    ///
1038    /// Panics if the store that the [`Accessor`] is derived from does not own
1039    /// this future.
1040    pub async fn watch_reader(&mut self, accessor: impl AsAccessor) {
1041        watch_reader(accessor, self.instance, self.id).await
1042    }
1043
1044    /// Close this `StreamWriter`, writing the default value.
1045    ///
1046    /// # Panics
1047    ///
1048    /// Panics if the store that the [`Accessor`] is derived from does not own
1049    /// this future. Usage of this future after calling `close` will also cause
1050    /// a panic.
1051    pub fn close(&mut self, mut store: impl AsContextMut) {
1052        // `self` should never be used again, but leave an invalid handle there just in case.
1053        let id = mem::replace(&mut self.id, TableId::new(0));
1054        self.instance
1055            .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>)
1056            .unwrap()
1057    }
1058
1059    /// Convenience method around [`Self::close`].
1060    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1061        accessor.as_accessor().with(|access| self.close(access))
1062    }
1063
1064    /// Returns a [`GuardedStreamWriter`] which will auto-close this stream on
1065    /// drop and clean it up from the store.
1066    ///
1067    /// Note that the `accessor` provided must own this future and is
1068    /// additionally transferred to the `GuardedStreamWriter` return value.
1069    pub fn guard<A>(self, accessor: A) -> GuardedStreamWriter<T, A>
1070    where
1071        A: AsAccessor,
1072    {
1073        GuardedStreamWriter::new(accessor, self)
1074    }
1075}
1076
1077/// A [`StreamWriter`] paired with an [`Accessor`].
1078///
1079/// This is an RAII wrapper around [`StreamWriter`] that ensures it is closed
1080/// when dropped. This can be created through [`GuardedStreamWriter::new`] or
1081/// [`StreamWriter::guard`].
1082pub struct GuardedStreamWriter<T, A>
1083where
1084    A: AsAccessor,
1085{
1086    // This field is `None` to implement the conversion from this guard back to
1087    // `StreamWriter`. When `None` is seen in the destructor it will cause the
1088    // destructor to do nothing.
1089    writer: Option<StreamWriter<T>>,
1090    accessor: A,
1091}
1092
1093impl<T, A> GuardedStreamWriter<T, A>
1094where
1095    A: AsAccessor,
1096{
1097    /// Create a new `GuardedStreamWriter` with the specified `accessor` and `writer`.
1098    pub fn new(accessor: A, writer: StreamWriter<T>) -> Self {
1099        Self {
1100            writer: Some(writer),
1101            accessor,
1102        }
1103    }
1104
1105    /// Wrapper for [`StreamWriter::is_closed`].
1106    pub fn is_closed(&self) -> bool {
1107        self.writer.as_ref().unwrap().is_closed()
1108    }
1109
1110    /// Wrapper for [`StreamWriter::write`].
1111    pub async fn write<B>(&mut self, buffer: B) -> B
1112    where
1113        T: func::Lower + 'static,
1114        B: WriteBuffer<T>,
1115    {
1116        self.writer
1117            .as_mut()
1118            .unwrap()
1119            .write(&self.accessor, buffer)
1120            .await
1121    }
1122
1123    /// Wrapper for [`StreamWriter::write_all`].
1124    pub async fn write_all<B>(&mut self, buffer: B) -> B
1125    where
1126        T: func::Lower + 'static,
1127        B: WriteBuffer<T>,
1128    {
1129        self.writer
1130            .as_mut()
1131            .unwrap()
1132            .write_all(&self.accessor, buffer)
1133            .await
1134    }
1135
1136    /// Wrapper for [`StreamWriter::watch_reader`].
1137    pub async fn watch_reader(&mut self) {
1138        self.writer
1139            .as_mut()
1140            .unwrap()
1141            .watch_reader(&self.accessor)
1142            .await
1143    }
1144
1145    /// Extracts the underlying [`StreamWriter`] from this guard, returning it
1146    /// back.
1147    pub fn into_stream(self) -> StreamWriter<T> {
1148        self.into()
1149    }
1150}
1151
1152impl<T, A> From<GuardedStreamWriter<T, A>> for StreamWriter<T>
1153where
1154    A: AsAccessor,
1155{
1156    fn from(mut guard: GuardedStreamWriter<T, A>) -> Self {
1157        guard.writer.take().unwrap()
1158    }
1159}
1160
1161impl<T, A> Drop for GuardedStreamWriter<T, A>
1162where
1163    A: AsAccessor,
1164{
1165    fn drop(&mut self) {
1166        if let Some(writer) = &mut self.writer {
1167            writer.close_with(&self.accessor)
1168        }
1169    }
1170}
1171
1172/// Represents the readable end of a Component Model `stream`.
1173///
1174/// Note that `StreamReader` instances must be disposed of using `close`;
1175/// otherwise the in-store representation will leak and the writer end will hang
1176/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1177/// disposal happens automatically.
1178pub struct StreamReader<T> {
1179    instance: Instance,
1180    id: TableId<TransmitHandle>,
1181    closed: bool,
1182    _phantom: PhantomData<T>,
1183}
1184
1185impl<T> StreamReader<T> {
1186    fn new(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1187        Self {
1188            instance,
1189            id,
1190            closed: false,
1191            _phantom: PhantomData,
1192        }
1193    }
1194
1195    /// Returns whether this stream is "closed" meaning that the other end of
1196    /// the stream has been dropped.
1197    pub fn is_closed(&self) -> bool {
1198        self.closed
1199    }
1200
1201    /// Read values from this `stream`.
1202    ///
1203    /// The returned `Future` will yield a `(Some(_), _)` if the read completed
1204    /// (possibly with zero items if the write was empty).  It will return
1205    /// `(None, _)` if the read failed due to the closure of the write end. In
1206    /// either case, the returned buffer will be the same one passed as a
1207    /// parameter, with zero or more items added.
1208    ///
1209    /// # Panics
1210    ///
1211    /// Panics if the store that the [`Accessor`] is derived from does not own
1212    /// this future.
1213    pub async fn read<B>(&mut self, accessor: impl AsAccessor, buffer: B) -> B
1214    where
1215        T: func::Lift + 'static,
1216        B: ReadBuffer<T> + Send + 'static,
1217    {
1218        let result = self
1219            .instance
1220            .host_read_async(
1221                accessor.as_accessor(),
1222                self.id,
1223                buffer,
1224                TransmitKind::Stream,
1225            )
1226            .await;
1227
1228        match result {
1229            Ok(HostResult { buffer, dropped }) => {
1230                if self.closed {
1231                    debug_assert!(dropped);
1232                }
1233                self.closed = dropped;
1234                buffer
1235            }
1236            Err(_) => {
1237                todo!("guarantee buffer recovery if `host_read` fails")
1238            }
1239        }
1240    }
1241
1242    /// Wait until the write end of this `stream` is dropped.
1243    ///
1244    /// # Panics
1245    ///
1246    /// Panics if the store that the [`Accessor`] is derived from does not own
1247    /// this future.
1248    pub async fn watch_writer(&mut self, accessor: impl AsAccessor) {
1249        watch_writer(accessor, self.instance, self.id).await
1250    }
1251
1252    /// Convert this `StreamReader` into a [`Val`].
1253    // See TODO comment for `StreamAny`; this is prone to handle leakage.
1254    pub fn into_val(self) -> Val {
1255        Val::Stream(StreamAny(self.id.rep()))
1256    }
1257
1258    /// Attempt to convert the specified [`Val`] to a `StreamReader`.
1259    pub fn from_val(
1260        mut store: impl AsContextMut<Data: Send>,
1261        instance: Instance,
1262        value: &Val,
1263    ) -> Result<Self> {
1264        let Val::Stream(StreamAny(rep)) = value else {
1265            bail!("expected `stream`; got `{}`", value.desc());
1266        };
1267        let store = store.as_context_mut();
1268        let id = TableId::<TransmitHandle>::new(*rep);
1269        instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present
1270        Ok(Self::new(id, instance))
1271    }
1272
1273    /// Transfer ownership of the read end of a stream from a guest to the host.
1274    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1275        match ty {
1276            InterfaceType::Stream(src) => {
1277                let state_table = cx
1278                    .instance_mut()
1279                    .concurrent_state_mut()
1280                    .state_table(TableIndex::Stream(src));
1281                let (rep, state) =
1282                    get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?;
1283
1284                match state {
1285                    StreamFutureState::Read { done: true } => bail!(
1286                        "cannot lift stream after being notified that the writable end dropped"
1287                    ),
1288                    StreamFutureState::Read { done: false } => {
1289                        state_table.remove_by_index(index)?;
1290                    }
1291                    StreamFutureState::Write { .. } => bail!("cannot transfer write end of stream"),
1292                    StreamFutureState::Busy => bail!("cannot transfer busy stream"),
1293                }
1294
1295                let id = TableId::<TransmitHandle>::new(rep);
1296
1297                Ok(Self::new(id, cx.instance_handle()))
1298            }
1299            _ => func::bad_type_info(),
1300        }
1301    }
1302
1303    /// Close this `StreamReader`, writing the default value.
1304    ///
1305    /// # Panics
1306    ///
1307    /// Panics if the store that the [`Accessor`] is derived from does not own
1308    /// this future. Usage of this future after calling `close` will also cause
1309    /// a panic.
1310    pub fn close(&mut self, mut store: impl AsContextMut) {
1311        // `self` should never be used again, but leave an invalid handle there just in case.
1312        let id = mem::replace(&mut self.id, TableId::new(0));
1313        self.instance
1314            .host_drop_reader(
1315                store.as_context_mut().0.traitobj_mut(),
1316                id,
1317                TransmitKind::Stream,
1318            )
1319            .unwrap()
1320    }
1321
1322    /// Convenience method around [`Self::close`].
1323    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1324        accessor.as_accessor().with(|access| self.close(access))
1325    }
1326
1327    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1328    /// drop and clean it up from the store.
1329    ///
1330    /// Note that the `accessor` provided must own this future and is
1331    /// additionally transferred to the `GuardedStreamReader` return value.
1332    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1333    where
1334        A: AsAccessor,
1335    {
1336        GuardedStreamReader::new(accessor, self)
1337    }
1338}
1339
1340impl<T> fmt::Debug for StreamReader<T> {
1341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1342        f.debug_struct("StreamReader")
1343            .field("id", &self.id)
1344            .field("instance", &self.instance)
1345            .finish()
1346    }
1347}
1348
1349/// Transfer ownership of the read end of a stream from the host to a guest.
1350pub(crate) fn lower_stream_to_index<U>(
1351    rep: u32,
1352    cx: &mut LowerContext<'_, U>,
1353    ty: InterfaceType,
1354) -> Result<u32> {
1355    match ty {
1356        InterfaceType::Stream(dst) => {
1357            let concurrent_state = cx.instance_mut().concurrent_state_mut();
1358            let state = concurrent_state
1359                .get(TableId::<TransmitHandle>::new(rep))?
1360                .state;
1361            let rep = concurrent_state.get(state)?.read_handle.rep();
1362
1363            concurrent_state
1364                .state_table(TableIndex::Stream(dst))
1365                .insert(
1366                    rep,
1367                    WaitableState::Stream(dst, StreamFutureState::Read { done: false }),
1368                )
1369        }
1370        _ => func::bad_type_info(),
1371    }
1372}
1373
1374// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1375// safe and correct since we lift and lower stream handles as `u32`s.
1376unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1377    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1378
1379    type Lower = <u32 as func::ComponentType>::Lower;
1380
1381    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1382        match ty {
1383            InterfaceType::Stream(_) => Ok(()),
1384            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1385        }
1386    }
1387}
1388
1389// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1390unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1391    fn linear_lower_to_flat<U>(
1392        &self,
1393        cx: &mut LowerContext<'_, U>,
1394        ty: InterfaceType,
1395        dst: &mut MaybeUninit<Self::Lower>,
1396    ) -> Result<()> {
1397        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1398            cx,
1399            InterfaceType::U32,
1400            dst,
1401        )
1402    }
1403
1404    fn linear_lower_to_memory<U>(
1405        &self,
1406        cx: &mut LowerContext<'_, U>,
1407        ty: InterfaceType,
1408        offset: usize,
1409    ) -> Result<()> {
1410        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1411            cx,
1412            InterfaceType::U32,
1413            offset,
1414        )
1415    }
1416}
1417
1418// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1419unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1420    fn linear_lift_from_flat(
1421        cx: &mut LiftContext<'_>,
1422        ty: InterfaceType,
1423        src: &Self::Lower,
1424    ) -> Result<Self> {
1425        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1426        Self::lift_from_index(cx, ty, index)
1427    }
1428
1429    fn linear_lift_from_memory(
1430        cx: &mut LiftContext<'_>,
1431        ty: InterfaceType,
1432        bytes: &[u8],
1433    ) -> Result<Self> {
1434        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1435        Self::lift_from_index(cx, ty, index)
1436    }
1437}
1438
1439/// A [`StreamReader`] paired with an [`Accessor`].
1440///
1441/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1442/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1443/// [`StreamReader::guard`].
1444pub struct GuardedStreamReader<T, A>
1445where
1446    A: AsAccessor,
1447{
1448    // This field is `None` to implement the conversion from this guard back to
1449    // `StreamReader`. When `None` is seen in the destructor it will cause the
1450    // destructor to do nothing.
1451    reader: Option<StreamReader<T>>,
1452    accessor: A,
1453}
1454
1455impl<T, A> GuardedStreamReader<T, A>
1456where
1457    A: AsAccessor,
1458{
1459    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1460    /// `reader`.
1461    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1462        Self {
1463            reader: Some(reader),
1464            accessor,
1465        }
1466    }
1467
1468    /// Wrapper for `StreamReader::is_closed`
1469    pub fn is_closed(&self) -> bool {
1470        self.reader.as_ref().unwrap().is_closed()
1471    }
1472
1473    /// Wrapper for `StreamReader::read`.
1474    pub async fn read<B>(&mut self, buffer: B) -> B
1475    where
1476        T: func::Lift + 'static,
1477        B: ReadBuffer<T> + Send + 'static,
1478    {
1479        self.reader
1480            .as_mut()
1481            .unwrap()
1482            .read(&self.accessor, buffer)
1483            .await
1484    }
1485
1486    /// Wrapper for `StreamReader::watch_writer`.
1487    pub async fn watch_writer(&mut self) {
1488        self.reader
1489            .as_mut()
1490            .unwrap()
1491            .watch_writer(&self.accessor)
1492            .await
1493    }
1494
1495    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1496    /// back.
1497    pub fn into_stream(self) -> StreamReader<T> {
1498        self.into()
1499    }
1500}
1501
1502impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1503where
1504    A: AsAccessor,
1505{
1506    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1507        guard.reader.take().unwrap()
1508    }
1509}
1510
1511impl<T, A> Drop for GuardedStreamReader<T, A>
1512where
1513    A: AsAccessor,
1514{
1515    fn drop(&mut self) {
1516        if let Some(reader) = &mut self.reader {
1517            reader.close_with(&self.accessor)
1518        }
1519    }
1520}
1521
1522/// Represents a Component Model `error-context`.
1523pub struct ErrorContext {
1524    rep: u32,
1525}
1526
1527impl ErrorContext {
1528    pub(crate) fn new(rep: u32) -> Self {
1529        Self { rep }
1530    }
1531
1532    /// Convert this `ErrorContext` into a [`Val`].
1533    pub fn into_val(self) -> Val {
1534        Val::ErrorContext(ErrorContextAny(self.rep))
1535    }
1536
1537    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1538    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1539        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1540            bail!("expected `error-context`; got `{}`", value.desc());
1541        };
1542        Ok(Self::new(*rep))
1543    }
1544
1545    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1546        match ty {
1547            InterfaceType::ErrorContext(src) => {
1548                let (rep, _) = cx
1549                    .instance_mut()
1550                    .concurrent_state_mut()
1551                    .error_context_tables
1552                    .get_mut(src)
1553                    .expect("error context table index present in (sub)component table during lift")
1554                    .get_mut_by_index(index)?;
1555
1556                Ok(Self { rep })
1557            }
1558            _ => func::bad_type_info(),
1559        }
1560    }
1561}
1562
1563pub(crate) fn lower_error_context_to_index<U>(
1564    rep: u32,
1565    cx: &mut LowerContext<'_, U>,
1566    ty: InterfaceType,
1567) -> Result<u32> {
1568    match ty {
1569        InterfaceType::ErrorContext(dst) => {
1570            let tbl = cx
1571                .instance_mut()
1572                .concurrent_state_mut()
1573                .error_context_tables
1574                .get_mut(dst)
1575                .expect("error context table index present in (sub)component table during lower");
1576
1577            if let Some((dst_idx, dst_state)) = tbl.get_mut_by_rep(rep) {
1578                dst_state.0 += 1;
1579                Ok(dst_idx)
1580            } else {
1581                tbl.insert(rep, LocalErrorContextRefCount(1))
1582            }
1583        }
1584        _ => func::bad_type_info(),
1585    }
1586}
1587
1588// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1589// safe and correct since we lift and lower future handles as `u32`s.
1590unsafe impl func::ComponentType for ErrorContext {
1591    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1592
1593    type Lower = <u32 as func::ComponentType>::Lower;
1594
1595    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1596        match ty {
1597            InterfaceType::ErrorContext(_) => Ok(()),
1598            other => bail!("expected `error`, found `{}`", func::desc(other)),
1599        }
1600    }
1601}
1602
1603// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1604unsafe impl func::Lower for ErrorContext {
1605    fn linear_lower_to_flat<T>(
1606        &self,
1607        cx: &mut LowerContext<'_, T>,
1608        ty: InterfaceType,
1609        dst: &mut MaybeUninit<Self::Lower>,
1610    ) -> Result<()> {
1611        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1612            cx,
1613            InterfaceType::U32,
1614            dst,
1615        )
1616    }
1617
1618    fn linear_lower_to_memory<T>(
1619        &self,
1620        cx: &mut LowerContext<'_, T>,
1621        ty: InterfaceType,
1622        offset: usize,
1623    ) -> Result<()> {
1624        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1625            cx,
1626            InterfaceType::U32,
1627            offset,
1628        )
1629    }
1630}
1631
1632// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1633unsafe impl func::Lift for ErrorContext {
1634    fn linear_lift_from_flat(
1635        cx: &mut LiftContext<'_>,
1636        ty: InterfaceType,
1637        src: &Self::Lower,
1638    ) -> Result<Self> {
1639        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1640        Self::lift_from_index(cx, ty, index)
1641    }
1642
1643    fn linear_lift_from_memory(
1644        cx: &mut LiftContext<'_>,
1645        ty: InterfaceType,
1646        bytes: &[u8],
1647    ) -> Result<Self> {
1648        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1649        Self::lift_from_index(cx, ty, index)
1650    }
1651}
1652
1653/// Represents the read or write end of a stream or future.
1654pub(super) struct TransmitHandle {
1655    pub(super) common: WaitableCommon,
1656    /// See `TransmitState`
1657    state: TableId<TransmitState>,
1658}
1659
1660impl TransmitHandle {
1661    fn new(state: TableId<TransmitState>) -> Self {
1662        Self {
1663            common: WaitableCommon::default(),
1664            state,
1665        }
1666    }
1667}
1668
1669impl TableDebug for TransmitHandle {
1670    fn type_name() -> &'static str {
1671        "TransmitHandle"
1672    }
1673}
1674
1675/// Represents the state of a stream or future.
1676struct TransmitState {
1677    /// The write end of the stream or future.
1678    write_handle: TableId<TransmitHandle>,
1679    /// The read end of the stream or future.
1680    read_handle: TableId<TransmitHandle>,
1681    /// See `WriteState`
1682    write: WriteState,
1683    /// See `ReadState`
1684    read: ReadState,
1685    /// The `Waker`, if any, to be woken when the write end of the stream or
1686    /// future is dropped.
1687    ///
1688    /// This will signal to the host-owned read end that the write end has been
1689    /// dropped.
1690    writer_watcher: Option<Waker>,
1691    /// Like `writer_watcher`, but for the reverse direction.
1692    reader_watcher: Option<Waker>,
1693    /// Whether futher values may be transmitted via this stream or future.
1694    done: bool,
1695}
1696
1697impl Default for TransmitState {
1698    fn default() -> Self {
1699        Self {
1700            write_handle: TableId::new(0),
1701            read_handle: TableId::new(0),
1702            read: ReadState::Open,
1703            write: WriteState::Open,
1704            reader_watcher: None,
1705            writer_watcher: None,
1706            done: false,
1707        }
1708    }
1709}
1710
1711impl TableDebug for TransmitState {
1712    fn type_name() -> &'static str {
1713        "TransmitState"
1714    }
1715}
1716
1717/// Represents the state of the write end of a stream or future.
1718enum WriteState {
1719    /// The write end is open, but no write is pending.
1720    Open,
1721    /// The write end is owned by a guest task and a write is pending.
1722    GuestReady {
1723        ty: TableIndex,
1724        flat_abi: Option<FlatAbi>,
1725        options: Options,
1726        address: usize,
1727        count: usize,
1728        handle: u32,
1729        post_write: PostWrite,
1730    },
1731    /// The write end is owned by a host task and a write is pending.
1732    HostReady {
1733        accept:
1734            Box<dyn FnOnce(&mut dyn VMStore, Instance, Reader) -> Result<ReturnCode> + Send + Sync>,
1735        post_write: PostWrite,
1736    },
1737    /// The write end has been dropped.
1738    Dropped,
1739}
1740
1741impl fmt::Debug for WriteState {
1742    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1743        match self {
1744            Self::Open => f.debug_tuple("Open").finish(),
1745            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1746            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1747            Self::Dropped => f.debug_tuple("Dropped").finish(),
1748        }
1749    }
1750}
1751
1752/// Represents the state of the read end of a stream or future.
1753enum ReadState {
1754    /// The read end is open, but no read is pending.
1755    Open,
1756    /// The read end is owned by a guest task and a read is pending.
1757    GuestReady {
1758        ty: TableIndex,
1759        flat_abi: Option<FlatAbi>,
1760        options: Options,
1761        address: usize,
1762        count: usize,
1763        handle: u32,
1764    },
1765    /// The read end is owned by a host task and a read is pending.
1766    HostReady {
1767        accept: Box<dyn FnOnce(Writer) -> Result<ReturnCode> + Send + Sync>,
1768    },
1769    /// The read end has been dropped.
1770    Dropped,
1771}
1772
1773impl fmt::Debug for ReadState {
1774    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1775        match self {
1776            Self::Open => f.debug_tuple("Open").finish(),
1777            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1778            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1779            Self::Dropped => f.debug_tuple("Dropped").finish(),
1780        }
1781    }
1782}
1783
/// Parameter type to pass to a `ReadState::HostReady` closure.
///
/// Each variant describes who owns the write end that the pending host read
/// is being given.
///
/// See also `accept_writer`.
enum Writer<'a> {
    /// The write end is owned by a guest task.
    // NOTE(review): `address` and `count` presumably locate the guest's
    // pending write in linear memory (start address and item count), and `ty`
    // is presumably the payload type, with `None` meaning no payload — confirm
    // against `accept_writer`.
    Guest {
        lift: &'a mut LiftContext<'a>,
        ty: Option<InterfaceType>,
        address: usize,
        count: usize,
    },
    /// The write end is owned by the host.
    // NOTE(review): `count` is presumably the number of items available in
    // `buffer` — confirm against the caller.
    Host {
        buffer: &'a mut UntypedWriteBuffer<'a>,
        count: usize,
    },
    /// The write end has been dropped.
    End,
}
1803
/// Parameter type to pass to a `WriteState::HostReady` closure.
///
/// Each variant describes who owns the read end that the pending host write
/// is being given.
///
/// See also `accept_reader`.
enum Reader<'a> {
    /// The read end is owned by a guest task.
    // NOTE(review): `address` and `count` presumably describe the guest's
    // pending read buffer in linear memory (start address and capacity in
    // items) — confirm against `accept_reader`.
    Guest {
        options: &'a Options,
        ty: TableIndex,
        address: usize,
        count: usize,
    },
    /// The read end is owned by the host.
    // NOTE(review): `accept` is handed the writer's buffer and a `usize`, and
    // returns a `usize`; presumably these are the number of items offered and
    // the number actually consumed — confirm against the caller.
    Host {
        accept: Box<dyn FnOnce(&mut UntypedWriteBuffer, usize) -> usize + 'a>,
    },
    /// The read end has been dropped.
    End,
}
1822
1823impl Instance {
1824    /// Create a new Component Model `future` as pair of writable and readable ends,
1825    /// the latter of which may be passed to guest code.
1826    ///
1827    /// `default` is a callback to be used if the writable end of the future is
1828    /// closed without having written a value.  You may supply e.g. `||
1829    /// unreachable!()` if you're sure that won't happen.
1830    pub fn future<T: func::Lower + func::Lift + Send + Sync + 'static>(
1831        self,
1832        mut store: impl AsContextMut,
1833        default: fn() -> T,
1834    ) -> Result<(FutureWriter<T>, FutureReader<T>)> {
1835        let (write, read) = self
1836            .concurrent_state_mut(store.as_context_mut().0)
1837            .new_transmit()?;
1838
1839        Ok((
1840            FutureWriter::new(default, write, self),
1841            FutureReader::new(read, self),
1842        ))
1843    }
1844
1845    /// Create a new Component Model `stream` as pair of writable and readable ends,
1846    /// the latter of which may be passed to guest code.
1847    pub fn stream<T: func::Lower + func::Lift + Send + 'static>(
1848        self,
1849        mut store: impl AsContextMut,
1850    ) -> Result<(StreamWriter<T>, StreamReader<T>)> {
1851        let (write, read) = self
1852            .concurrent_state_mut(store.as_context_mut().0)
1853            .new_transmit()?;
1854
1855        Ok((
1856            StreamWriter::new(write, self),
1857            StreamReader::new(read, self),
1858        ))
1859    }
1860
1861    /// Write to the specified stream or future from the host.
1862    fn host_write<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U>(
1863        self,
1864        mut store: StoreContextMut<U>,
1865        id: TableId<TransmitHandle>,
1866        mut buffer: B,
1867        kind: TransmitKind,
1868        post_write: PostWrite,
1869    ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
1870        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
1871        let transmit = self
1872            .concurrent_state_mut(store.0)
1873            .get_mut(transmit_id)
1874            .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
1875        log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read);
1876
1877        let new_state = if let ReadState::Dropped = &transmit.read {
1878            ReadState::Dropped
1879        } else {
1880            ReadState::Open
1881        };
1882
1883        if matches!(post_write, PostWrite::Drop) && !matches!(transmit.read, ReadState::Open) {
1884            transmit.write = WriteState::Dropped;
1885        }
1886
1887        Ok(match mem::replace(&mut transmit.read, new_state) {
1888            ReadState::Open => {
1889                assert!(matches!(&transmit.write, WriteState::Open));
1890
1891                let token = StoreToken::new(store.as_context_mut());
1892                let (tx, rx) = oneshot::channel();
1893                let state = WriteState::HostReady {
1894                    accept: Box::new(move |store, instance, reader| {
1895                        let (result, code) = accept_reader::<T, B, U>(
1896                            token.as_context_mut(store),
1897                            instance,
1898                            reader,
1899                            buffer,
1900                            kind,
1901                        )?;
1902                        _ = tx.send(result);
1903                        Ok(code)
1904                    }),
1905                    post_write,
1906                };
1907                self.concurrent_state_mut(store.0)
1908                    .get_mut(transmit_id)?
1909                    .write = state;
1910
1911                Err(rx)
1912            }
1913
1914            ReadState::GuestReady {
1915                ty,
1916                flat_abi: _,
1917                options,
1918                address,
1919                count,
1920                handle,
1921                ..
1922            } => {
1923                if let TransmitKind::Future = kind {
1924                    transmit.done = true;
1925                }
1926
1927                let read_handle = transmit.read_handle;
1928                let accept = move |mut store: StoreContextMut<U>| {
1929                    let (result, code) = accept_reader::<T, B, U>(
1930                        store.as_context_mut(),
1931                        self,
1932                        Reader::Guest {
1933                            options: &options,
1934                            ty,
1935                            address,
1936                            count,
1937                        },
1938                        buffer,
1939                        kind,
1940                    )?;
1941
1942                    self.concurrent_state_mut(store.0).set_event(
1943                        read_handle.rep(),
1944                        match ty {
1945                            TableIndex::Future(ty) => Event::FutureRead {
1946                                code,
1947                                pending: Some((ty, handle)),
1948                            },
1949                            TableIndex::Stream(ty) => Event::StreamRead {
1950                                code,
1951                                pending: Some((ty, handle)),
1952                            },
1953                        },
1954                    )?;
1955
1956                    anyhow::Ok(result)
1957                };
1958
1959                if T::MAY_REQUIRE_REALLOC {
1960                    // For payloads which may require a realloc call, use a
1961                    // oneshot::channel and background task.  This is necessary
1962                    // because calling the guest while there are host embedder
1963                    // frames on the stack is unsound.
1964                    let (tx, rx) = oneshot::channel();
1965                    let token = StoreToken::new(store.as_context_mut());
1966                    self.concurrent_state_mut(store.0).push_high_priority(
1967                        WorkItem::WorkerFunction(Mutex::new(Box::new(move |store, _| {
1968                            _ = tx.send(accept(token.as_context_mut(store))?);
1969                            Ok(())
1970                        }))),
1971                    );
1972                    Err(rx)
1973                } else {
1974                    // Optimize flat payloads (i.e. those which do not require
1975                    // calling the guest's realloc function) by lowering
1976                    // directly instead of using a oneshot::channel and
1977                    // background task.
1978                    Ok(accept(store)?)
1979                }
1980            }
1981
1982            ReadState::HostReady { accept } => {
1983                let count = buffer.remaining().len();
1984                let mut untyped = UntypedWriteBuffer::new(&mut buffer);
1985                let code = accept(Writer::Host {
1986                    buffer: &mut untyped,
1987                    count,
1988                })?;
1989                let (ReturnCode::Completed(_) | ReturnCode::Dropped(_)) = code else {
1990                    unreachable!()
1991                };
1992
1993                Ok(HostResult {
1994                    buffer,
1995                    dropped: false,
1996                })
1997            }
1998
1999            ReadState::Dropped => Ok(HostResult {
2000                buffer,
2001                dropped: true,
2002            }),
2003        })
2004    }
2005
2006    /// Async wrapper around `Self::host_write`.
2007    async fn host_write_async<T: func::Lower + Send + 'static, B: WriteBuffer<T>>(
2008        self,
2009        accessor: impl AsAccessor,
2010        id: TableId<TransmitHandle>,
2011        buffer: B,
2012        kind: TransmitKind,
2013    ) -> Result<HostResult<B>> {
2014        match accessor.as_accessor().with(move |mut access| {
2015            self.host_write(
2016                access.as_context_mut(),
2017                id,
2018                buffer,
2019                kind,
2020                PostWrite::Continue,
2021            )
2022        })? {
2023            Ok(result) => Ok(result),
2024            Err(rx) => Ok(rx.await?),
2025        }
2026    }
2027
2028    /// Read from the specified stream or future from the host.
2029    fn host_read<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
2030        self,
2031        store: StoreContextMut<U>,
2032        id: TableId<TransmitHandle>,
2033        mut buffer: B,
2034        kind: TransmitKind,
2035    ) -> Result<Result<HostResult<B>, oneshot::Receiver<HostResult<B>>>> {
2036        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
2037        let transmit = self
2038            .concurrent_state_mut(store.0)
2039            .get_mut(transmit_id)
2040            .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?;
2041        log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write);
2042
2043        let new_state = if let WriteState::Dropped = &transmit.write {
2044            WriteState::Dropped
2045        } else {
2046            WriteState::Open
2047        };
2048
2049        Ok(match mem::replace(&mut transmit.write, new_state) {
2050            WriteState::Open => {
2051                assert!(matches!(&transmit.read, ReadState::Open));
2052
2053                let (tx, rx) = oneshot::channel();
2054                transmit.read = ReadState::HostReady {
2055                    accept: Box::new(move |writer| {
2056                        let (result, code) = accept_writer::<T, B, U>(writer, buffer, kind)?;
2057                        _ = tx.send(result);
2058                        Ok(code)
2059                    }),
2060                };
2061
2062                Err(rx)
2063            }
2064
2065            WriteState::GuestReady {
2066                ty,
2067                flat_abi: _,
2068                options,
2069                address,
2070                count,
2071                handle,
2072                post_write,
2073                ..
2074            } => {
2075                if let TableIndex::Future(_) = ty {
2076                    transmit.done = true;
2077                }
2078
2079                let write_handle = transmit.write_handle;
2080                let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
2081                let (result, code) = accept_writer::<T, B, U>(
2082                    Writer::Guest {
2083                        ty: payload(ty, lift.types),
2084                        lift,
2085                        address,
2086                        count,
2087                    },
2088                    buffer,
2089                    kind,
2090                )?;
2091
2092                let state = self.concurrent_state_mut(store.0);
2093                let pending = if let PostWrite::Drop = post_write {
2094                    state.get_mut(transmit_id)?.write = WriteState::Dropped;
2095                    false
2096                } else {
2097                    true
2098                };
2099
2100                state.set_event(
2101                    write_handle.rep(),
2102                    match ty {
2103                        TableIndex::Future(ty) => Event::FutureWrite {
2104                            code,
2105                            pending: pending.then_some((ty, handle)),
2106                        },
2107                        TableIndex::Stream(ty) => Event::StreamWrite {
2108                            code,
2109                            pending: pending.then_some((ty, handle)),
2110                        },
2111                    },
2112                )?;
2113
2114                Ok(result)
2115            }
2116
2117            WriteState::HostReady { accept, post_write } => {
2118                accept(
2119                    store.0.traitobj_mut(),
2120                    self,
2121                    Reader::Host {
2122                        accept: Box::new(|input, count| {
2123                            let count = count.min(buffer.remaining_capacity());
2124                            buffer.move_from(input.get_mut::<T>(), count);
2125                            count
2126                        }),
2127                    },
2128                )?;
2129
2130                if let PostWrite::Drop = post_write {
2131                    self.concurrent_state_mut(store.0)
2132                        .get_mut(transmit_id)?
2133                        .write = WriteState::Dropped;
2134                }
2135
2136                Ok(HostResult {
2137                    buffer,
2138                    dropped: false,
2139                })
2140            }
2141
2142            WriteState::Dropped => Ok(HostResult {
2143                buffer,
2144                dropped: true,
2145            }),
2146        })
2147    }
2148
2149    /// Async wrapper around `Self::host_read`.
2150    async fn host_read_async<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
2151        self,
2152        accessor: impl AsAccessor,
2153        id: TableId<TransmitHandle>,
2154        buffer: B,
2155        kind: TransmitKind,
2156    ) -> Result<HostResult<B>> {
2157        match accessor
2158            .as_accessor()
2159            .with(move |mut access| self.host_read(access.as_context_mut(), id, buffer, kind))?
2160        {
2161            Ok(result) => Ok(result),
2162            Err(rx) => Ok(rx.await?),
2163        }
2164    }
2165
2166    /// Drop the read end of a stream or future read from the host.
2167    fn host_drop_reader(
2168        self,
2169        store: &mut dyn VMStore,
2170        id: TableId<TransmitHandle>,
2171        kind: TransmitKind,
2172    ) -> Result<()> {
2173        let transmit_id = self.concurrent_state_mut(store).get(id)?.state;
2174        let state = self.concurrent_state_mut(store);
2175        let transmit = state
2176            .get_mut(transmit_id)
2177            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2178        log::trace!(
2179            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2180            transmit.read,
2181            transmit.write
2182        );
2183
2184        transmit.read = ReadState::Dropped;
2185        if let Some(waker) = transmit.reader_watcher.take() {
2186            waker.wake();
2187        }
2188
2189        // If the write end is already dropped, it should stay dropped,
2190        // otherwise, it should be opened.
2191        let new_state = if let WriteState::Dropped = &transmit.write {
2192            WriteState::Dropped
2193        } else {
2194            WriteState::Open
2195        };
2196
2197        let write_handle = transmit.write_handle;
2198
2199        match mem::replace(&mut transmit.write, new_state) {
2200            // If a guest is waiting to write, notify it that the read end has
2201            // been dropped.
2202            WriteState::GuestReady {
2203                ty,
2204                handle,
2205                post_write,
2206                ..
2207            } => {
2208                if let PostWrite::Drop = post_write {
2209                    state.delete_transmit(transmit_id)?;
2210                } else {
2211                    state.update_event(
2212                        write_handle.rep(),
2213                        match ty {
2214                            TableIndex::Future(ty) => Event::FutureWrite {
2215                                code: ReturnCode::Dropped(0),
2216                                pending: Some((ty, handle)),
2217                            },
2218                            TableIndex::Stream(ty) => Event::StreamWrite {
2219                                code: ReturnCode::Dropped(0),
2220                                pending: Some((ty, handle)),
2221                            },
2222                        },
2223                    )?;
2224                };
2225            }
2226
2227            WriteState::HostReady { accept, .. } => {
2228                accept(store, self, Reader::End)?;
2229            }
2230
2231            WriteState::Open => {
2232                state.update_event(
2233                    write_handle.rep(),
2234                    match kind {
2235                        TransmitKind::Future => Event::FutureWrite {
2236                            code: ReturnCode::Dropped(0),
2237                            pending: None,
2238                        },
2239                        TransmitKind::Stream => Event::StreamWrite {
2240                            code: ReturnCode::Dropped(0),
2241                            pending: None,
2242                        },
2243                    },
2244                )?;
2245            }
2246
2247            WriteState::Dropped => {
2248                log::trace!("host_drop_reader delete {transmit_id:?}");
2249                state.delete_transmit(transmit_id)?;
2250            }
2251        }
2252        Ok(())
2253    }
2254
2255    /// Drop the write end of a stream or future read from the host.
2256    fn host_drop_writer<T: func::Lower + Send + 'static, U>(
2257        self,
2258        mut store: StoreContextMut<U>,
2259        id: TableId<TransmitHandle>,
2260        default: Option<&dyn Fn() -> Result<T>>,
2261    ) -> Result<()> {
2262        let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state;
2263        let transmit = self
2264            .concurrent_state_mut(store.0)
2265            .get_mut(transmit_id)
2266            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2267        log::trace!(
2268            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2269            transmit.read,
2270            transmit.write
2271        );
2272
2273        if let Some(waker) = transmit.writer_watcher.take() {
2274            waker.wake();
2275        }
2276
2277        // Existing queued transmits must be updated with information for the impending writer closure
2278        match &mut transmit.write {
2279            WriteState::GuestReady { .. } => {
2280                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2281            }
2282            WriteState::HostReady { post_write, .. } => {
2283                *post_write = PostWrite::Drop;
2284            }
2285            v @ WriteState::Open => {
2286                if let (Some(default), false) = (
2287                    default,
2288                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2289                ) {
2290                    // This is a future, and we haven't written a value yet --
2291                    // write the default value.
2292                    _ = self.host_write(
2293                        store.as_context_mut(),
2294                        id,
2295                        Some(default()?),
2296                        TransmitKind::Future,
2297                        PostWrite::Drop,
2298                    )?;
2299                } else {
2300                    *v = WriteState::Dropped;
2301                }
2302            }
2303            WriteState::Dropped => unreachable!("write state is already dropped"),
2304        }
2305
2306        let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2307
2308        // If the existing read state is dropped, then there's nothing to read
2309        // and we can keep it that way.
2310        //
2311        // If the read state was any other state, then we must set the new state to open
2312        // to indicate that there *is* data to be read
2313        let new_state = if let ReadState::Dropped = &transmit.read {
2314            ReadState::Dropped
2315        } else {
2316            ReadState::Open
2317        };
2318
2319        let read_handle = transmit.read_handle;
2320
2321        // Swap in the new read state
2322        match mem::replace(&mut transmit.read, new_state) {
2323            // If the guest was ready to read, then we cannot drop the reader (or writer);
2324            // we must deliver the event, and update the state associated with the handle to
2325            // represent that a read must be performed
2326            ReadState::GuestReady { ty, handle, .. } => {
2327                // Ensure the final read of the guest is queued, with appropriate closure indicator
2328                self.concurrent_state_mut(store.0).update_event(
2329                    read_handle.rep(),
2330                    match ty {
2331                        TableIndex::Future(ty) => Event::FutureRead {
2332                            code: ReturnCode::Dropped(0),
2333                            pending: Some((ty, handle)),
2334                        },
2335                        TableIndex::Stream(ty) => Event::StreamRead {
2336                            code: ReturnCode::Dropped(0),
2337                            pending: Some((ty, handle)),
2338                        },
2339                    },
2340                )?;
2341            }
2342
2343            // If the host was ready to read, and the writer end is being dropped (host->host write?)
2344            // signal to the reader that we've reached the end of the stream
2345            ReadState::HostReady { accept } => {
2346                accept(Writer::End)?;
2347            }
2348
2349            // If the read state is open, then there are no registered readers of the stream/future
2350            ReadState::Open => {
2351                self.concurrent_state_mut(store.0).update_event(
2352                    read_handle.rep(),
2353                    match default {
2354                        Some(_) => Event::FutureRead {
2355                            code: ReturnCode::Dropped(0),
2356                            pending: None,
2357                        },
2358                        None => Event::StreamRead {
2359                            code: ReturnCode::Dropped(0),
2360                            pending: None,
2361                        },
2362                    },
2363                )?;
2364            }
2365
2366            // If the read state was already dropped, then we can remove the transmit state completely
2367            // (both writer and reader have been dropped)
2368            ReadState::Dropped => {
2369                log::trace!("host_drop_writer delete {transmit_id:?}");
2370                self.concurrent_state_mut(store.0)
2371                    .delete_transmit(transmit_id)?;
2372            }
2373        }
2374        Ok(())
2375    }
2376
2377    /// Drop the writable end of the specified stream or future from the guest.
2378    pub(super) fn guest_drop_writable<T>(
2379        self,
2380        store: StoreContextMut<T>,
2381        ty: TableIndex,
2382        writer: u32,
2383    ) -> Result<()> {
2384        let (transmit_rep, state) = self
2385            .concurrent_state_mut(store.0)
2386            .state_table(ty)
2387            .remove_by_index(writer)
2388            .context("failed to find writer")?;
2389        let (state, kind) = match state {
2390            WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
2391            WaitableState::Future(_, state) => (state, TransmitKind::Future),
2392            _ => {
2393                bail!("invalid stream or future handle");
2394            }
2395        };
2396        match state {
2397            StreamFutureState::Write { .. } => {}
2398            StreamFutureState::Read { .. } => {
2399                bail!("passed read end to `{{stream|future}}.drop-writable`")
2400            }
2401            StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
2402        }
2403
2404        let id = TableId::<TransmitHandle>::new(transmit_rep);
2405        log::trace!("guest_drop_writable: drop writer {id:?}");
2406        match kind {
2407            TransmitKind::Stream => {
2408                self.host_drop_writer(store, id, None::<&dyn Fn() -> Result<()>>)
2409            }
2410            TransmitKind::Future => self.host_drop_writer(
2411                store,
2412                id,
2413                Some(&|| {
2414                    Err::<(), _>(anyhow!(
2415                        "cannot drop future write end without first writing a value"
2416                    ))
2417                }),
2418            ),
2419        }
2420    }
2421
2422    /// Copy `count` items from `read_address` to `write_address` for the
2423    /// specified stream or future.
2424    fn copy<T: 'static>(
2425        self,
2426        mut store: StoreContextMut<T>,
2427        flat_abi: Option<FlatAbi>,
2428        write_ty: TableIndex,
2429        write_options: &Options,
2430        write_address: usize,
2431        read_ty: TableIndex,
2432        read_options: &Options,
2433        read_address: usize,
2434        count: usize,
2435        rep: u32,
2436    ) -> Result<()> {
2437        let types = self.id().get(store.0).component().types().clone();
2438        match (write_ty, read_ty) {
2439            (TableIndex::Future(write_ty), TableIndex::Future(read_ty)) => {
2440                assert_eq!(count, 1);
2441
2442                let val = types[types[write_ty].ty]
2443                    .payload
2444                    .map(|ty| {
2445                        let abi = types.canonical_abi(&ty);
2446                        // FIXME: needs to read an i64 for memory64
2447                        if write_address % usize::try_from(abi.align32)? != 0 {
2448                            bail!("write pointer not aligned");
2449                        }
2450
2451                        let lift =
2452                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2453                        let bytes = lift
2454                            .memory()
2455                            .get(write_address..)
2456                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2457                            .ok_or_else(|| {
2458                                anyhow::anyhow!("write pointer out of bounds of memory")
2459                            })?;
2460
2461                        Val::load(lift, ty, bytes)
2462                    })
2463                    .transpose()?;
2464
2465                if let Some(val) = val {
2466                    let lower =
2467                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2468                    let ty = types[types[read_ty].ty].payload.unwrap();
2469                    let ptr = func::validate_inbounds_dynamic(
2470                        types.canonical_abi(&ty),
2471                        lower.as_slice_mut(),
2472                        &ValRaw::u32(read_address.try_into().unwrap()),
2473                    )?;
2474                    val.store(lower, ty, ptr)?;
2475                }
2476            }
2477            (TableIndex::Stream(write_ty), TableIndex::Stream(read_ty)) => {
2478                if let Some(flat_abi) = flat_abi {
2479                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
2480                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2481                    if length_in_bytes > 0 {
2482                        if write_address % usize::try_from(flat_abi.align)? != 0 {
2483                            bail!("write pointer not aligned");
2484                        }
2485                        if read_address % usize::try_from(flat_abi.align)? != 0 {
2486                            bail!("read pointer not aligned");
2487                        }
2488
2489                        let store_opaque = store.0.store_opaque_mut();
2490
2491                        {
2492                            let src = write_options
2493                                .memory(store_opaque)
2494                                .get(write_address..)
2495                                .and_then(|b| b.get(..length_in_bytes))
2496                                .ok_or_else(|| {
2497                                    anyhow::anyhow!("write pointer out of bounds of memory")
2498                                })?
2499                                .as_ptr();
2500                            let dst = read_options
2501                                .memory_mut(store_opaque)
2502                                .get_mut(read_address..)
2503                                .and_then(|b| b.get_mut(..length_in_bytes))
2504                                .ok_or_else(|| {
2505                                    anyhow::anyhow!("read pointer out of bounds of memory")
2506                                })?
2507                                .as_mut_ptr();
2508                            // SAFETY: Both `src` and `dst` have been validated
2509                            // above.
2510                            unsafe { src.copy_to(dst, length_in_bytes) };
2511                        }
2512                    }
2513                } else {
2514                    let store_opaque = store.0.store_opaque_mut();
2515                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
2516                    let ty = types[types[write_ty].ty].payload.unwrap();
2517                    let abi = lift.types.canonical_abi(&ty);
2518                    let size = usize::try_from(abi.size32).unwrap();
2519                    if write_address % usize::try_from(abi.align32)? != 0 {
2520                        bail!("write pointer not aligned");
2521                    }
2522                    let bytes = lift
2523                        .memory()
2524                        .get(write_address..)
2525                        .and_then(|b| b.get(..size * count))
2526                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2527
2528                    let values = (0..count)
2529                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2530                        .collect::<Result<Vec<_>>>()?;
2531
2532                    let id = TableId::<TransmitHandle>::new(rep);
2533                    log::trace!("copy values {values:?} for {id:?}");
2534
2535                    let lower =
2536                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2537                    let ty = types[types[read_ty].ty].payload.unwrap();
2538                    let abi = lower.types.canonical_abi(&ty);
2539                    if read_address % usize::try_from(abi.align32)? != 0 {
2540                        bail!("read pointer not aligned");
2541                    }
2542                    let size = usize::try_from(abi.size32).unwrap();
2543                    lower
2544                        .as_slice_mut()
2545                        .get_mut(read_address..)
2546                        .and_then(|b| b.get_mut(..size * count))
2547                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2548                    let mut ptr = read_address;
2549                    for value in values {
2550                        value.store(lower, ty, ptr)?;
2551                        ptr += size
2552                    }
2553                }
2554            }
2555            _ => unreachable!(),
2556        }
2557
2558        Ok(())
2559    }
2560
    /// Write to the specified stream or future from the guest.
    ///
    /// `ty` and `handle` identify the writable end in the guest's table,
    /// `options` selects the canonical options used for the operation, and
    /// `address`/`count` describe the guest buffer to write from.
    ///
    /// Returns `ReturnCode::Blocked` when the write has been parked as
    /// `WriteState::GuestReady` to await a reader; any other code is the
    /// immediate outcome of the write.
    ///
    /// # Errors
    ///
    /// Fails if `options` is not async, if `handle` is not currently in the
    /// `Write` state, if the handle was already marked `done`, or if the
    /// underlying transmit has been marked `done`.  Errors from copying the
    /// payload or from the host `accept` callback are propagated.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TableIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        let options = Options::new_index(store.0, self, options);
        if !options.async_() {
            bail!("synchronous stream and future writes not yet supported");
        }
        let concurrent_state = self.concurrent_state_mut(store.0);
        let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
        let StreamFutureState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // Mark the handle busy while the write is in progress.  It is reset to
        // `Write` at the end of this function unless the result is `Blocked`.
        *state = StreamFutureState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let transmit_id = concurrent_state.get(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The value left in `transmit.read` while the current read state is
        // taken out and inspected below: a dropped reader stays dropped, any
        // other state becomes `Open`.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Parks this write in the transmit state as `WriteState::GuestReady`.
        // Asserts that no other write is already recorded.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.write, WriteState::Open));
            transmit.write = WriteState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
                post_write: PostWrite::Continue,
            };
            Ok::<_, crate::Error>(())
        };

        let result = match mem::replace(&mut transmit.read, new_state) {
            // A guest read is already parked: copy directly between the two
            // guest buffers.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
            } => {
                assert_eq!(flat_abi, read_flat_abi);

                if let TableIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // Note that zero-length reads and writes are handled specially
                // by the spec to allow each end to signal readiness to the
                // other.  Quoting the spec:
                //
                // ```
                // The meaning of a read or write when the length is 0 is that
                // the caller is querying the "readiness" of the other
                // side. When a 0-length read/write rendezvous with a
                // non-0-length read/write, only the 0-length read/write
                // completes; the non-0-length read/write is kept pending (and
                // ready for a subsequent rendezvous).
                //
                // In the corner case where a 0-length read and write
                // rendezvous, only the writer is notified of readiness. To
                // avoid livelock, the Canonical ABI requires that a writer must
                // (eventually) follow a completed 0-length write with a
                // non-0-length write that is allowed to block (allowing the
                // reader end to run and rendezvous with its own non-0-length
                // read).
                // ```

                // The write completes if it is zero-length or the reader
                // offered a non-empty buffer; the read completes only if this
                // write is non-zero-length.
                let write_complete = count == 0 || read_count > 0;
                let read_complete = count > 0;
                // True when the reader's buffer is larger than this write.
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Number of items actually transferred.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    ty,
                    &options,
                    address,
                    read_ty,
                    &read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                // Size in bytes of one payload item, or 0 when there is no
                // payload type.
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a `StreamRead` `Completed` event is already queued on
                    // the read end, fold its count into the new total.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    // Queue the completion event for the reader.
                    concurrent_state.set_event(
                        read_handle_rep,
                        match read_ty {
                            TableIndex::Future(ty) => Event::FutureRead {
                                code,
                                pending: Some((ty, read_handle)),
                            },
                            TableIndex::Stream(ty) => Event::StreamRead {
                                code,
                                pending: Some((ty, read_handle)),
                            },
                        },
                    )?;
                }

                if read_buffer_remaining {
                    // Re-park the read with its buffer advanced past the items
                    // just transferred.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host read is parked: hand the guest buffer to the host's
            // `accept` callback, whose return value becomes the result.
            ReadState::HostReady { accept } => {
                if let TableIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self);
                accept(Writer::Guest {
                    ty: payload(ty, lift.types),
                    lift,
                    address,
                    count,
                })?
            }

            // No reader is waiting yet: park this write and report `Blocked`.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            // The read end has been dropped.
            ReadState::Dropped => {
                if let TableIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        if result != ReturnCode::Blocked {
            // The operation finished synchronously: release the `Busy` marker.
            // `done` is set only for a stream whose result was `Dropped`.
            let state = self.concurrent_state_mut(store.0);
            *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write {
                done: matches!(
                    (result, ty),
                    (ReturnCode::Dropped(_), TableIndex::Stream(_))
                ),
            };
        }

        Ok(result)
    }
2777
    /// Read from the specified stream or future from the guest.
    ///
    /// `ty` and `handle` identify the readable end in the guest's table,
    /// `options` selects the canonical options used for the operation, and
    /// `address`/`count` describe the guest buffer to read into.
    ///
    /// Returns `ReturnCode::Blocked` when the read has been parked as
    /// `ReadState::GuestReady` to await a writer; any other code is the
    /// immediate outcome of the read.
    ///
    /// # Errors
    ///
    /// Fails if `options` is not async, if `handle` is not currently in the
    /// `Read` state, if the handle was already marked `done`, or if the
    /// underlying transmit has been marked `done`.  Errors from copying the
    /// payload or from the host `accept` callback are propagated.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TableIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let options = Options::new_index(store.0, self, options);
        if !options.async_() {
            bail!("synchronous stream and future reads not yet supported");
        }
        let concurrent_state = self.concurrent_state_mut(store.0);
        let (rep, state) = concurrent_state.get_mut_by_index(ty, handle)?;
        let StreamFutureState::Read { done } = *state else {
            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        // Mark the handle busy while the read is in progress.  It is reset to
        // `Read` at the end of this function unless the result is `Blocked`.
        *state = StreamFutureState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let transmit_id = concurrent_state.get(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // The value left in `transmit.write` while the current write state is
        // taken out and inspected below: a dropped writer stays dropped, any
        // other state becomes `Open`.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Parks this read in the transmit state as `ReadState::GuestReady`.
        // Asserts that no other read is already recorded.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.read, ReadState::Open));
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count: usize::try_from(count).unwrap(),
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let result = match mem::replace(&mut transmit.write, new_state) {
            // A guest write is already parked: copy directly between the two
            // guest buffers.
            WriteState::GuestReady {
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
                post_write,
            } => {
                assert_eq!(flat_abi, write_flat_abi);

                if let TableIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // See the comment in `guest_write` for the
                // `ReadState::GuestReady` case concerning zero-length reads and
                // writes.

                let count = usize::try_from(count).unwrap();

                // The parked write completes if it is zero-length or this read
                // offers a non-empty buffer; this read completes only if the
                // parked write is non-zero-length.
                let write_complete = write_count == 0 || count > 0;
                let read_complete = write_count > 0;
                // True when the writer has more items than this read accepts.
                let write_buffer_remaining = count < write_count;

                // Number of items actually transferred.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_ty,
                    &write_options,
                    write_address,
                    ty,
                    &options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                // Size in bytes of one payload item, or 0 when there is no
                // payload type.
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();
                // If the write was flagged `PostWrite::Drop`, record the write
                // end as dropped; in that case the event queued below carries
                // no `pending` handle.
                let pending = if let PostWrite::Drop = post_write {
                    concurrent_state.get_mut(transmit_id)?.write = WriteState::Dropped;
                    false
                } else {
                    true
                };

                if write_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a `StreamWrite` `Completed` event is already queued on
                    // the write end, fold its count into the new total.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    // Queue the completion event for the writer.
                    concurrent_state.set_event(
                        write_handle_rep,
                        match write_ty {
                            TableIndex::Future(ty) => Event::FutureWrite {
                                code,
                                pending: pending.then_some((ty, write_handle)),
                            },
                            TableIndex::Stream(ty) => Event::StreamWrite {
                                code,
                                pending: pending.then_some((ty, write_handle)),
                            },
                        },
                    )?;
                }

                if write_buffer_remaining {
                    // Re-park the write with its buffer advanced past the
                    // items just transferred.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count * item_size),
                        count: write_count - count,
                        handle: write_handle,
                        post_write,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host write is parked: hand the guest buffer to the host's
            // `accept` callback, whose return value becomes the result.
            WriteState::HostReady { accept, post_write } => {
                if let TableIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let code = accept(
                    store.0.traitobj_mut(),
                    self,
                    Reader::Guest {
                        options: &options,
                        ty,
                        address,
                        count: count.try_into().unwrap(),
                    },
                )?;

                // Honor a `PostWrite::Drop` request now that the write has been
                // delivered.
                if let PostWrite::Drop = post_write {
                    self.concurrent_state_mut(store.0)
                        .get_mut(transmit_id)?
                        .write = WriteState::Dropped;
                }

                code
            }

            // No writer is waiting yet: park this read and report `Blocked`.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            // The write end has been dropped.
            WriteState::Dropped => ReturnCode::Dropped(0),
        };

        if result != ReturnCode::Blocked {
            // The operation finished synchronously: release the `Busy` marker.
            // `done` is set only for a stream whose result was `Dropped`.
            let state = self.concurrent_state_mut(store.0);
            *state.get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read {
                done: matches!(
                    (result, ty),
                    (ReturnCode::Dropped(_), TableIndex::Stream(_))
                ),
            };
        }

        Ok(result)
    }
2988
2989    /// Drop the readable end of the specified stream or future from the guest.
2990    fn guest_drop_readable(
2991        self,
2992        store: &mut dyn VMStore,
2993        ty: TableIndex,
2994        reader: u32,
2995    ) -> Result<()> {
2996        let concurrent_state = self.concurrent_state_mut(store);
2997        let (rep, state) = concurrent_state.state_table(ty).remove_by_index(reader)?;
2998        let (state, kind) = match state {
2999            WaitableState::Stream(_, state) => (state, TransmitKind::Stream),
3000            WaitableState::Future(_, state) => (state, TransmitKind::Future),
3001            _ => {
3002                bail!("invalid stream or future handle");
3003            }
3004        };
3005        match state {
3006            StreamFutureState::Read { .. } => {}
3007            StreamFutureState::Write { .. } => {
3008                bail!("passed write end to `{{stream|future}}.drop-readable`")
3009            }
3010            StreamFutureState::Busy => bail!("cannot drop busy stream or future"),
3011        }
3012        let id = TableId::<TransmitHandle>::new(rep);
3013        log::trace!("guest_drop_readable: drop reader {id:?}");
3014        self.host_drop_reader(store, id, kind)
3015    }
3016
3017    /// Create a new error context for the given component.
3018    pub(crate) fn error_context_new(
3019        self,
3020        store: &mut StoreOpaque,
3021        ty: TypeComponentLocalErrorContextTableIndex,
3022        options: OptionsIndex,
3023        debug_msg_address: u32,
3024        debug_msg_len: u32,
3025    ) -> Result<u32> {
3026        let options = Options::new_index(store, self, options);
3027        let lift_ctx = &mut LiftContext::new(store, &options, self);
3028        //  Read string from guest memory
3029        let address = usize::try_from(debug_msg_address)?;
3030        let len = usize::try_from(debug_msg_len)?;
3031        lift_ctx
3032            .memory()
3033            .get(address..)
3034            .and_then(|b| b.get(..len))
3035            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3036        let message = WasmStr::new(address, len, lift_ctx)?;
3037
3038        // Create a new ErrorContext that is tracked along with other concurrent state
3039        let err_ctx = ErrorContextState {
3040            debug_msg: message
3041                .to_str_from_memory(options.memory(store))?
3042                .to_string(),
3043        };
3044        let state = self.concurrent_state_mut(store);
3045        let table_id = state.push(err_ctx)?;
3046        let global_ref_count_idx =
3047            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3048
3049        // Add to the global error context ref counts
3050        let _ = state
3051            .global_error_context_ref_counts
3052            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3053
3054        // Error context are tracked both locally (to a single component instance) and globally
3055        // the counts for both must stay in sync.
3056        //
3057        // Here we reflect the newly created global concurrent error context state into the
3058        // component instance's locally tracked count, along with the appropriate key into the global
3059        // ref tracking data structures to enable later lookup
3060        let local_tbl = &mut state.error_context_tables[ty];
3061
3062        assert!(
3063            !local_tbl.has_handle(table_id.rep()),
3064            "newly created error context state already tracked by component"
3065        );
3066        let local_idx = local_tbl.insert(table_id.rep(), LocalErrorContextRefCount(1))?;
3067
3068        Ok(local_idx)
3069    }
3070
3071    /// Retrieve the debug message from the specified error context.
3072    pub(super) fn error_context_debug_message<T>(
3073        self,
3074        store: StoreContextMut<T>,
3075        ty: TypeComponentLocalErrorContextTableIndex,
3076        options: OptionsIndex,
3077        err_ctx_handle: u32,
3078        debug_msg_address: u32,
3079    ) -> Result<()> {
3080        // Retrieve the error context and internal debug message
3081        let state = self.concurrent_state_mut(store.0);
3082        let (state_table_id_rep, _) = state
3083            .error_context_tables
3084            .get_mut(ty)
3085            .context("error context table index present in (sub)component lookup during debug_msg")?
3086            .get_mut_by_index(err_ctx_handle)?;
3087
3088        // Get the state associated with the error context
3089        let ErrorContextState { debug_msg } =
3090            state.get_mut(TableId::<ErrorContextState>::new(state_table_id_rep))?;
3091        let debug_msg = debug_msg.clone();
3092
3093        let options = Options::new_index(store.0, self, options);
3094        let types = self.id().get(store.0).component().types().clone();
3095        let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3096        let debug_msg_address = usize::try_from(debug_msg_address)?;
3097        // Lower the string into the component's memory
3098        let offset = lower_cx
3099            .as_slice_mut()
3100            .get(debug_msg_address..)
3101            .and_then(|b| b.get(..debug_msg.bytes().len()))
3102            .map(|_| debug_msg_address)
3103            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3104        debug_msg
3105            .as_str()
3106            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3107
3108        Ok(())
3109    }
3110
3111    /// Implements the `future.drop-readable` intrinsic.
3112    pub(crate) fn future_drop_readable(
3113        self,
3114        store: &mut dyn VMStore,
3115        ty: TypeFutureTableIndex,
3116        reader: u32,
3117    ) -> Result<()> {
3118        self.guest_drop_readable(store, TableIndex::Future(ty), reader)
3119    }
3120
3121    /// Implements the `stream.drop-readable` intrinsic.
3122    pub(crate) fn stream_drop_readable(
3123        self,
3124        store: &mut dyn VMStore,
3125        ty: TypeStreamTableIndex,
3126        reader: u32,
3127    ) -> Result<()> {
3128        self.guest_drop_readable(store, TableIndex::Stream(ty), reader)
3129    }
3130}
3131
3132impl ConcurrentState {
3133    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
3134        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
3135    }
3136
3137    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3138        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
3139    }
3140
3141    /// Set or update the event for the specified waitable.
3142    ///
3143    /// If there is already an event set for this waitable, we assert that it is
3144    /// of the same variant as the new one and reuse the `ReturnCode` count and
3145    /// the `pending` field if applicable.
3146    // TODO: This is a bit awkward due to how
3147    // `Event::{Stream,Future}{Write,Read}` and
3148    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
3149    // Consider updating those representations in a way that allows this
3150    // function to be simplified.
3151    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3152        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
3153
3154        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
3155            let (ReturnCode::Completed(count)
3156            | ReturnCode::Dropped(count)
3157            | ReturnCode::Cancelled(count)) = old
3158            else {
3159                unreachable!()
3160            };
3161
3162            match new {
3163                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
3164                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
3165                _ => unreachable!(),
3166            }
3167        }
3168
3169        let event = match (waitable.take_event(self)?, event) {
3170            (None, _) => event,
3171            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
3172            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
3173            (
3174                Some(Event::StreamWrite {
3175                    code: old_code,
3176                    pending: old_pending,
3177                }),
3178                Event::StreamWrite { code, pending },
3179            ) => Event::StreamWrite {
3180                code: update_code(old_code, code),
3181                pending: old_pending.or(pending),
3182            },
3183            (
3184                Some(Event::StreamRead {
3185                    code: old_code,
3186                    pending: old_pending,
3187                }),
3188                Event::StreamRead { code, pending },
3189            ) => Event::StreamRead {
3190                code: update_code(old_code, code),
3191                pending: old_pending.or(pending),
3192            },
3193            _ => unreachable!(),
3194        };
3195
3196        waitable.set_event(self, Some(event))
3197    }
3198
3199    fn get_mut_by_index(
3200        &mut self,
3201        ty: TableIndex,
3202        index: u32,
3203    ) -> Result<(u32, &mut StreamFutureState)> {
3204        get_mut_by_index_from(self.state_table(ty), ty, index)
3205    }
3206
3207    /// Allocate a new future or stream, including the `TransmitState` and the
3208    /// `TransmitHandle`s corresponding to the read and write ends.
3209    fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3210        let state_id = self.push(TransmitState::default())?;
3211
3212        let write = self.push(TransmitHandle::new(state_id))?;
3213        let read = self.push(TransmitHandle::new(state_id))?;
3214
3215        let state = self.get_mut(state_id)?;
3216        state.write_handle = write;
3217        state.read_handle = read;
3218
3219        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3220
3221        Ok((write, read))
3222    }
3223
3224    /// Delete the specified future or stream, including the read and write ends.
3225    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
3226        let state = self.delete(state_id)?;
3227        self.delete(state.write_handle)?;
3228        self.delete(state.read_handle)?;
3229
3230        log::trace!(
3231            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
3232            state.write_handle,
3233            state.read_handle,
3234        );
3235
3236        Ok(())
3237    }
3238
3239    fn state_table(&mut self, ty: TableIndex) -> &mut StateTable<WaitableState> {
3240        let runtime_instance = match ty {
3241            TableIndex::Stream(ty) => self.component.types()[ty].instance,
3242            TableIndex::Future(ty) => self.component.types()[ty].instance,
3243        };
3244        &mut self.waitable_tables[runtime_instance]
3245    }
3246
3247    /// Allocate a new future or stream and grant ownership of both the read and
3248    /// write ends to the (sub-)component instance to which the specified
3249    /// `TableIndex` belongs.
3250    fn guest_new(&mut self, ty: TableIndex) -> Result<ResourcePair> {
3251        let (write, read) = self.new_transmit()?;
3252        let read = self.state_table(ty).insert(
3253            read.rep(),
3254            waitable_state(ty, StreamFutureState::Read { done: false }),
3255        )?;
3256        let write = self.state_table(ty).insert(
3257            write.rep(),
3258            waitable_state(ty, StreamFutureState::Write { done: false }),
3259        )?;
3260        Ok(ResourcePair { write, read })
3261    }
3262
3263    /// Cancel a pending stream or future write from the host.
3264    ///
3265    /// # Arguments
3266    ///
3267    /// * `rep` - The `TransmitState` rep for the stream or future.
3268    fn host_cancel_write(&mut self, rep: u32) -> Result<ReturnCode> {
3269        let transmit_id = TableId::<TransmitState>::new(rep);
3270        let transmit = self.get_mut(transmit_id)?;
3271        log::trace!(
3272            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3273            transmit.read,
3274            transmit.write
3275        );
3276
3277        let code = if let Some(event) =
3278            Waitable::Transmit(transmit.write_handle).take_event(self)?
3279        {
3280            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3281                unreachable!();
3282            };
3283            match (code, event) {
3284                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3285                    ReturnCode::Cancelled(count)
3286                }
3287                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3288                _ => unreachable!(),
3289            }
3290        } else {
3291            ReturnCode::Cancelled(0)
3292        };
3293
3294        let transmit = self.get_mut(transmit_id)?;
3295
3296        match &transmit.write {
3297            WriteState::GuestReady { .. } | WriteState::HostReady { .. } => {
3298                transmit.write = WriteState::Open;
3299            }
3300
3301            WriteState::Open | WriteState::Dropped => {}
3302        }
3303
3304        log::trace!("cancelled write {transmit_id:?}");
3305
3306        Ok(code)
3307    }
3308
3309    /// Cancel a pending stream or future read from the host.
3310    ///
3311    /// # Arguments
3312    ///
3313    /// * `rep` - The `TransmitState` rep for the stream or future.
3314    fn host_cancel_read(&mut self, rep: u32) -> Result<ReturnCode> {
3315        let transmit_id = TableId::<TransmitState>::new(rep);
3316        let transmit = self.get_mut(transmit_id)?;
3317        log::trace!(
3318            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3319            transmit.read,
3320            transmit.write
3321        );
3322
3323        let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? {
3324            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3325                unreachable!();
3326            };
3327            match (code, event) {
3328                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3329                    ReturnCode::Cancelled(count)
3330                }
3331                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3332                _ => unreachable!(),
3333            }
3334        } else {
3335            ReturnCode::Cancelled(0)
3336        };
3337
3338        let transmit = self.get_mut(transmit_id)?;
3339
3340        match &transmit.read {
3341            ReadState::GuestReady { .. } | ReadState::HostReady { .. } => {
3342                transmit.read = ReadState::Open;
3343            }
3344
3345            ReadState::Open | ReadState::Dropped => {}
3346        }
3347
3348        log::trace!("cancelled read {transmit_id:?}");
3349
3350        Ok(code)
3351    }
3352
3353    /// Cancel a pending write for the specified stream or future from the guest.
3354    fn guest_cancel_write(
3355        &mut self,
3356        ty: TableIndex,
3357        writer: u32,
3358        _async_: bool,
3359    ) -> Result<ReturnCode> {
3360        let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3361            self.state_table(ty).get_mut_by_index(writer)?
3362        else {
3363            bail!("invalid stream or future handle");
3364        };
3365        let id = TableId::<TransmitHandle>::new(rep);
3366        log::trace!("guest cancel write {id:?} (handle {writer})");
3367        match state {
3368            StreamFutureState::Write { .. } => {
3369                bail!("stream or future write cancelled when no write is pending")
3370            }
3371            StreamFutureState::Read { .. } => {
3372                bail!("passed read end to `{{stream|future}}.cancel-write`")
3373            }
3374            StreamFutureState::Busy => {
3375                *state = StreamFutureState::Write { done: false };
3376            }
3377        }
3378        let rep = self.get(id)?.state.rep();
3379        self.host_cancel_write(rep)
3380    }
3381
3382    /// Cancel a pending read for the specified stream or future from the guest.
3383    fn guest_cancel_read(
3384        &mut self,
3385        ty: TableIndex,
3386        reader: u32,
3387        _async_: bool,
3388    ) -> Result<ReturnCode> {
3389        let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) =
3390            self.state_table(ty).get_mut_by_index(reader)?
3391        else {
3392            bail!("invalid stream or future handle");
3393        };
3394        let id = TableId::<TransmitHandle>::new(rep);
3395        log::trace!("guest cancel read {id:?} (handle {reader})");
3396        match state {
3397            StreamFutureState::Read { .. } => {
3398                bail!("stream or future read cancelled when no read is pending")
3399            }
3400            StreamFutureState::Write { .. } => {
3401                bail!("passed write end to `{{stream|future}}.cancel-read`")
3402            }
3403            StreamFutureState::Busy => {
3404                *state = StreamFutureState::Read { done: false };
3405            }
3406        }
3407        let rep = self.get(id)?.state.rep();
3408        self.host_cancel_read(rep)
3409    }
3410
3411    /// Drop the specified error context.
3412    pub(crate) fn error_context_drop(
3413        &mut self,
3414        ty: TypeComponentLocalErrorContextTableIndex,
3415        error_context: u32,
3416    ) -> Result<()> {
3417        let local_state_table = self
3418            .error_context_tables
3419            .get_mut(ty)
3420            .context("error context table index present in (sub)component table during drop")?;
3421
3422        // Reduce the local (sub)component ref count, removing tracking if necessary
3423        let (rep, local_ref_removed) = {
3424            let (rep, LocalErrorContextRefCount(local_ref_count)) =
3425                local_state_table.get_mut_by_index(error_context)?;
3426            assert!(*local_ref_count > 0);
3427            *local_ref_count -= 1;
3428            let mut local_ref_removed = false;
3429            if *local_ref_count == 0 {
3430                local_ref_removed = true;
3431                local_state_table
3432                    .remove_by_index(error_context)
3433                    .context("removing error context from component-local tracking")?;
3434            }
3435            (rep, local_ref_removed)
3436        };
3437
3438        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3439
3440        let GlobalErrorContextRefCount(global_ref_count) = self
3441            .global_error_context_ref_counts
3442            .get_mut(&global_ref_count_idx)
3443            .expect("retrieve concurrent state for error context during drop");
3444
3445        // Reduce the component-global ref count, removing tracking if necessary
3446        assert!(*global_ref_count >= 1);
3447        *global_ref_count -= 1;
3448        if *global_ref_count == 0 {
3449            assert!(local_ref_removed);
3450
3451            self.global_error_context_ref_counts
3452                .remove(&global_ref_count_idx);
3453
3454            self.delete(TableId::<ErrorContextState>::new(rep))
3455                .context("deleting component-global error context data")?;
3456        }
3457
3458        Ok(())
3459    }
3460
3461    /// Transfer ownership of the specified stream or future read end from one
3462    /// guest to another.
3463    fn guest_transfer<U: PartialEq + Eq + std::fmt::Debug>(
3464        &mut self,
3465        src_idx: u32,
3466        src: U,
3467        src_instance: RuntimeComponentInstanceIndex,
3468        dst: U,
3469        dst_instance: RuntimeComponentInstanceIndex,
3470        match_state: impl Fn(&mut WaitableState) -> Result<(U, &mut StreamFutureState)>,
3471        make_state: impl Fn(U, StreamFutureState) -> WaitableState,
3472    ) -> Result<u32> {
3473        let src_table = &mut self.waitable_tables[src_instance];
3474        let (_rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3475        let (src_ty, _) = match_state(src_state)?;
3476        if src_ty != src {
3477            bail!("invalid future handle");
3478        }
3479
3480        let src_table = &mut self.waitable_tables[src_instance];
3481        let (rep, src_state) = src_table.get_mut_by_index(src_idx)?;
3482        let (_, src_state) = match_state(src_state)?;
3483
3484        match src_state {
3485            StreamFutureState::Read { done: true } => {
3486                bail!("cannot lift stream after being notified that the writable end dropped")
3487            }
3488            StreamFutureState::Read { done: false } => {
3489                src_table.remove_by_index(src_idx)?;
3490
3491                let dst_table = &mut self.waitable_tables[dst_instance];
3492                dst_table.insert(
3493                    rep,
3494                    make_state(dst, StreamFutureState::Read { done: false }),
3495                )
3496            }
3497            StreamFutureState::Write { .. } => {
3498                bail!("cannot transfer write end of stream or future")
3499            }
3500            StreamFutureState::Busy => bail!("cannot transfer busy stream or future"),
3501        }
3502    }
3503
3504    /// Implements the `future.new` intrinsic.
3505    pub(crate) fn future_new(&mut self, ty: TypeFutureTableIndex) -> Result<ResourcePair> {
3506        self.guest_new(TableIndex::Future(ty))
3507    }
3508
3509    /// Implements the `future.cancel-write` intrinsic.
3510    pub(crate) fn future_cancel_write(
3511        &mut self,
3512        ty: TypeFutureTableIndex,
3513        async_: bool,
3514        writer: u32,
3515    ) -> Result<u32> {
3516        self.guest_cancel_write(TableIndex::Future(ty), writer, async_)
3517            .map(|result| result.encode())
3518    }
3519
3520    /// Implements the `future.cancel-read` intrinsic.
3521    pub(crate) fn future_cancel_read(
3522        &mut self,
3523        ty: TypeFutureTableIndex,
3524        async_: bool,
3525        reader: u32,
3526    ) -> Result<u32> {
3527        self.guest_cancel_read(TableIndex::Future(ty), reader, async_)
3528            .map(|result| result.encode())
3529    }
3530
3531    /// Implements the `stream.new` intrinsic.
3532    pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result<ResourcePair> {
3533        self.guest_new(TableIndex::Stream(ty))
3534    }
3535
3536    /// Implements the `stream.cancel-write` intrinsic.
3537    pub(crate) fn stream_cancel_write(
3538        &mut self,
3539        ty: TypeStreamTableIndex,
3540        async_: bool,
3541        writer: u32,
3542    ) -> Result<u32> {
3543        self.guest_cancel_write(TableIndex::Stream(ty), writer, async_)
3544            .map(|result| result.encode())
3545    }
3546
3547    /// Implements the `stream.cancel-read` intrinsic.
3548    pub(crate) fn stream_cancel_read(
3549        &mut self,
3550        ty: TypeStreamTableIndex,
3551        async_: bool,
3552        reader: u32,
3553    ) -> Result<u32> {
3554        self.guest_cancel_read(TableIndex::Stream(ty), reader, async_)
3555            .map(|result| result.encode())
3556    }
3557
3558    /// Transfer ownership of the specified future read end from one guest to
3559    /// another.
3560    pub(crate) fn future_transfer(
3561        &mut self,
3562        src_idx: u32,
3563        src: TypeFutureTableIndex,
3564        dst: TypeFutureTableIndex,
3565    ) -> Result<u32> {
3566        self.guest_transfer(
3567            src_idx,
3568            src,
3569            self.component.types()[src].instance,
3570            dst,
3571            self.component.types()[dst].instance,
3572            |state| {
3573                if let WaitableState::Future(ty, state) = state {
3574                    Ok((*ty, state))
3575                } else {
3576                    Err(anyhow!("invalid future handle"))
3577                }
3578            },
3579            WaitableState::Future,
3580        )
3581    }
3582
3583    /// Transfer ownership of the specified stream read end from one guest to
3584    /// another.
3585    pub(crate) fn stream_transfer(
3586        &mut self,
3587        src_idx: u32,
3588        src: TypeStreamTableIndex,
3589        dst: TypeStreamTableIndex,
3590    ) -> Result<u32> {
3591        self.guest_transfer(
3592            src_idx,
3593            src,
3594            self.component.types()[src].instance,
3595            dst,
3596            self.component.types()[dst].instance,
3597            |state| {
3598                if let WaitableState::Stream(ty, state) = state {
3599                    Ok((*ty, state))
3600                } else {
3601                    Err(anyhow!("invalid stream handle"))
3602                }
3603            },
3604            WaitableState::Stream,
3605        )
3606    }
3607
3608    /// Copy the specified error context from one component to another.
3609    pub(crate) fn error_context_transfer(
3610        &mut self,
3611        src_idx: u32,
3612        src: TypeComponentLocalErrorContextTableIndex,
3613        dst: TypeComponentLocalErrorContextTableIndex,
3614    ) -> Result<u32> {
3615        let (rep, _) = {
3616            let rep = self
3617                .error_context_tables
3618                .get_mut(src)
3619                .context("error context table index present in (sub)component lookup")?
3620                .get_mut_by_index(src_idx)?;
3621            rep
3622        };
3623        let dst = self
3624            .error_context_tables
3625            .get_mut(dst)
3626            .context("error context table index present in (sub)component lookup")?;
3627
3628        // Update the component local for the destination
3629        let updated_count = if let Some((dst_idx, count)) = dst.get_mut_by_rep(rep) {
3630            (*count).0 += 1;
3631            dst_idx
3632        } else {
3633            dst.insert(rep, LocalErrorContextRefCount(1))?
3634        };
3635
3636        // Update the global (cross-subcomponent) count for error contexts
3637        // as the new component has essentially created a new reference that will
3638        // be dropped/handled independently
3639        let global_ref_count = self
3640            .global_error_context_ref_counts
3641            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3642            .context("global ref count present for existing (sub)component error context")?;
3643        global_ref_count.0 += 1;
3644
3645        Ok(updated_count)
3646    }
3647}
3648
/// Pair of `u32` values produced by `future_new` and `stream_new`, one per
/// end of the newly created future or stream.
///
/// NOTE(review): the fields are presumably the guest handle indices of the
/// two ends; confirm against `guest_new`, which builds this value.
#[derive(Debug)]
pub(crate) struct ResourcePair {
    /// Value identifying the write end.
    pub(crate) write: u32,
    /// Value identifying the read end.
    pub(crate) read: u32,
}