wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext, Options};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Error, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use core::fmt;
15use core::future;
16use core::iter;
17use core::marker::PhantomData;
18use core::mem::{self, MaybeUninit};
19use core::pin::Pin;
20use core::task::{Context, Poll, Waker, ready};
21use futures::channel::oneshot;
22use futures::{FutureExt as _, stream};
23use std::boxed::Box;
24use std::io::Cursor;
25use std::string::{String, ToString};
26use std::sync::{Arc, Mutex};
27use std::vec::Vec;
28use wasmtime_environ::component::{
29    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
30    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
31    TypeFutureTableIndex, TypeStreamTableIndex,
32};
33
34pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
35
36mod buffers;
37
38/// Enum for distinguishing between a stream or future in functions that handle
39/// both.
40#[derive(Copy, Clone, Debug)]
41pub enum TransmitKind {
42    Stream,
43    Future,
44}
45
46/// Represents `{stream,future}.{read,write}` results.
47#[derive(Copy, Clone, Debug, PartialEq)]
48pub enum ReturnCode {
49    Blocked,
50    Completed(u32),
51    Dropped(u32),
52    Cancelled(u32),
53}
54
55impl ReturnCode {
56    /// Pack `self` into a single 32-bit integer that may be returned to the
57    /// guest.
58    ///
59    /// This corresponds to `pack_copy_result` in the Component Model spec.
60    pub fn encode(&self) -> u32 {
61        const BLOCKED: u32 = 0xffff_ffff;
62        const COMPLETED: u32 = 0x0;
63        const DROPPED: u32 = 0x1;
64        const CANCELLED: u32 = 0x2;
65        match self {
66            ReturnCode::Blocked => BLOCKED,
67            ReturnCode::Completed(n) => {
68                debug_assert!(*n < (1 << 28));
69                (n << 4) | COMPLETED
70            }
71            ReturnCode::Dropped(n) => {
72                debug_assert!(*n < (1 << 28));
73                (n << 4) | DROPPED
74            }
75            ReturnCode::Cancelled(n) => {
76                debug_assert!(*n < (1 << 28));
77                (n << 4) | CANCELLED
78            }
79        }
80    }
81
82    /// Returns `Self::Completed` with the specified count (or zero if
83    /// `matches!(kind, TransmitKind::Future)`)
84    fn completed(kind: TransmitKind, count: u32) -> Self {
85        Self::Completed(if let TransmitKind::Future = kind {
86            0
87        } else {
88            count
89        })
90    }
91}
92
93/// Represents a stream or future type index.
94///
95/// This is useful as a parameter type for functions which operate on either a
96/// future or a stream.
97#[derive(Copy, Clone, Debug)]
98pub enum TransmitIndex {
99    Stream(TypeStreamTableIndex),
100    Future(TypeFutureTableIndex),
101}
102
103impl TransmitIndex {
104    pub fn kind(&self) -> TransmitKind {
105        match self {
106            TransmitIndex::Stream(_) => TransmitKind::Stream,
107            TransmitIndex::Future(_) => TransmitKind::Future,
108        }
109    }
110}
111
112/// Retrieve the payload type of the specified stream or future, or `None` if it
113/// has no payload type.
114fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
115    match ty {
116        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
117        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
118    }
119}
120
121/// Retrieve the host rep and state for the specified guest-visible waitable
122/// handle.
123fn get_mut_by_index_from(
124    handle_table: &mut HandleTable,
125    ty: TransmitIndex,
126    index: u32,
127) -> Result<(u32, &mut TransmitLocalState)> {
128    match ty {
129        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
130        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
131    }
132}
133
134fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
135    mut store: StoreContextMut<U>,
136    instance: Instance,
137    options: &Options,
138    ty: TransmitIndex,
139    address: usize,
140    count: usize,
141    buffer: &mut B,
142) -> Result<()> {
143    let types = instance.id().get(store.0).component().types().clone();
144    let count = buffer.remaining().len().min(count);
145
146    let lower = &mut if T::MAY_REQUIRE_REALLOC {
147        LowerContext::new
148    } else {
149        LowerContext::new_without_realloc
150    }(store.as_context_mut(), options, &types, instance);
151
152    if address % usize::try_from(T::ALIGN32)? != 0 {
153        bail!("read pointer not aligned");
154    }
155    lower
156        .as_slice_mut()
157        .get_mut(address..)
158        .and_then(|b| b.get_mut(..T::SIZE32 * count))
159        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
160
161    if let Some(ty) = payload(ty, &types) {
162        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
163    }
164
165    buffer.skip(count);
166
167    Ok(())
168}
169
170fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
171    lift: &mut LiftContext<'_>,
172    ty: Option<InterfaceType>,
173    buffer: &mut B,
174    address: usize,
175    count: usize,
176) -> Result<()> {
177    let count = count.min(buffer.remaining_capacity());
178    if T::IS_RUST_UNIT_TYPE {
179        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
180        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
181        // is a valid way to populate the zero-sized buffer.
182        buffer.extend(
183            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
184        )
185    } else {
186        let ty = ty.unwrap();
187        if address % usize::try_from(T::ALIGN32)? != 0 {
188            bail!("write pointer not aligned");
189        }
190        lift.memory()
191            .get(address..)
192            .and_then(|b| b.get(..T::SIZE32 * count))
193            .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
194
195        let list = &WasmList::new(address, count, lift, ty)?;
196        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
197    }
198    Ok(())
199}
200
201/// Represents the state associated with an error context
202#[derive(Debug, PartialEq, Eq, PartialOrd)]
203pub(super) struct ErrorContextState {
204    /// Debug message associated with the error context
205    pub(crate) debug_msg: String,
206}
207
208/// Represents the size and alignment for a "flat" Component Model type,
209/// i.e. one containing no pointers or handles.
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211pub(super) struct FlatAbi {
212    pub(super) size: u32,
213    pub(super) align: u32,
214}
215
216/// Represents the buffer for a host- or guest-initiated stream read.
217pub struct Destination<'a, T, B> {
218    instance: Instance,
219    id: TableId<TransmitState>,
220    buffer: &'a mut B,
221    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
222    _phantom: PhantomData<fn() -> T>,
223}
224
225impl<'a, T, B> Destination<'a, T, B> {
226    /// Reborrow `self` so it can be used again later.
227    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
228        Destination {
229            instance: self.instance,
230            id: self.id,
231            buffer: &mut *self.buffer,
232            host_buffer: self.host_buffer.as_deref_mut(),
233            _phantom: PhantomData,
234        }
235    }
236
237    /// Take the buffer out of `self`, leaving a default-initialized one in its
238    /// place.
239    ///
240    /// This can be useful for reusing the previously-stored buffer's capacity
241    /// instead of allocating a fresh one.
242    pub fn take_buffer(&mut self) -> B
243    where
244        B: Default,
245    {
246        mem::take(self.buffer)
247    }
248
249    /// Store the specified buffer in `self`.
250    ///
251    /// Any items contained in the buffer will be delivered to the reader after
252    /// the `StreamProducer::poll_produce` call to which this `Destination` was
253    /// passed returns (unless overwritten by another call to `set_buffer`).
254    ///
255    /// If items are stored via this buffer _and_ written via a
256    /// `DirectDestination` view of `self`, then the items in the buffer will be
257    /// delivered after the ones written using `DirectDestination`.
258    pub fn set_buffer(&mut self, buffer: B) {
259        *self.buffer = buffer;
260    }
261
262    /// Return the remaining number of items the current read has capacity to
263    /// accept, if known.
264    ///
265    /// This will return `Some(_)` if the reader is a guest; it will return
266    /// `None` if the reader is the host.
267    ///
268    /// Note that, if this returns `None(0)`, the producer must still attempt to
269    /// produce at least one item if the value of `finish` passed to
270    /// `StreamProducer::poll_produce` is false.  In that case, the reader is
271    /// effectively asking when the producer will be able to produce items
272    /// without blocking (or reach a terminal state such as end-of-stream),
273    /// meaning the next non-zero read must complete without blocking.
274    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
275        let transmit = self
276            .instance
277            .concurrent_state_mut(store.as_context_mut().0)
278            .get_mut(self.id)
279            .unwrap();
280
281        if let &ReadState::GuestReady { count, .. } = &transmit.read {
282            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
283                unreachable!()
284            };
285
286            Some(count - guest_offset)
287        } else {
288            None
289        }
290    }
291}
292
293impl<'a, B> Destination<'a, u8, B> {
294    /// Return a `DirectDestination` view of `self`.
295    ///
296    /// If the reader is a guest, this will provide direct access to the guest's
297    /// read buffer.  If the reader is a host, this will provide access to a
298    /// buffer which will be delivered to the host before any items stored using
299    /// `Destination::set_buffer`.
300    ///
301    /// `capacity` will only be used if the reader is a host, in which case it
302    /// will update the length of the buffer, possibly zero-initializing the new
303    /// elements if the new length is larger than the old length.
304    pub fn as_direct<D>(
305        mut self,
306        store: StoreContextMut<'a, D>,
307        capacity: usize,
308    ) -> DirectDestination<'a, D> {
309        if let Some(buffer) = self.host_buffer.as_deref_mut() {
310            buffer.set_position(0);
311            if buffer.get_mut().is_empty() {
312                buffer.get_mut().resize(capacity, 0);
313            }
314        }
315
316        DirectDestination {
317            instance: self.instance,
318            id: self.id,
319            host_buffer: self.host_buffer,
320            store,
321        }
322    }
323}
324
325/// Represents a read from a `stream<u8>`, providing direct access to the
326/// writer's buffer.
327pub struct DirectDestination<'a, D: 'static> {
328    instance: Instance,
329    id: TableId<TransmitState>,
330    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
331    store: StoreContextMut<'a, D>,
332}
333
334impl<D: 'static> DirectDestination<'_, D> {
335    /// Provide direct access to the writer's buffer.
336    pub fn remaining(&mut self) -> &mut [u8] {
337        if let Some(buffer) = self.host_buffer.as_deref_mut() {
338            buffer.get_mut()
339        } else {
340            let transmit = self
341                .instance
342                .concurrent_state_mut(self.store.as_context_mut().0)
343                .get_mut(self.id)
344                .unwrap();
345
346            let &ReadState::GuestReady {
347                address,
348                count,
349                options,
350                ..
351            } = &transmit.read
352            else {
353                unreachable!();
354            };
355
356            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
357                unreachable!()
358            };
359
360            options
361                .memory_mut(self.store.0)
362                .get_mut((address + guest_offset)..)
363                .and_then(|b| b.get_mut(..(count - guest_offset)))
364                .unwrap()
365        }
366    }
367
368    /// Mark the specified number of bytes as written to the writer's buffer.
369    ///
370    /// This will panic if the count is larger than the size of the
371    /// buffer returned by `Self::remaining`.
372    pub fn mark_written(&mut self, count: usize) {
373        if let Some(buffer) = self.host_buffer.as_deref_mut() {
374            buffer.set_position(
375                buffer
376                    .position()
377                    .checked_add(u64::try_from(count).unwrap())
378                    .unwrap(),
379            );
380        } else {
381            let transmit = self
382                .instance
383                .concurrent_state_mut(self.store.as_context_mut().0)
384                .get_mut(self.id)
385                .unwrap();
386
387            let ReadState::GuestReady {
388                count: read_count, ..
389            } = &transmit.read
390            else {
391                unreachable!();
392            };
393
394            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
395                unreachable!()
396            };
397
398            if *guest_offset + count > *read_count {
399                panic!(
400                    "write count ({count}) must be less than or equal to read count ({read_count})"
401                )
402            } else {
403                *guest_offset += count;
404            }
405        }
406    }
407}
408
409/// Represents the state of a `Stream{Producer,Consumer}`.
410#[derive(Copy, Clone, Debug)]
411pub enum StreamResult {
412    /// The operation completed normally, and the producer or consumer may be
413    /// able to produce or consume more items, respectively.
414    Completed,
415    /// The operation was interrupted (i.e. it wrapped up early after receiving
416    /// a `finish` parameter value of true in a call to `poll_produce` or
417    /// `poll_consume`), and the producer or consumer may be able to produce or
418    /// consume more items, respectively.
419    Cancelled,
420    /// The operation completed normally, but the producer or consumer will
421    /// _not_ able to produce or consume more items, respectively.
422    Dropped,
423}
424
425/// Represents the host-owned write end of a stream.
426pub trait StreamProducer<D>: Send + 'static {
427    /// The payload type of this stream.
428    type Item;
429
430    /// The `WriteBuffer` type to use when delivering items.
431    type Buffer: WriteBuffer<Self::Item> + Default;
432
433    /// Handle a host- or guest-initiated read by delivering zero or more items
434    /// to the specified destination.
435    ///
436    /// This will be called whenever the reader starts a read.
437    ///
438    /// If the implementation is able to produce one or more items immediately,
439    /// it should write them to `destination` and return either
440    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
441    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
442    /// any more items.
443    ///
444    /// If the implementation is unable to produce any items immediately, but
445    /// expects to do so later, and `finish` is _false_, it should store the
446    /// waker from `cx` for later and return `Poll::Pending` without writing
447    /// anything to `destination`.  Later, it should alert the waker when either
448    /// the items arrive, the stream has ended, or an error occurs.
449    ///
450    /// If the implementation is unable to produce any items immediately, but
451    /// expects to do so later, and `finish` is _true_, it should, if possible,
452    /// return `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without
453    /// writing anything to `destination`.  However, that might not be possible
454    /// if an earlier call to `poll_produce` kicked off an asynchronous
455    /// operation which needs to be completed (and possibly interrupted)
456    /// gracefully, in which case the implementation may return `Poll::Pending`
457    /// and later alert the waker as described above.  In other words, when
458    /// `finish` is true, the implementation should prioritize returning a
459    /// result to the reader (even if no items can be produced) rather than wait
460    /// indefinitely for at least one item to arrive.
461    ///
462    /// In all of the above cases, the implementation may alternatively choose
463    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
464    /// the guest (if any) to trap and render the component instance (if any)
465    /// unusable.  The implementation should report errors that _are_
466    /// recoverable by other means (e.g. by writing to a `future`) and return
467    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
468    ///
469    /// Note that the implementation should never return `Poll::Pending` after
470    /// writing one or more items to `destination`; if it does, the caller will
471    /// trap as if `Err(_)` was returned.  Conversely, it should only return
472    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without writing any items to
473    /// `destination` if called with `finish` set to true.  If it does so when
474    /// `finish` is false, the caller will trap.  Additionally, it should only
475    /// return `Poll::Ready(Ok(StreamResult::Completed))` after writing at least
476    /// one item to `destination` if it has capacity to accept that item;
477    /// otherwise, the caller will trap.
478    ///
479    /// If more items are written to `destination` than the reader has immediate
480    /// capacity to accept, they will be retained in memory by the caller and
481    /// used to satisfy future reads, in which case `poll_produce` will only be
482    /// called again once all those items have been delivered.
483    ///
484    /// If this function is called with zero capacity
485    /// (i.e. `Destination::remaining` returns `Some(0)`), the implementation
486    /// should either:
487    ///
488    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
489    /// anything if it expects to be able to produce items immediately
490    /// (i.e. without first returning `Poll::Pending`) the next time
491    /// `poll_produce` is called with non-zero capacity _or_ if that cannot be
492    /// reliably determined.
493    ///
494    /// - Return `Poll::Pending` if the next call to `poll_produce` with
495    /// non-zero capacity is likely to also return `Poll::Pending`.
496    ///
497    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
498    /// `Destination::set_buffer` with one more more items.  Note, however, that
499    /// this creates the hazard that the items will never be received by the
500    /// guest if it decides not to do another non-zero-length read before
501    /// closing the stream.  Moreover, if `Self::Item` is e.g. a `Resource<_>`,
502    /// they may end up leaking in that scenario.
503    fn poll_produce<'a>(
504        self: Pin<&mut Self>,
505        cx: &mut Context<'_>,
506        store: StoreContextMut<'a, D>,
507        destination: Destination<'a, Self::Item, Self::Buffer>,
508        finish: bool,
509    ) -> Poll<Result<StreamResult>>;
510}
511
512impl<T, D> StreamProducer<D> for iter::Empty<T>
513where
514    T: Send + Sync + 'static,
515{
516    type Item = T;
517    type Buffer = Option<Self::Item>;
518
519    fn poll_produce<'a>(
520        self: Pin<&mut Self>,
521        _: &mut Context<'_>,
522        _: StoreContextMut<'a, D>,
523        _: Destination<'a, Self::Item, Self::Buffer>,
524        _: bool,
525    ) -> Poll<Result<StreamResult>> {
526        Poll::Ready(Ok(StreamResult::Dropped))
527    }
528}
529
530impl<T, D> StreamProducer<D> for stream::Empty<T>
531where
532    T: Send + Sync + 'static,
533{
534    type Item = T;
535    type Buffer = Option<Self::Item>;
536
537    fn poll_produce<'a>(
538        self: Pin<&mut Self>,
539        _: &mut Context<'_>,
540        _: StoreContextMut<'a, D>,
541        _: Destination<'a, Self::Item, Self::Buffer>,
542        _: bool,
543    ) -> Poll<Result<StreamResult>> {
544        Poll::Ready(Ok(StreamResult::Dropped))
545    }
546}
547
548impl<T, D> StreamProducer<D> for Vec<T>
549where
550    T: Unpin + Send + Sync + 'static,
551{
552    type Item = T;
553    type Buffer = VecBuffer<T>;
554
555    fn poll_produce<'a>(
556        self: Pin<&mut Self>,
557        _: &mut Context<'_>,
558        _: StoreContextMut<'a, D>,
559        mut dst: Destination<'a, Self::Item, Self::Buffer>,
560        _: bool,
561    ) -> Poll<Result<StreamResult>> {
562        dst.set_buffer(mem::take(self.get_mut()).into());
563        Poll::Ready(Ok(StreamResult::Dropped))
564    }
565}
566
567impl<T, D> StreamProducer<D> for Box<[T]>
568where
569    T: Unpin + Send + Sync + 'static,
570{
571    type Item = T;
572    type Buffer = VecBuffer<T>;
573
574    fn poll_produce<'a>(
575        self: Pin<&mut Self>,
576        _: &mut Context<'_>,
577        _: StoreContextMut<'a, D>,
578        mut dst: Destination<'a, Self::Item, Self::Buffer>,
579        _: bool,
580    ) -> Poll<Result<StreamResult>> {
581        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
582        Poll::Ready(Ok(StreamResult::Dropped))
583    }
584}
585
586#[cfg(feature = "component-model-async-bytes")]
587impl<D> StreamProducer<D> for bytes::Bytes {
588    type Item = u8;
589    type Buffer = Cursor<Self>;
590
591    fn poll_produce<'a>(
592        mut self: Pin<&mut Self>,
593        _: &mut Context<'_>,
594        mut store: StoreContextMut<'a, D>,
595        mut dst: Destination<'a, Self::Item, Self::Buffer>,
596        _: bool,
597    ) -> Poll<Result<StreamResult>> {
598        let cap = dst.remaining(&mut store);
599        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
600            // on 0-length or host reads, buffer the bytes
601            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
602            return Poll::Ready(Ok(StreamResult::Dropped));
603        };
604        let cap = cap.into();
605        // data does not fit in destination, fill it and buffer the rest
606        dst.set_buffer(Cursor::new(self.split_off(cap)));
607        let mut dst = dst.as_direct(store, cap);
608        dst.remaining().copy_from_slice(&self);
609        dst.mark_written(cap);
610        Poll::Ready(Ok(StreamResult::Dropped))
611    }
612}
613
614#[cfg(feature = "component-model-async-bytes")]
615impl<D> StreamProducer<D> for bytes::BytesMut {
616    type Item = u8;
617    type Buffer = Cursor<Self>;
618
619    fn poll_produce<'a>(
620        mut self: Pin<&mut Self>,
621        _: &mut Context<'_>,
622        mut store: StoreContextMut<'a, D>,
623        mut dst: Destination<'a, Self::Item, Self::Buffer>,
624        _: bool,
625    ) -> Poll<Result<StreamResult>> {
626        let cap = dst.remaining(&mut store);
627        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
628            // on 0-length or host reads, buffer the bytes
629            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
630            return Poll::Ready(Ok(StreamResult::Dropped));
631        };
632        let cap = cap.into();
633        // data does not fit in destination, fill it and buffer the rest
634        dst.set_buffer(Cursor::new(self.split_off(cap)));
635        let mut dst = dst.as_direct(store, cap);
636        dst.remaining().copy_from_slice(&self);
637        dst.mark_written(cap);
638        Poll::Ready(Ok(StreamResult::Dropped))
639    }
640}
641
642/// Represents the buffer for a host- or guest-initiated stream write.
643pub struct Source<'a, T> {
644    instance: Instance,
645    id: TableId<TransmitState>,
646    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
647}
648
649impl<'a, T> Source<'a, T> {
650    /// Reborrow `self` so it can be used again later.
651    pub fn reborrow(&mut self) -> Source<'_, T> {
652        Source {
653            instance: self.instance,
654            id: self.id,
655            host_buffer: self.host_buffer.as_deref_mut(),
656        }
657    }
658
659    /// Accept zero or more items from the writer.
660    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
661    where
662        T: func::Lift + 'static,
663        B: ReadBuffer<T>,
664    {
665        if let Some(input) = &mut self.host_buffer {
666            let count = input.remaining().len().min(buffer.remaining_capacity());
667            buffer.move_from(*input, count);
668        } else {
669            let store = store.as_context_mut();
670            let transmit = self
671                .instance
672                .concurrent_state_mut(store.0)
673                .get_mut(self.id)?;
674
675            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
676                unreachable!();
677            };
678
679            let &WriteState::GuestReady {
680                ty,
681                address,
682                count,
683                options,
684                ..
685            } = &transmit.write
686            else {
687                unreachable!()
688            };
689
690            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self.instance);
691            let ty = payload(ty, cx.types);
692            let old_remaining = buffer.remaining_capacity();
693            lift::<T, B, S::Data>(
694                cx,
695                ty,
696                buffer,
697                address + (T::SIZE32 * guest_offset),
698                count - guest_offset,
699            )?;
700
701            let transmit = self
702                .instance
703                .concurrent_state_mut(store.0)
704                .get_mut(self.id)?;
705
706            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
707                unreachable!();
708            };
709
710            *guest_offset += old_remaining - buffer.remaining_capacity();
711        }
712
713        Ok(())
714    }
715
716    /// Return the number of items remaining to be read from the current write
717    /// operation.
718    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
719    where
720        T: 'static,
721    {
722        let transmit = self
723            .instance
724            .concurrent_state_mut(store.as_context_mut().0)
725            .get_mut(self.id)
726            .unwrap();
727
728        if let &WriteState::GuestReady { count, .. } = &transmit.write {
729            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
730                unreachable!()
731            };
732
733            count - guest_offset
734        } else if let Some(host_buffer) = &self.host_buffer {
735            host_buffer.remaining().len()
736        } else {
737            unreachable!()
738        }
739    }
740}
741
742impl<'a> Source<'a, u8> {
743    /// Return a `DirectSource` view of `self`.
744    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
745        DirectSource {
746            instance: self.instance,
747            id: self.id,
748            host_buffer: self.host_buffer,
749            store,
750        }
751    }
752}
753
754/// Represents a write to a `stream<u8>`, providing direct access to the
755/// writer's buffer.
756pub struct DirectSource<'a, D: 'static> {
757    instance: Instance,
758    id: TableId<TransmitState>,
759    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
760    store: StoreContextMut<'a, D>,
761}
762
763impl<D: 'static> DirectSource<'_, D> {
764    /// Provide direct access to the writer's buffer.
765    pub fn remaining(&mut self) -> &[u8] {
766        if let Some(buffer) = self.host_buffer.as_deref_mut() {
767            buffer.remaining()
768        } else {
769            let transmit = self
770                .instance
771                .concurrent_state_mut(self.store.as_context_mut().0)
772                .get_mut(self.id)
773                .unwrap();
774
775            let &WriteState::GuestReady {
776                address,
777                count,
778                options,
779                ..
780            } = &transmit.write
781            else {
782                unreachable!()
783            };
784
785            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
786                unreachable!()
787            };
788
789            options
790                .memory(self.store.0)
791                .get((address + guest_offset)..)
792                .and_then(|b| b.get(..(count - guest_offset)))
793                .unwrap()
794        }
795    }
796
797    /// Mark the specified number of bytes as read from the writer's buffer.
798    ///
799    /// This will panic if the count is larger than the size of the buffer
800    /// returned by `Self::remaining`.
801    pub fn mark_read(&mut self, count: usize) {
802        if let Some(buffer) = self.host_buffer.as_deref_mut() {
803            buffer.skip(count);
804        } else {
805            let transmit = self
806                .instance
807                .concurrent_state_mut(self.store.as_context_mut().0)
808                .get_mut(self.id)
809                .unwrap();
810
811            let WriteState::GuestReady {
812                count: write_count, ..
813            } = &transmit.write
814            else {
815                unreachable!()
816            };
817
818            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
819                unreachable!()
820            };
821
822            if *guest_offset + count > *write_count {
823                panic!(
824                    "read count ({count}) must be less than or equal to write count ({write_count})"
825                )
826            } else {
827                *guest_offset += count;
828            }
829        }
830    }
831}
832
833/// Represents the host-owned read end of a stream.
834pub trait StreamConsumer<D>: Send + 'static {
835    /// The payload type of this stream.
836    type Item;
837
838    /// Handle a host- or guest-initiated write by accepting zero or more items
839    /// from the specified source.
840    ///
841    /// This will be called whenever the writer starts a write.
842    ///
843    /// If the implementation is able to consume one or more items immediately,
844    /// it should take them from `source` and return either
845    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
846    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
847    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
848    /// indicate that the caller should delay sending a `COMPLETED` event to the
849    /// writer until a later call to this function returns `Poll::Ready(_)`.
850    /// For more about that, see the `Backpressure` section below.
851    ///
852    /// If the implementation cannot consume any items immediately and `finish`
853    /// is _false_, it should store the waker from `cx` for later and return
854    /// `Poll::Pending` without writing anything to `destination`.  Later, it
855    /// should alert the waker when either (1) the items arrive, (2) the stream
856    /// has ended, or (3) an error occurs.
857    ///
858    /// If the implementation cannot consume any items immediately and `finish`
859    /// is _true_, it should, if possible, return
860    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
861    /// anything from `source`.  However, that might not be possible if an
862    /// earlier call to `poll_consume` kicked off an asynchronous operation
863    /// which needs to be completed (and possibly interrupted) gracefully, in
864    /// which case the implementation may return `Poll::Pending` and later alert
865    /// the waker as described above.  In other words, when `finish` is true,
866    /// the implementation should prioritize returning a result to the reader
867    /// (even if no items can be consumed) rather than wait indefinitely for at
868    /// capacity to free up.
869    ///
870    /// In all of the above cases, the implementation may alternatively choose
871    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
872    /// the guest (if any) to trap and render the component instance (if any)
873    /// unusable.  The implementation should report errors that _are_
874    /// recoverable by other means (e.g. by writing to a `future`) and return
875    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
876    ///
877    /// Note that the implementation should only return
878    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
879    /// items from `source` if called with `finish` set to true.  If it does so
880    /// when `finish` is false, the caller will trap.  Additionally, it should
881    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
882    /// least one item from `source` if there is an item available; otherwise,
883    /// the caller will trap.  If `poll_consume` is called with no items in
884    /// `source`, it should only return `Poll::Ready(_)` once it is able to
885    /// accept at least one item during the next call to `poll_consume`.
886    ///
887    /// Note that any items which the implementation of this trait takes from
888    /// `source` become the responsibility of that implementation.  For that
889    /// reason, an implementation which forwards items to an upstream sink
890    /// should reserve capacity in that sink before taking items out of
891    /// `source`, if possible.  Alternatively, it might buffer items which can't
892    /// be forwarded immediately and send them once capacity is freed up.
893    ///
894    /// ## Backpressure
895    ///
896    /// As mentioned above, an implementation might choose to return
897    /// `Poll::Pending` after taking items from `source`, which tells the caller
898    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
899    /// a form of backpressure when the items are forwarded to an upstream sink
900    /// asynchronously.  Note, however, that it's not possible to "put back"
901    /// items into `source` once they've been taken out, so if the upstream sink
902    /// is unable to accept all the items, that cannot be communicated to the
903    /// writer at this level of abstraction.  Just as with application-specific,
904    /// recoverable errors, information about which items could be forwarded and
905    /// which could not must be communicated out-of-band, e.g. by writing to an
906    /// application-specific `future`.
907    ///
908    /// Similarly, if the writer cancels the write after items have been taken
909    /// from `source` but before the items have all been forwarded to an
910    /// upstream sink, `poll_consume` will be called with `finish` set to true,
911    /// and the implementation may either:
912    ///
913    /// - Interrupt the forwarding process gracefully.  This may be preferable
914    /// if there is an out-of-band channel for communicating to the writer how
915    /// many items were forwarded before being interrupted.
916    ///
917    /// - Allow the forwarding to complete without interrupting it.  This is
918    /// usually preferable if there's no out-of-band channel for reporting back
919    /// to the writer how many items were forwarded.
920    fn poll_consume(
921        self: Pin<&mut Self>,
922        cx: &mut Context<'_>,
923        store: StoreContextMut<D>,
924        source: Source<'_, Self::Item>,
925        finish: bool,
926    ) -> Poll<Result<StreamResult>>;
927}
928
929/// Represents a host-owned write end of a future.
930pub trait FutureProducer<D>: Send + 'static {
931    /// The payload type of this future.
932    type Item;
933
934    /// Handle a host- or guest-initiated read by producing a value.
935    ///
936    /// This is equivalent to `StreamProducer::poll_produce`, but with a
937    /// simplified interface for futures.
938    ///
939    /// If `finish` is true, the implementation may return
940    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
941    /// could produce a value.  Otherwise, it must either return
942    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
943    fn poll_produce(
944        self: Pin<&mut Self>,
945        cx: &mut Context<'_>,
946        store: StoreContextMut<D>,
947        finish: bool,
948    ) -> Poll<Result<Option<Self::Item>>>;
949}
950
951impl<T, E, D, Fut> FutureProducer<D> for Fut
952where
953    E: Into<Error>,
954    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
955{
956    type Item = T;
957
958    fn poll_produce<'a>(
959        self: Pin<&mut Self>,
960        cx: &mut Context<'_>,
961        _: StoreContextMut<'a, D>,
962        finish: bool,
963    ) -> Poll<Result<Option<T>>> {
964        match self.poll(cx) {
965            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
966            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
967            Poll::Pending if finish => Poll::Ready(Ok(None)),
968            Poll::Pending => Poll::Pending,
969        }
970    }
971}
972
973/// Represents a host-owned read end of a future.
974pub trait FutureConsumer<D>: Send + 'static {
975    /// The payload type of this future.
976    type Item;
977
978    /// Handle a host- or guest-initiated write by consuming a value.
979    ///
980    /// This is equivalent to `StreamProducer::poll_produce`, but with a
981    /// simplified interface for futures.
982    ///
983    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
984    /// without taking the item from `source`, which indicates the operation was
985    /// canceled before it could consume the value.  Otherwise, it must either
986    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
987    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
988    /// the item).
989    fn poll_consume(
990        self: Pin<&mut Self>,
991        cx: &mut Context<'_>,
992        store: StoreContextMut<D>,
993        source: Source<'_, Self::Item>,
994        finish: bool,
995    ) -> Poll<Result<()>>;
996}
997
998/// Represents the readable end of a Component Model `future`.
999///
1000/// Note that `FutureReader` instances must be disposed of using either `pipe`
1001/// or `close`; otherwise the in-store representation will leak and the writer
1002/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
1003/// ensure that disposal happens automatically.
1004pub struct FutureReader<T> {
1005    instance: Instance,
1006    id: TableId<TransmitHandle>,
1007    _phantom: PhantomData<T>,
1008}
1009
1010impl<T> FutureReader<T> {
1011    /// Create a new future with the specified producer.
1012    pub fn new<S: AsContextMut>(
1013        instance: Instance,
1014        mut store: S,
1015        producer: impl FutureProducer<S::Data, Item = T>,
1016    ) -> Self
1017    where
1018        T: func::Lower + func::Lift + Send + Sync + 'static,
1019    {
1020        struct Producer<P>(P);
1021
1022        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1023            for Producer<P>
1024        {
1025            type Item = P::Item;
1026            type Buffer = Option<P::Item>;
1027
1028            fn poll_produce<'a>(
1029                self: Pin<&mut Self>,
1030                cx: &mut Context<'_>,
1031                store: StoreContextMut<D>,
1032                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1033                finish: bool,
1034            ) -> Poll<Result<StreamResult>> {
1035                // SAFETY: This is a standard pin-projection, and we never move
1036                // out of `self`.
1037                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1038
1039                Poll::Ready(Ok(
1040                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1041                        destination.set_buffer(Some(value));
1042
1043                        // Here we return `StreamResult::Completed` even though
1044                        // we've produced the last item we'll ever produce.
1045                        // That's because the ABI expects
1046                        // `ReturnCode::Completed(1)` rather than
1047                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1048                        // called again since the future will have resolved.
1049                        StreamResult::Completed
1050                    } else {
1051                        StreamResult::Cancelled
1052                    },
1053                ))
1054            }
1055        }
1056
1057        Self::new_(
1058            instance.new_transmit(
1059                store.as_context_mut(),
1060                TransmitKind::Future,
1061                Producer(producer),
1062            ),
1063            instance,
1064        )
1065    }
1066
1067    fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1068        Self {
1069            instance,
1070            id,
1071            _phantom: PhantomData,
1072        }
1073    }
1074
1075    /// Set the consumer that accepts the result of this future.
1076    pub fn pipe<S: AsContextMut>(
1077        self,
1078        store: S,
1079        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1080    ) where
1081        T: func::Lift + 'static,
1082    {
1083        struct Consumer<C>(C);
1084
1085        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1086            for Consumer<C>
1087        {
1088            type Item = T;
1089
1090            fn poll_consume(
1091                self: Pin<&mut Self>,
1092                cx: &mut Context<'_>,
1093                mut store: StoreContextMut<D>,
1094                mut source: Source<Self::Item>,
1095                finish: bool,
1096            ) -> Poll<Result<StreamResult>> {
1097                // SAFETY: This is a standard pin-projection, and we never move
1098                // out of `self`.
1099                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1100
1101                ready!(consumer.poll_consume(
1102                    cx,
1103                    store.as_context_mut(),
1104                    source.reborrow(),
1105                    finish
1106                ))?;
1107
1108                Poll::Ready(Ok(if source.remaining(store) == 0 {
1109                    // Here we return `StreamResult::Completed` even though
1110                    // we've consumed the last item we'll ever consume.  That's
1111                    // because the ABI expects `ReturnCode::Completed(1)` rather
1112                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1113                    // called again since the future will have resolved.
1114                    StreamResult::Completed
1115                } else {
1116                    StreamResult::Cancelled
1117                }))
1118            }
1119        }
1120
1121        self.instance
1122            .set_consumer(store, self.id, TransmitKind::Future, Consumer(consumer));
1123    }
1124
1125    /// Convert this `FutureReader` into a [`Val`].
1126    // See TODO comment for `FutureAny`; this is prone to handle leakage.
1127    pub fn into_val(self) -> Val {
1128        Val::Future(FutureAny(self.id.rep()))
1129    }
1130
1131    /// Attempt to convert the specified [`Val`] to a `FutureReader`.
1132    pub fn from_val(
1133        mut store: impl AsContextMut<Data: Send>,
1134        instance: Instance,
1135        value: &Val,
1136    ) -> Result<Self> {
1137        let Val::Future(FutureAny(rep)) = value else {
1138            bail!("expected `future`; got `{}`", value.desc());
1139        };
1140        let store = store.as_context_mut();
1141        let id = TableId::<TransmitHandle>::new(*rep);
1142        instance.concurrent_state_mut(store.0).get_mut(id)?; // Just make sure it's present
1143        Ok(Self::new_(id, instance))
1144    }
1145
1146    /// Transfer ownership of the read end of a future from a guest to the host.
1147    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1148        match ty {
1149            InterfaceType::Future(src) => {
1150                let handle_table = cx
1151                    .instance_mut()
1152                    .table_for_transmit(TransmitIndex::Future(src));
1153                let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1154                if is_done {
1155                    bail!("cannot lift future after being notified that the writable end dropped");
1156                }
1157                let id = TableId::<TransmitHandle>::new(rep);
1158                let concurrent_state = cx.instance_mut().concurrent_state_mut();
1159                let future = concurrent_state.get_mut(id)?;
1160                future.common.handle = None;
1161                let state = future.state;
1162
1163                if concurrent_state.get_mut(state)?.done {
1164                    bail!("cannot lift future after previous read succeeded");
1165                }
1166
1167                Ok(Self::new_(id, cx.instance_handle()))
1168            }
1169            _ => func::bad_type_info(),
1170        }
1171    }
1172
1173    /// Close this `FutureReader`, writing the default value.
1174    ///
1175    /// # Panics
1176    ///
1177    /// Panics if the store that the [`Accessor`] is derived from does not own
1178    /// this future. Usage of this future after calling `close` will also cause
1179    /// a panic.
1180    pub fn close(&mut self, mut store: impl AsContextMut) {
1181        // `self` should never be used again, but leave an invalid handle there just in case.
1182        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1183        self.instance
1184            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Future)
1185            .unwrap();
1186    }
1187
1188    /// Convenience method around [`Self::close`].
1189    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1190        accessor.as_accessor().with(|access| self.close(access))
1191    }
1192
1193    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1194    /// drop and clean it up from the store.
1195    ///
1196    /// Note that the `accessor` provided must own this future and is
1197    /// additionally transferred to the `GuardedFutureReader` return value.
1198    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1199    where
1200        A: AsAccessor,
1201    {
1202        GuardedFutureReader::new(accessor, self)
1203    }
1204}
1205
1206impl<T> fmt::Debug for FutureReader<T> {
1207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1208        f.debug_struct("FutureReader")
1209            .field("id", &self.id)
1210            .field("instance", &self.instance)
1211            .finish()
1212    }
1213}
1214
1215/// Transfer ownership of the read end of a future from the host to a guest.
1216pub(crate) fn lower_future_to_index<U>(
1217    rep: u32,
1218    cx: &mut LowerContext<'_, U>,
1219    ty: InterfaceType,
1220) -> Result<u32> {
1221    match ty {
1222        InterfaceType::Future(dst) => {
1223            let concurrent_state = cx.instance_mut().concurrent_state_mut();
1224            let id = TableId::<TransmitHandle>::new(rep);
1225            let state = concurrent_state.get_mut(id)?.state;
1226            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1227
1228            let handle = cx
1229                .instance_mut()
1230                .table_for_transmit(TransmitIndex::Future(dst))
1231                .future_insert_read(dst, rep)?;
1232
1233            cx.instance_mut()
1234                .concurrent_state_mut()
1235                .get_mut(id)?
1236                .common
1237                .handle = Some(handle);
1238
1239            Ok(handle)
1240        }
1241        _ => func::bad_type_info(),
1242    }
1243}
1244
1245// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1246// safe and correct since we lift and lower future handles as `u32`s.
1247unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1248    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1249
1250    type Lower = <u32 as func::ComponentType>::Lower;
1251
1252    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1253        match ty {
1254            InterfaceType::Future(_) => Ok(()),
1255            other => bail!("expected `future`, found `{}`", func::desc(other)),
1256        }
1257    }
1258}
1259
1260// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1261unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1262    fn linear_lower_to_flat<U>(
1263        &self,
1264        cx: &mut LowerContext<'_, U>,
1265        ty: InterfaceType,
1266        dst: &mut MaybeUninit<Self::Lower>,
1267    ) -> Result<()> {
1268        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1269            cx,
1270            InterfaceType::U32,
1271            dst,
1272        )
1273    }
1274
1275    fn linear_lower_to_memory<U>(
1276        &self,
1277        cx: &mut LowerContext<'_, U>,
1278        ty: InterfaceType,
1279        offset: usize,
1280    ) -> Result<()> {
1281        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1282            cx,
1283            InterfaceType::U32,
1284            offset,
1285        )
1286    }
1287}
1288
1289// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1290unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1291    fn linear_lift_from_flat(
1292        cx: &mut LiftContext<'_>,
1293        ty: InterfaceType,
1294        src: &Self::Lower,
1295    ) -> Result<Self> {
1296        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1297        Self::lift_from_index(cx, ty, index)
1298    }
1299
1300    fn linear_lift_from_memory(
1301        cx: &mut LiftContext<'_>,
1302        ty: InterfaceType,
1303        bytes: &[u8],
1304    ) -> Result<Self> {
1305        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1306        Self::lift_from_index(cx, ty, index)
1307    }
1308}
1309
1310/// A [`FutureReader`] paired with an [`Accessor`].
1311///
1312/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
1313/// when dropped. This can be created through [`GuardedFutureReader::new`] or
1314/// [`FutureReader::guard`].
1315pub struct GuardedFutureReader<T, A>
1316where
1317    A: AsAccessor,
1318{
1319    // This field is `None` to implement the conversion from this guard back to
1320    // `FutureReader`. When `None` is seen in the destructor it will cause the
1321    // destructor to do nothing.
1322    reader: Option<FutureReader<T>>,
1323    accessor: A,
1324}
1325
1326impl<T, A> GuardedFutureReader<T, A>
1327where
1328    A: AsAccessor,
1329{
1330    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1331    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1332        Self {
1333            reader: Some(reader),
1334            accessor,
1335        }
1336    }
1337
1338    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1339    /// back.
1340    pub fn into_future(self) -> FutureReader<T> {
1341        self.into()
1342    }
1343}
1344
1345impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1346where
1347    A: AsAccessor,
1348{
1349    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1350        guard.reader.take().unwrap()
1351    }
1352}
1353
1354impl<T, A> Drop for GuardedFutureReader<T, A>
1355where
1356    A: AsAccessor,
1357{
1358    fn drop(&mut self) {
1359        if let Some(reader) = &mut self.reader {
1360            reader.close_with(&self.accessor)
1361        }
1362    }
1363}
1364
1365/// Represents the readable end of a Component Model `stream`.
1366///
1367/// Note that `StreamReader` instances must be disposed of using `close`;
1368/// otherwise the in-store representation will leak and the writer end will hang
1369/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1370/// disposal happens automatically.
1371pub struct StreamReader<T> {
1372    instance: Instance,
1373    id: TableId<TransmitHandle>,
1374    _phantom: PhantomData<T>,
1375}
1376
1377impl<T> StreamReader<T> {
1378    /// Create a new stream with the specified producer.
1379    pub fn new<S: AsContextMut>(
1380        instance: Instance,
1381        store: S,
1382        producer: impl StreamProducer<S::Data, Item = T>,
1383    ) -> Self
1384    where
1385        T: func::Lower + func::Lift + Send + Sync + 'static,
1386    {
1387        Self::new_(
1388            instance.new_transmit(store, TransmitKind::Stream, producer),
1389            instance,
1390        )
1391    }
1392
1393    fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1394        Self {
1395            instance,
1396            id,
1397            _phantom: PhantomData,
1398        }
1399    }
1400
1401    /// Set the consumer that accepts the items delivered to this stream.
1402    pub fn pipe<S: AsContextMut>(self, store: S, consumer: impl StreamConsumer<S::Data, Item = T>)
1403    where
1404        T: 'static,
1405    {
1406        self.instance
1407            .set_consumer(store, self.id, TransmitKind::Stream, consumer);
1408    }
1409
1410    /// Convert this `StreamReader` into a [`Val`].
1411    // See TODO comment for `StreamAny`; this is prone to handle leakage.
1412    pub fn into_val(self) -> Val {
1413        Val::Stream(StreamAny(self.id.rep()))
1414    }
1415
1416    /// Attempt to convert the specified [`Val`] to a `StreamReader`.
1417    pub fn from_val(
1418        mut store: impl AsContextMut<Data: Send>,
1419        instance: Instance,
1420        value: &Val,
1421    ) -> Result<Self> {
1422        let Val::Stream(StreamAny(rep)) = value else {
1423            bail!("expected `stream`; got `{}`", value.desc());
1424        };
1425        let store = store.as_context_mut();
1426        let id = TableId::<TransmitHandle>::new(*rep);
1427        instance.concurrent_state_mut(store.0).get_mut(id)?; // Just make sure it's present
1428        Ok(Self::new_(id, instance))
1429    }
1430
1431    /// Transfer ownership of the read end of a stream from a guest to the host.
1432    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1433        match ty {
1434            InterfaceType::Stream(src) => {
1435                let handle_table = cx
1436                    .instance_mut()
1437                    .table_for_transmit(TransmitIndex::Stream(src));
1438                let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1439                if is_done {
1440                    bail!("cannot lift stream after being notified that the writable end dropped");
1441                }
1442                let id = TableId::<TransmitHandle>::new(rep);
1443                cx.instance_mut()
1444                    .concurrent_state_mut()
1445                    .get_mut(id)?
1446                    .common
1447                    .handle = None;
1448                Ok(Self::new_(id, cx.instance_handle()))
1449            }
1450            _ => func::bad_type_info(),
1451        }
1452    }
1453
1454    /// Close this `StreamReader`, writing the default value.
1455    ///
1456    /// # Panics
1457    ///
1458    /// Panics if the store that the [`Accessor`] is derived from does not own
1459    /// this future. Usage of this future after calling `close` will also cause
1460    /// a panic.
1461    pub fn close(&mut self, mut store: impl AsContextMut) {
1462        // `self` should never be used again, but leave an invalid handle there just in case.
1463        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1464        self.instance
1465            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Stream)
1466            .unwrap()
1467    }
1468
1469    /// Convenience method around [`Self::close`].
1470    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1471        accessor.as_accessor().with(|access| self.close(access))
1472    }
1473
1474    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1475    /// drop and clean it up from the store.
1476    ///
1477    /// Note that the `accessor` provided must own this future and is
1478    /// additionally transferred to the `GuardedStreamReader` return value.
1479    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1480    where
1481        A: AsAccessor,
1482    {
1483        GuardedStreamReader::new(accessor, self)
1484    }
1485}
1486
1487impl<T> fmt::Debug for StreamReader<T> {
1488    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1489        f.debug_struct("StreamReader")
1490            .field("id", &self.id)
1491            .field("instance", &self.instance)
1492            .finish()
1493    }
1494}
1495
1496/// Transfer ownership of the read end of a stream from the host to a guest.
1497pub(crate) fn lower_stream_to_index<U>(
1498    rep: u32,
1499    cx: &mut LowerContext<'_, U>,
1500    ty: InterfaceType,
1501) -> Result<u32> {
1502    match ty {
1503        InterfaceType::Stream(dst) => {
1504            let concurrent_state = cx.instance_mut().concurrent_state_mut();
1505            let id = TableId::<TransmitHandle>::new(rep);
1506            let state = concurrent_state.get_mut(id)?.state;
1507            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1508
1509            let handle = cx
1510                .instance_mut()
1511                .table_for_transmit(TransmitIndex::Stream(dst))
1512                .stream_insert_read(dst, rep)?;
1513
1514            cx.instance_mut()
1515                .concurrent_state_mut()
1516                .get_mut(id)?
1517                .common
1518                .handle = Some(handle);
1519
1520            Ok(handle)
1521        }
1522        _ => func::bad_type_info(),
1523    }
1524}
1525
1526// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1527// safe and correct since we lift and lower stream handles as `u32`s.
1528unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1529    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1530
1531    type Lower = <u32 as func::ComponentType>::Lower;
1532
1533    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1534        match ty {
1535            InterfaceType::Stream(_) => Ok(()),
1536            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1537        }
1538    }
1539}
1540
1541// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1542unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1543    fn linear_lower_to_flat<U>(
1544        &self,
1545        cx: &mut LowerContext<'_, U>,
1546        ty: InterfaceType,
1547        dst: &mut MaybeUninit<Self::Lower>,
1548    ) -> Result<()> {
1549        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1550            cx,
1551            InterfaceType::U32,
1552            dst,
1553        )
1554    }
1555
1556    fn linear_lower_to_memory<U>(
1557        &self,
1558        cx: &mut LowerContext<'_, U>,
1559        ty: InterfaceType,
1560        offset: usize,
1561    ) -> Result<()> {
1562        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1563            cx,
1564            InterfaceType::U32,
1565            offset,
1566        )
1567    }
1568}
1569
1570// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1571unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1572    fn linear_lift_from_flat(
1573        cx: &mut LiftContext<'_>,
1574        ty: InterfaceType,
1575        src: &Self::Lower,
1576    ) -> Result<Self> {
1577        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1578        Self::lift_from_index(cx, ty, index)
1579    }
1580
1581    fn linear_lift_from_memory(
1582        cx: &mut LiftContext<'_>,
1583        ty: InterfaceType,
1584        bytes: &[u8],
1585    ) -> Result<Self> {
1586        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1587        Self::lift_from_index(cx, ty, index)
1588    }
1589}
1590
1591/// A [`StreamReader`] paired with an [`Accessor`].
1592///
1593/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1594/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1595/// [`StreamReader::guard`].
1596pub struct GuardedStreamReader<T, A>
1597where
1598    A: AsAccessor,
1599{
1600    // This field is `None` to implement the conversion from this guard back to
1601    // `StreamReader`. When `None` is seen in the destructor it will cause the
1602    // destructor to do nothing.
1603    reader: Option<StreamReader<T>>,
1604    accessor: A,
1605}
1606
1607impl<T, A> GuardedStreamReader<T, A>
1608where
1609    A: AsAccessor,
1610{
1611    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1612    /// `reader`.
1613    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1614        Self {
1615            reader: Some(reader),
1616            accessor,
1617        }
1618    }
1619
1620    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1621    /// back.
1622    pub fn into_stream(self) -> StreamReader<T> {
1623        self.into()
1624    }
1625}
1626
1627impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1628where
1629    A: AsAccessor,
1630{
1631    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1632        guard.reader.take().unwrap()
1633    }
1634}
1635
1636impl<T, A> Drop for GuardedStreamReader<T, A>
1637where
1638    A: AsAccessor,
1639{
1640    fn drop(&mut self) {
1641        if let Some(reader) = &mut self.reader {
1642            reader.close_with(&self.accessor)
1643        }
1644    }
1645}
1646
1647/// Represents a Component Model `error-context`.
1648pub struct ErrorContext {
1649    rep: u32,
1650}
1651
1652impl ErrorContext {
1653    pub(crate) fn new(rep: u32) -> Self {
1654        Self { rep }
1655    }
1656
1657    /// Convert this `ErrorContext` into a [`Val`].
1658    pub fn into_val(self) -> Val {
1659        Val::ErrorContext(ErrorContextAny(self.rep))
1660    }
1661
1662    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1663    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1664        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1665            bail!("expected `error-context`; got `{}`", value.desc());
1666        };
1667        Ok(Self::new(*rep))
1668    }
1669
1670    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1671        match ty {
1672            InterfaceType::ErrorContext(src) => {
1673                let rep = cx
1674                    .instance_mut()
1675                    .table_for_error_context(src)
1676                    .error_context_rep(index)?;
1677
1678                Ok(Self { rep })
1679            }
1680            _ => func::bad_type_info(),
1681        }
1682    }
1683}
1684
1685pub(crate) fn lower_error_context_to_index<U>(
1686    rep: u32,
1687    cx: &mut LowerContext<'_, U>,
1688    ty: InterfaceType,
1689) -> Result<u32> {
1690    match ty {
1691        InterfaceType::ErrorContext(dst) => {
1692            let tbl = cx.instance_mut().table_for_error_context(dst);
1693            tbl.error_context_insert(rep)
1694        }
1695        _ => func::bad_type_info(),
1696    }
1697}
1698// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1699// safe and correct since we lift and lower future handles as `u32`s.
1700unsafe impl func::ComponentType for ErrorContext {
1701    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1702
1703    type Lower = <u32 as func::ComponentType>::Lower;
1704
1705    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1706        match ty {
1707            InterfaceType::ErrorContext(_) => Ok(()),
1708            other => bail!("expected `error`, found `{}`", func::desc(other)),
1709        }
1710    }
1711}
1712
1713// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1714unsafe impl func::Lower for ErrorContext {
1715    fn linear_lower_to_flat<T>(
1716        &self,
1717        cx: &mut LowerContext<'_, T>,
1718        ty: InterfaceType,
1719        dst: &mut MaybeUninit<Self::Lower>,
1720    ) -> Result<()> {
1721        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1722            cx,
1723            InterfaceType::U32,
1724            dst,
1725        )
1726    }
1727
1728    fn linear_lower_to_memory<T>(
1729        &self,
1730        cx: &mut LowerContext<'_, T>,
1731        ty: InterfaceType,
1732        offset: usize,
1733    ) -> Result<()> {
1734        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1735            cx,
1736            InterfaceType::U32,
1737            offset,
1738        )
1739    }
1740}
1741
1742// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1743unsafe impl func::Lift for ErrorContext {
1744    fn linear_lift_from_flat(
1745        cx: &mut LiftContext<'_>,
1746        ty: InterfaceType,
1747        src: &Self::Lower,
1748    ) -> Result<Self> {
1749        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1750        Self::lift_from_index(cx, ty, index)
1751    }
1752
1753    fn linear_lift_from_memory(
1754        cx: &mut LiftContext<'_>,
1755        ty: InterfaceType,
1756        bytes: &[u8],
1757    ) -> Result<Self> {
1758        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1759        Self::lift_from_index(cx, ty, index)
1760    }
1761}
1762
1763/// Represents the read or write end of a stream or future.
1764pub(super) struct TransmitHandle {
1765    pub(super) common: WaitableCommon,
1766    /// See `TransmitState`
1767    state: TableId<TransmitState>,
1768}
1769
1770impl TransmitHandle {
1771    fn new(state: TableId<TransmitState>) -> Self {
1772        Self {
1773            common: WaitableCommon::default(),
1774            state,
1775        }
1776    }
1777}
1778
1779impl TableDebug for TransmitHandle {
1780    fn type_name() -> &'static str {
1781        "TransmitHandle"
1782    }
1783}
1784
1785/// Represents the state of a stream or future.
1786struct TransmitState {
1787    /// The write end of the stream or future.
1788    write_handle: TableId<TransmitHandle>,
1789    /// The read end of the stream or future.
1790    read_handle: TableId<TransmitHandle>,
1791    /// See `WriteState`
1792    write: WriteState,
1793    /// See `ReadState`
1794    read: ReadState,
1795    /// Whether futher values may be transmitted via this stream or future.
1796    done: bool,
1797}
1798
1799impl Default for TransmitState {
1800    fn default() -> Self {
1801        Self {
1802            write_handle: TableId::new(u32::MAX),
1803            read_handle: TableId::new(u32::MAX),
1804            read: ReadState::Open,
1805            write: WriteState::Open,
1806            done: false,
1807        }
1808    }
1809}
1810
1811impl TableDebug for TransmitState {
1812    fn type_name() -> &'static str {
1813        "TransmitState"
1814    }
1815}
1816
1817type PollStream = Box<
1818    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
1819>;
1820
1821/// Represents the state of the write end of a stream or future.
1822enum WriteState {
1823    /// The write end is open, but no write is pending.
1824    Open,
1825    /// The write end is owned by a guest task and a write is pending.
1826    GuestReady {
1827        ty: TransmitIndex,
1828        flat_abi: Option<FlatAbi>,
1829        options: Options,
1830        address: usize,
1831        count: usize,
1832        handle: u32,
1833    },
1834    /// The write end is owned by the host, which is ready to produce items.
1835    HostReady {
1836        produce: PollStream,
1837        guest_offset: usize,
1838        cancel: bool,
1839        cancel_waker: Option<Waker>,
1840    },
1841    /// The write end has been dropped.
1842    Dropped,
1843}
1844
1845impl fmt::Debug for WriteState {
1846    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1847        match self {
1848            Self::Open => f.debug_tuple("Open").finish(),
1849            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1850            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1851            Self::Dropped => f.debug_tuple("Dropped").finish(),
1852        }
1853    }
1854}
1855
1856/// Represents the state of the read end of a stream or future.
1857enum ReadState {
1858    /// The read end is open, but no read is pending.
1859    Open,
1860    /// The read end is owned by a guest task and a read is pending.
1861    GuestReady {
1862        ty: TransmitIndex,
1863        flat_abi: Option<FlatAbi>,
1864        options: Options,
1865        address: usize,
1866        count: usize,
1867        handle: u32,
1868    },
1869    /// The read end is owned by a host task, and it is ready to consume items.
1870    HostReady {
1871        consume: PollStream,
1872        guest_offset: usize,
1873        cancel: bool,
1874        cancel_waker: Option<Waker>,
1875    },
1876    /// Both the read and write ends are owned by the host.
1877    HostToHost {
1878        accept: Box<
1879            dyn for<'a> Fn(
1880                    &'a mut UntypedWriteBuffer<'a>,
1881                )
1882                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
1883                + Send
1884                + Sync,
1885        >,
1886        buffer: Vec<u8>,
1887        limit: usize,
1888    },
1889    /// The read end has been dropped.
1890    Dropped,
1891}
1892
1893impl fmt::Debug for ReadState {
1894    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1895        match self {
1896            Self::Open => f.debug_tuple("Open").finish(),
1897            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1898            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1899            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1900            Self::Dropped => f.debug_tuple("Dropped").finish(),
1901        }
1902    }
1903}
1904
1905fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
1906    let count = guest_offset.try_into().unwrap();
1907    match state {
1908        StreamResult::Dropped => ReturnCode::Dropped(count),
1909        StreamResult::Completed => ReturnCode::completed(kind, count),
1910        StreamResult::Cancelled => ReturnCode::Cancelled(count),
1911    }
1912}
1913
1914impl Instance {
1915    fn new_transmit<S: AsContextMut, P: StreamProducer<S::Data>>(
1916        self,
1917        mut store: S,
1918        kind: TransmitKind,
1919        producer: P,
1920    ) -> TableId<TransmitHandle>
1921    where
1922        P::Item: func::Lower,
1923    {
1924        let mut store = store.as_context_mut();
1925        let token = StoreToken::new(store.as_context_mut());
1926        let state = self.concurrent_state_mut(store.0);
1927        let (_, read) = state.new_transmit().unwrap();
1928        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
1929        let id = state.get_mut(read).unwrap().state;
1930        let produce = Box::new(move || {
1931            let producer = producer.clone();
1932            async move {
1933                let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
1934
1935                let (result, cancelled) = if buffer.remaining().is_empty() {
1936                    future::poll_fn(|cx| {
1937                        tls::get(|store| {
1938                            let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
1939
1940                            let &WriteState::HostReady { cancel, .. } = &transmit.write else {
1941                                unreachable!();
1942                            };
1943
1944                            let mut host_buffer =
1945                                if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
1946                                    Some(Cursor::new(mem::take(buffer)))
1947                                } else {
1948                                    None
1949                                };
1950
1951                            let poll = mine.as_mut().poll_produce(
1952                                cx,
1953                                token.as_context_mut(store),
1954                                Destination {
1955                                    instance: self,
1956                                    id,
1957                                    buffer: &mut buffer,
1958                                    host_buffer: host_buffer.as_mut(),
1959                                    _phantom: PhantomData,
1960                                },
1961                                cancel,
1962                            );
1963
1964                            let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
1965
1966                            let host_offset = if let (
1967                                Some(host_buffer),
1968                                ReadState::HostToHost { buffer, limit, .. },
1969                            ) = (host_buffer, &mut transmit.read)
1970                            {
1971                                *limit = usize::try_from(host_buffer.position()).unwrap();
1972                                *buffer = host_buffer.into_inner();
1973                                *limit
1974                            } else {
1975                                0
1976                            };
1977
1978                            {
1979                                let WriteState::HostReady {
1980                                    guest_offset,
1981                                    cancel,
1982                                    cancel_waker,
1983                                    ..
1984                                } = &mut transmit.write
1985                                else {
1986                                    unreachable!();
1987                                };
1988
1989                                if poll.is_pending() {
1990                                    if !buffer.remaining().is_empty()
1991                                        || *guest_offset > 0
1992                                        || host_offset > 0
1993                                    {
1994                                        return Poll::Ready(Err(anyhow!(
1995                                            "StreamProducer::poll_produce returned Poll::Pending \
1996                                             after producing at least one item"
1997                                        )));
1998                                    }
1999                                    *cancel_waker = Some(cx.waker().clone());
2000                                } else {
2001                                    *cancel_waker = None;
2002                                    *cancel = false;
2003                                }
2004                            }
2005
2006                            poll.map(|v| v.map(|result| (result, cancel)))
2007                        })
2008                    })
2009                    .await?
2010                } else {
2011                    (StreamResult::Completed, false)
2012                };
2013
2014                let (guest_offset, host_offset, count) = tls::get(|store| {
2015                    let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
2016                    let (count, host_offset) = match &transmit.read {
2017                        &ReadState::GuestReady { count, .. } => (count, 0),
2018                        &ReadState::HostToHost { limit, .. } => (1, limit),
2019                        _ => unreachable!(),
2020                    };
2021                    let guest_offset = match &transmit.write {
2022                        &WriteState::HostReady { guest_offset, .. } => guest_offset,
2023                        _ => unreachable!(),
2024                    };
2025                    (guest_offset, host_offset, count)
2026                });
2027
2028                match result {
2029                    StreamResult::Completed => {
2030                        if count > 1
2031                            && buffer.remaining().is_empty()
2032                            && guest_offset == 0
2033                            && host_offset == 0
2034                        {
2035                            bail!(
2036                                "StreamProducer::poll_produce returned StreamResult::Completed \
2037                                 without producing any items"
2038                            );
2039                        }
2040                    }
2041                    StreamResult::Cancelled => {
2042                        if !cancelled {
2043                            bail!(
2044                                "StreamProducer::poll_produce returned StreamResult::Cancelled \
2045                                 without being given a `finish` parameter value of true"
2046                            );
2047                        }
2048                    }
2049                    StreamResult::Dropped => {}
2050                }
2051
2052                let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2053
2054                *producer.lock().unwrap() = Some((mine, buffer));
2055
2056                if write_buffer {
2057                    self.write(token, id, producer, kind).await?;
2058                }
2059
2060                Ok(result)
2061            }
2062            .boxed()
2063        });
2064        state.get_mut(id).unwrap().write = WriteState::HostReady {
2065            produce,
2066            guest_offset: 0,
2067            cancel: false,
2068            cancel_waker: None,
2069        };
2070        read
2071    }
2072
2073    fn set_consumer<S: AsContextMut, C: StreamConsumer<S::Data>>(
2074        self,
2075        mut store: S,
2076        id: TableId<TransmitHandle>,
2077        kind: TransmitKind,
2078        consumer: C,
2079    ) {
2080        let mut store = store.as_context_mut();
2081        let token = StoreToken::new(store.as_context_mut());
2082        let state = self.concurrent_state_mut(store.0);
2083        let id = state.get_mut(id).unwrap().state;
2084        let transmit = state.get_mut(id).unwrap();
2085        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2086        let consume_with_buffer = {
2087            let consumer = consumer.clone();
2088            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2089                let mut mine = consumer.lock().unwrap().take().unwrap();
2090
2091                let host_buffer_remaining_before =
2092                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2093
2094                let (result, cancelled) = future::poll_fn(|cx| {
2095                    tls::get(|store| {
2096                        let cancel =
2097                            match &self.concurrent_state_mut(store).get_mut(id).unwrap().read {
2098                                &ReadState::HostReady { cancel, .. } => cancel,
2099                                ReadState::Open => false,
2100                                _ => unreachable!(),
2101                            };
2102
2103                        let poll = mine.as_mut().poll_consume(
2104                            cx,
2105                            token.as_context_mut(store),
2106                            Source {
2107                                instance: self,
2108                                id,
2109                                host_buffer: host_buffer.as_deref_mut(),
2110                            },
2111                            cancel,
2112                        );
2113
2114                        if let ReadState::HostReady {
2115                            cancel_waker,
2116                            cancel,
2117                            ..
2118                        } = &mut self.concurrent_state_mut(store).get_mut(id).unwrap().read
2119                        {
2120                            if poll.is_pending() {
2121                                *cancel_waker = Some(cx.waker().clone());
2122                            } else {
2123                                *cancel_waker = None;
2124                                *cancel = false;
2125                            }
2126                        }
2127
2128                        poll.map(|v| v.map(|result| (result, cancel)))
2129                    })
2130                })
2131                .await?;
2132
2133                let (guest_offset, count) = tls::get(|store| {
2134                    let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
2135                    (
2136                        match &transmit.read {
2137                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2138                            ReadState::Open => 0,
2139                            _ => unreachable!(),
2140                        },
2141                        match &transmit.write {
2142                            &WriteState::GuestReady { count, .. } => count,
2143                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2144                            _ => unreachable!(),
2145                        },
2146                    )
2147                });
2148
2149                match result {
2150                    StreamResult::Completed => {
2151                        if count > 0
2152                            && guest_offset == 0
2153                            && host_buffer_remaining_before
2154                                .zip(host_buffer.map(|v| v.remaining().len()))
2155                                .map(|(before, after)| before == after)
2156                                .unwrap_or(false)
2157                        {
2158                            bail!(
2159                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2160                                 without consuming any items"
2161                            );
2162                        }
2163
2164                        if let TransmitKind::Future = kind {
2165                            tls::get(|store| {
2166                                self.concurrent_state_mut(store).get_mut(id).unwrap().done = true;
2167                            });
2168                        }
2169                    }
2170                    StreamResult::Cancelled => {
2171                        if !cancelled {
2172                            bail!(
2173                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2174                                 without being given a `finish` parameter value of true"
2175                            );
2176                        }
2177                    }
2178                    StreamResult::Dropped => {}
2179                }
2180
2181                *consumer.lock().unwrap() = Some(mine);
2182
2183                Ok(result)
2184            }
2185        };
2186        let consume = {
2187            let consume = consume_with_buffer.clone();
2188            Box::new(move || {
2189                let consume = consume.clone();
2190                async move { consume(None).await }.boxed()
2191            })
2192        };
2193
2194        match &transmit.write {
2195            WriteState::Open => {
2196                transmit.read = ReadState::HostReady {
2197                    consume,
2198                    guest_offset: 0,
2199                    cancel: false,
2200                    cancel_waker: None,
2201                };
2202            }
2203            WriteState::GuestReady { .. } => {
2204                let future = consume();
2205                transmit.read = ReadState::HostReady {
2206                    consume,
2207                    guest_offset: 0,
2208                    cancel: false,
2209                    cancel_waker: None,
2210                };
2211                self.pipe_from_guest(store.0, kind, id, future);
2212            }
2213            WriteState::HostReady { .. } => {
2214                let WriteState::HostReady { produce, .. } = mem::replace(
2215                    &mut transmit.write,
2216                    WriteState::HostReady {
2217                        produce: Box::new(|| unreachable!()),
2218                        guest_offset: 0,
2219                        cancel: false,
2220                        cancel_waker: None,
2221                    },
2222                ) else {
2223                    unreachable!();
2224                };
2225
2226                transmit.read = ReadState::HostToHost {
2227                    accept: Box::new(move |input| {
2228                        let consume = consume_with_buffer.clone();
2229                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2230                    }),
2231                    buffer: Vec::new(),
2232                    limit: 0,
2233                };
2234
2235                let future = async move {
2236                    loop {
2237                        if tls::get(|store| {
2238                            anyhow::Ok(matches!(
2239                                self.concurrent_state_mut(store).get_mut(id)?.read,
2240                                ReadState::Dropped
2241                            ))
2242                        })? {
2243                            break Ok(());
2244                        }
2245
2246                        match produce().await? {
2247                            StreamResult::Completed | StreamResult::Cancelled => {}
2248                            StreamResult::Dropped => break Ok(()),
2249                        }
2250
2251                        if let TransmitKind::Future = kind {
2252                            break Ok(());
2253                        }
2254                    }
2255                }
2256                .map(move |result| {
2257                    tls::get(|store| self.concurrent_state_mut(store).delete_transmit(id))?;
2258                    result
2259                });
2260
2261                state.push_future(Box::pin(future));
2262            }
2263            WriteState::Dropped => {
2264                let reader = transmit.read_handle;
2265                self.host_drop_reader(store.0, reader, kind).unwrap();
2266            }
2267        }
2268    }
2269
2270    async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2271        self,
2272        token: StoreToken<D>,
2273        id: TableId<TransmitState>,
2274        pair: Arc<Mutex<Option<(P, B)>>>,
2275        kind: TransmitKind,
2276    ) -> Result<()> {
2277        let (read, guest_offset) = tls::get(|store| {
2278            let transmit = self.concurrent_state_mut(store).get_mut(id)?;
2279
2280            let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write
2281            {
2282                Some(guest_offset)
2283            } else {
2284                None
2285            };
2286
2287            anyhow::Ok((
2288                mem::replace(&mut transmit.read, ReadState::Open),
2289                guest_offset,
2290            ))
2291        })?;
2292
2293        match read {
2294            ReadState::GuestReady {
2295                ty,
2296                flat_abi,
2297                options,
2298                address,
2299                count,
2300                handle,
2301            } => {
2302                let guest_offset = guest_offset.unwrap();
2303
2304                if let TransmitKind::Future = kind {
2305                    tls::get(|store| {
2306                        self.concurrent_state_mut(store).get_mut(id)?.done = true;
2307                        anyhow::Ok(())
2308                    })?;
2309                }
2310
2311                let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2312                let accept = {
2313                    let pair = pair.clone();
2314                    move |mut store: StoreContextMut<D>| {
2315                        lower::<T, B, D>(
2316                            store.as_context_mut(),
2317                            self,
2318                            &options,
2319                            ty,
2320                            address + (T::SIZE32 * guest_offset),
2321                            count - guest_offset,
2322                            &mut pair.lock().unwrap().as_mut().unwrap().1,
2323                        )?;
2324                        anyhow::Ok(())
2325                    }
2326                };
2327
2328                if guest_offset < count {
2329                    if T::MAY_REQUIRE_REALLOC {
2330                        // For payloads which may require a realloc call, use a
2331                        // oneshot::channel and background task.  This is
2332                        // necessary because calling the guest while there are
2333                        // host embedder frames on the stack is unsound.
2334                        let (tx, rx) = oneshot::channel();
2335                        tls::get(move |store| {
2336                            self.concurrent_state_mut(store).push_high_priority(
2337                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2338                                    move |store, _| {
2339                                        _ = tx.send(accept(token.as_context_mut(store))?);
2340                                        Ok(())
2341                                    },
2342                                ))),
2343                            )
2344                        });
2345                        rx.await?
2346                    } else {
2347                        // Optimize flat payloads (i.e. those which do not
2348                        // require calling the guest's realloc function) by
2349                        // lowering directly instead of using a oneshot::channel
2350                        // and background task.
2351                        tls::get(|store| accept(token.as_context_mut(store)))?
2352                    };
2353                }
2354
2355                tls::get(|store| {
2356                    let count =
2357                        old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2358
2359                    let transmit = self.concurrent_state_mut(store).get_mut(id)?;
2360
2361                    let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2362                        unreachable!();
2363                    };
2364
2365                    *guest_offset += count;
2366
2367                    transmit.read = ReadState::GuestReady {
2368                        ty,
2369                        flat_abi,
2370                        options,
2371                        address,
2372                        count,
2373                        handle,
2374                    };
2375
2376                    anyhow::Ok(())
2377                })?;
2378
2379                Ok(())
2380            }
2381
2382            ReadState::HostToHost {
2383                accept,
2384                mut buffer,
2385                limit,
2386            } => {
2387                let mut state = StreamResult::Completed;
2388                let mut position = 0;
2389
2390                while !matches!(state, StreamResult::Dropped) && position < limit {
2391                    let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2392                    state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2393                    (buffer, position, _) = slice_buffer.into_parts();
2394                }
2395
2396                {
2397                    let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2398
2399                    while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty())
2400                    {
2401                        state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2402                    }
2403
2404                    *pair.lock().unwrap() = Some((mine, buffer));
2405                }
2406
2407                tls::get(|store| {
2408                    self.concurrent_state_mut(store).get_mut(id)?.read = match state {
2409                        StreamResult::Dropped => ReadState::Dropped,
2410                        StreamResult::Completed | StreamResult::Cancelled => {
2411                            ReadState::HostToHost {
2412                                accept,
2413                                buffer,
2414                                limit: 0,
2415                            }
2416                        }
2417                    };
2418
2419                    anyhow::Ok(())
2420                })?;
2421                Ok(())
2422            }
2423
2424            _ => unreachable!(),
2425        }
2426    }
2427
2428    fn pipe_from_guest(
2429        self,
2430        store: &mut dyn VMStore,
2431        kind: TransmitKind,
2432        id: TableId<TransmitState>,
2433        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2434    ) {
2435        let future = async move {
2436            let stream_state = future.await?;
2437            tls::get(|store| {
2438                let state = self.concurrent_state_mut(store);
2439                let transmit = state.get_mut(id)?;
2440                let ReadState::HostReady {
2441                    consume,
2442                    guest_offset,
2443                    ..
2444                } = mem::replace(&mut transmit.read, ReadState::Open)
2445                else {
2446                    unreachable!();
2447                };
2448                let code = return_code(kind, stream_state, guest_offset);
2449                transmit.read = match stream_state {
2450                    StreamResult::Dropped => ReadState::Dropped,
2451                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2452                        consume,
2453                        guest_offset: 0,
2454                        cancel: false,
2455                        cancel_waker: None,
2456                    },
2457                };
2458                let WriteState::GuestReady { ty, handle, .. } =
2459                    mem::replace(&mut transmit.write, WriteState::Open)
2460                else {
2461                    unreachable!();
2462                };
2463                state.send_write_result(ty, id, handle, code)?;
2464                Ok(())
2465            })
2466        };
2467
2468        self.concurrent_state_mut(store).push_future(future.boxed());
2469    }
2470
2471    fn pipe_to_guest(
2472        self,
2473        store: &mut dyn VMStore,
2474        kind: TransmitKind,
2475        id: TableId<TransmitState>,
2476        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2477    ) {
2478        let future = async move {
2479            let stream_state = future.await?;
2480            tls::get(|store| {
2481                let state = self.concurrent_state_mut(store);
2482                let transmit = state.get_mut(id)?;
2483                let WriteState::HostReady {
2484                    produce,
2485                    guest_offset,
2486                    ..
2487                } = mem::replace(&mut transmit.write, WriteState::Open)
2488                else {
2489                    unreachable!();
2490                };
2491                let code = return_code(kind, stream_state, guest_offset);
2492                transmit.write = match stream_state {
2493                    StreamResult::Dropped => WriteState::Dropped,
2494                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2495                        produce,
2496                        guest_offset: 0,
2497                        cancel: false,
2498                        cancel_waker: None,
2499                    },
2500                };
2501                let ReadState::GuestReady { ty, handle, .. } =
2502                    mem::replace(&mut transmit.read, ReadState::Open)
2503                else {
2504                    unreachable!();
2505                };
2506                state.send_read_result(ty, id, handle, code)?;
2507                Ok(())
2508            })
2509        };
2510
2511        self.concurrent_state_mut(store).push_future(future.boxed());
2512    }
2513
2514    /// Drop the read end of a stream or future read from the host.
2515    fn host_drop_reader(
2516        self,
2517        store: &mut dyn VMStore,
2518        id: TableId<TransmitHandle>,
2519        kind: TransmitKind,
2520    ) -> Result<()> {
2521        let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
2522        let state = self.concurrent_state_mut(store);
2523        let transmit = state
2524            .get_mut(transmit_id)
2525            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2526        log::trace!(
2527            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2528            transmit.read,
2529            transmit.write
2530        );
2531
2532        transmit.read = ReadState::Dropped;
2533
2534        // If the write end is already dropped, it should stay dropped,
2535        // otherwise, it should be opened.
2536        let new_state = if let WriteState::Dropped = &transmit.write {
2537            WriteState::Dropped
2538        } else {
2539            WriteState::Open
2540        };
2541
2542        let write_handle = transmit.write_handle;
2543
2544        match mem::replace(&mut transmit.write, new_state) {
2545            // If a guest is waiting to write, notify it that the read end has
2546            // been dropped.
2547            WriteState::GuestReady { ty, handle, .. } => {
2548                state.update_event(
2549                    write_handle.rep(),
2550                    match ty {
2551                        TransmitIndex::Future(ty) => Event::FutureWrite {
2552                            code: ReturnCode::Dropped(0),
2553                            pending: Some((ty, handle)),
2554                        },
2555                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2556                            code: ReturnCode::Dropped(0),
2557                            pending: Some((ty, handle)),
2558                        },
2559                    },
2560                )?;
2561            }
2562
2563            WriteState::HostReady { .. } => {}
2564
2565            WriteState::Open => {
2566                state.update_event(
2567                    write_handle.rep(),
2568                    match kind {
2569                        TransmitKind::Future => Event::FutureWrite {
2570                            code: ReturnCode::Dropped(0),
2571                            pending: None,
2572                        },
2573                        TransmitKind::Stream => Event::StreamWrite {
2574                            code: ReturnCode::Dropped(0),
2575                            pending: None,
2576                        },
2577                    },
2578                )?;
2579            }
2580
2581            WriteState::Dropped => {
2582                log::trace!("host_drop_reader delete {transmit_id:?}");
2583                state.delete_transmit(transmit_id)?;
2584            }
2585        }
2586        Ok(())
2587    }
2588
2589    /// Drop the write end of a stream or future read from the host.
2590    fn host_drop_writer<U>(
2591        self,
2592        store: StoreContextMut<U>,
2593        id: TableId<TransmitHandle>,
2594        on_drop_open: Option<fn() -> Result<()>>,
2595    ) -> Result<()> {
2596        let transmit_id = self.concurrent_state_mut(store.0).get_mut(id)?.state;
2597        let transmit = self
2598            .concurrent_state_mut(store.0)
2599            .get_mut(transmit_id)
2600            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2601        log::trace!(
2602            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2603            transmit.read,
2604            transmit.write
2605        );
2606
2607        // Existing queued transmits must be updated with information for the impending writer closure
2608        match &mut transmit.write {
2609            WriteState::GuestReady { .. } => {
2610                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2611            }
2612            WriteState::HostReady { .. } => {}
2613            v @ WriteState::Open => {
2614                if let (Some(on_drop_open), false) = (
2615                    on_drop_open,
2616                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2617                ) {
2618                    on_drop_open()?;
2619                } else {
2620                    *v = WriteState::Dropped;
2621                }
2622            }
2623            WriteState::Dropped => unreachable!("write state is already dropped"),
2624        }
2625
2626        let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2627
2628        // If the existing read state is dropped, then there's nothing to read
2629        // and we can keep it that way.
2630        //
2631        // If the read state was any other state, then we must set the new state to open
2632        // to indicate that there *is* data to be read
2633        let new_state = if let ReadState::Dropped = &transmit.read {
2634            ReadState::Dropped
2635        } else {
2636            ReadState::Open
2637        };
2638
2639        let read_handle = transmit.read_handle;
2640
2641        // Swap in the new read state
2642        match mem::replace(&mut transmit.read, new_state) {
2643            // If the guest was ready to read, then we cannot drop the reader (or writer);
2644            // we must deliver the event, and update the state associated with the handle to
2645            // represent that a read must be performed
2646            ReadState::GuestReady { ty, handle, .. } => {
2647                // Ensure the final read of the guest is queued, with appropriate closure indicator
2648                self.concurrent_state_mut(store.0).update_event(
2649                    read_handle.rep(),
2650                    match ty {
2651                        TransmitIndex::Future(ty) => Event::FutureRead {
2652                            code: ReturnCode::Dropped(0),
2653                            pending: Some((ty, handle)),
2654                        },
2655                        TransmitIndex::Stream(ty) => Event::StreamRead {
2656                            code: ReturnCode::Dropped(0),
2657                            pending: Some((ty, handle)),
2658                        },
2659                    },
2660                )?;
2661            }
2662
2663            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2664
2665            // If the read state is open, then there are no registered readers of the stream/future
2666            ReadState::Open => {
2667                self.concurrent_state_mut(store.0).update_event(
2668                    read_handle.rep(),
2669                    match on_drop_open {
2670                        Some(_) => Event::FutureRead {
2671                            code: ReturnCode::Dropped(0),
2672                            pending: None,
2673                        },
2674                        None => Event::StreamRead {
2675                            code: ReturnCode::Dropped(0),
2676                            pending: None,
2677                        },
2678                    },
2679                )?;
2680            }
2681
2682            // If the read state was already dropped, then we can remove the transmit state completely
2683            // (both writer and reader have been dropped)
2684            ReadState::Dropped => {
2685                log::trace!("host_drop_writer delete {transmit_id:?}");
2686                self.concurrent_state_mut(store.0)
2687                    .delete_transmit(transmit_id)?;
2688            }
2689        }
2690        Ok(())
2691    }
2692
2693    /// Drop the writable end of the specified stream or future from the guest.
2694    pub(super) fn guest_drop_writable<T>(
2695        self,
2696        store: StoreContextMut<T>,
2697        ty: TransmitIndex,
2698        writer: u32,
2699    ) -> Result<()> {
2700        let table = self.id().get_mut(store.0).table_for_transmit(ty);
2701        let transmit_rep = match ty {
2702            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2703            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2704        };
2705
2706        let id = TableId::<TransmitHandle>::new(transmit_rep);
2707        log::trace!("guest_drop_writable: drop writer {id:?}");
2708        match ty {
2709            TransmitIndex::Stream(_) => self.host_drop_writer(store, id, None),
2710            TransmitIndex::Future(_) => self.host_drop_writer(
2711                store,
2712                id,
2713                Some(|| {
2714                    Err(anyhow!(
2715                        "cannot drop future write end without first writing a value"
2716                    ))
2717                }),
2718            ),
2719        }
2720    }
2721
2722    /// Copy `count` items from `read_address` to `write_address` for the
2723    /// specified stream or future.
2724    fn copy<T: 'static>(
2725        self,
2726        mut store: StoreContextMut<T>,
2727        flat_abi: Option<FlatAbi>,
2728        write_ty: TransmitIndex,
2729        write_options: &Options,
2730        write_address: usize,
2731        read_ty: TransmitIndex,
2732        read_options: &Options,
2733        read_address: usize,
2734        count: usize,
2735        rep: u32,
2736    ) -> Result<()> {
2737        let types = self.id().get(store.0).component().types().clone();
2738        match (write_ty, read_ty) {
2739            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
2740                assert_eq!(count, 1);
2741
2742                let val = types[types[write_ty].ty]
2743                    .payload
2744                    .map(|ty| {
2745                        let abi = types.canonical_abi(&ty);
2746                        // FIXME: needs to read an i64 for memory64
2747                        if write_address % usize::try_from(abi.align32)? != 0 {
2748                            bail!("write pointer not aligned");
2749                        }
2750
2751                        let lift =
2752                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2753                        let bytes = lift
2754                            .memory()
2755                            .get(write_address..)
2756                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2757                            .ok_or_else(|| {
2758                                anyhow::anyhow!("write pointer out of bounds of memory")
2759                            })?;
2760
2761                        Val::load(lift, ty, bytes)
2762                    })
2763                    .transpose()?;
2764
2765                if let Some(val) = val {
2766                    let lower =
2767                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2768                    let ty = types[types[read_ty].ty].payload.unwrap();
2769                    let ptr = func::validate_inbounds_dynamic(
2770                        types.canonical_abi(&ty),
2771                        lower.as_slice_mut(),
2772                        &ValRaw::u32(read_address.try_into().unwrap()),
2773                    )?;
2774                    val.store(lower, ty, ptr)?;
2775                }
2776            }
2777            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
2778                if let Some(flat_abi) = flat_abi {
2779                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
2780                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2781                    if length_in_bytes > 0 {
2782                        if write_address % usize::try_from(flat_abi.align)? != 0 {
2783                            bail!("write pointer not aligned");
2784                        }
2785                        if read_address % usize::try_from(flat_abi.align)? != 0 {
2786                            bail!("read pointer not aligned");
2787                        }
2788
2789                        let store_opaque = store.0.store_opaque_mut();
2790
2791                        {
2792                            let src = write_options
2793                                .memory(store_opaque)
2794                                .get(write_address..)
2795                                .and_then(|b| b.get(..length_in_bytes))
2796                                .ok_or_else(|| {
2797                                    anyhow::anyhow!("write pointer out of bounds of memory")
2798                                })?
2799                                .as_ptr();
2800                            let dst = read_options
2801                                .memory_mut(store_opaque)
2802                                .get_mut(read_address..)
2803                                .and_then(|b| b.get_mut(..length_in_bytes))
2804                                .ok_or_else(|| {
2805                                    anyhow::anyhow!("read pointer out of bounds of memory")
2806                                })?
2807                                .as_mut_ptr();
2808                            // SAFETY: Both `src` and `dst` have been validated
2809                            // above.
2810                            unsafe { src.copy_to(dst, length_in_bytes) };
2811                        }
2812                    }
2813                } else {
2814                    let store_opaque = store.0.store_opaque_mut();
2815                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
2816                    let ty = types[types[write_ty].ty].payload.unwrap();
2817                    let abi = lift.types.canonical_abi(&ty);
2818                    let size = usize::try_from(abi.size32).unwrap();
2819                    if write_address % usize::try_from(abi.align32)? != 0 {
2820                        bail!("write pointer not aligned");
2821                    }
2822                    let bytes = lift
2823                        .memory()
2824                        .get(write_address..)
2825                        .and_then(|b| b.get(..size * count))
2826                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2827
2828                    let values = (0..count)
2829                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2830                        .collect::<Result<Vec<_>>>()?;
2831
2832                    let id = TableId::<TransmitHandle>::new(rep);
2833                    log::trace!("copy values {values:?} for {id:?}");
2834
2835                    let lower =
2836                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2837                    let ty = types[types[read_ty].ty].payload.unwrap();
2838                    let abi = lower.types.canonical_abi(&ty);
2839                    if read_address % usize::try_from(abi.align32)? != 0 {
2840                        bail!("read pointer not aligned");
2841                    }
2842                    let size = usize::try_from(abi.size32).unwrap();
2843                    lower
2844                        .as_slice_mut()
2845                        .get_mut(read_address..)
2846                        .and_then(|b| b.get_mut(..size * count))
2847                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2848                    let mut ptr = read_address;
2849                    for value in values {
2850                        value.store(lower, ty, ptr)?;
2851                        ptr += size
2852                    }
2853                }
2854            }
2855            _ => unreachable!(),
2856        }
2857
2858        Ok(())
2859    }
2860
2861    fn check_bounds(
2862        self,
2863        store: &StoreOpaque,
2864        options: &Options,
2865        ty: TransmitIndex,
2866        address: usize,
2867        count: usize,
2868    ) -> Result<()> {
2869        let types = self.id().get(store).component().types().clone();
2870        let size = usize::try_from(
2871            match ty {
2872                TransmitIndex::Future(ty) => types[types[ty].ty]
2873                    .payload
2874                    .map(|ty| types.canonical_abi(&ty).size32),
2875                TransmitIndex::Stream(ty) => types[types[ty].ty]
2876                    .payload
2877                    .map(|ty| types.canonical_abi(&ty).size32),
2878            }
2879            .unwrap_or(0),
2880        )
2881        .unwrap();
2882
2883        if count > 0 && size > 0 {
2884            options
2885                .memory(store)
2886                .get(address..)
2887                .and_then(|b| b.get(..(size * count)))
2888                .map(drop)
2889                .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
2890        } else {
2891            Ok(())
2892        }
2893    }
2894
2895    /// Write to the specified stream or future from the guest.
2896    pub(super) fn guest_write<T: 'static>(
2897        self,
2898        mut store: StoreContextMut<T>,
2899        ty: TransmitIndex,
2900        options: OptionsIndex,
2901        flat_abi: Option<FlatAbi>,
2902        handle: u32,
2903        address: u32,
2904        count: u32,
2905    ) -> Result<ReturnCode> {
2906        let address = usize::try_from(address).unwrap();
2907        let count = usize::try_from(count).unwrap();
2908        let options = Options::new_index(store.0, self, options);
2909        self.check_bounds(store.0, &options, ty, address, count)?;
2910        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
2911        let TransmitLocalState::Write { done } = *state else {
2912            bail!(
2913                "invalid handle {handle}; expected `Write`; got {:?}",
2914                *state
2915            );
2916        };
2917
2918        if done {
2919            bail!("cannot write to stream after being notified that the readable end dropped");
2920        }
2921
2922        *state = TransmitLocalState::Busy;
2923        let transmit_handle = TableId::<TransmitHandle>::new(rep);
2924        let concurrent_state = self.concurrent_state_mut(store.0);
2925        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
2926        let transmit = concurrent_state.get_mut(transmit_id)?;
2927        log::trace!(
2928            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
2929            transmit.read
2930        );
2931
2932        if transmit.done {
2933            bail!("cannot write to future after previous write succeeded or readable end dropped");
2934        }
2935
2936        let new_state = if let ReadState::Dropped = &transmit.read {
2937            ReadState::Dropped
2938        } else {
2939            ReadState::Open
2940        };
2941
2942        let set_guest_ready = |me: &mut ConcurrentState| {
2943            let transmit = me.get_mut(transmit_id)?;
2944            assert!(matches!(&transmit.write, WriteState::Open));
2945            transmit.write = WriteState::GuestReady {
2946                ty,
2947                flat_abi,
2948                options,
2949                address,
2950                count,
2951                handle,
2952            };
2953            Ok::<_, crate::Error>(())
2954        };
2955
2956        let mut result = match mem::replace(&mut transmit.read, new_state) {
2957            ReadState::GuestReady {
2958                ty: read_ty,
2959                flat_abi: read_flat_abi,
2960                options: read_options,
2961                address: read_address,
2962                count: read_count,
2963                handle: read_handle,
2964            } => {
2965                assert_eq!(flat_abi, read_flat_abi);
2966
2967                if let TransmitIndex::Future(_) = ty {
2968                    transmit.done = true;
2969                }
2970
2971                // Note that zero-length reads and writes are handling specially
2972                // by the spec to allow each end to signal readiness to the
2973                // other.  Quoting the spec:
2974                //
2975                // ```
2976                // The meaning of a read or write when the length is 0 is that
2977                // the caller is querying the "readiness" of the other
2978                // side. When a 0-length read/write rendezvous with a
2979                // non-0-length read/write, only the 0-length read/write
2980                // completes; the non-0-length read/write is kept pending (and
2981                // ready for a subsequent rendezvous).
2982                //
2983                // In the corner case where a 0-length read and write
2984                // rendezvous, only the writer is notified of readiness. To
2985                // avoid livelock, the Canonical ABI requires that a writer must
2986                // (eventually) follow a completed 0-length write with a
2987                // non-0-length write that is allowed to block (allowing the
2988                // reader end to run and rendezvous with its own non-0-length
2989                // read).
2990                // ```
2991
2992                let write_complete = count == 0 || read_count > 0;
2993                let read_complete = count > 0;
2994                let read_buffer_remaining = count < read_count;
2995
2996                let read_handle_rep = transmit.read_handle.rep();
2997
2998                let count = count.min(read_count);
2999
3000                self.copy(
3001                    store.as_context_mut(),
3002                    flat_abi,
3003                    ty,
3004                    &options,
3005                    address,
3006                    read_ty,
3007                    &read_options,
3008                    read_address,
3009                    count,
3010                    rep,
3011                )?;
3012
3013                let instance = self.id().get_mut(store.0);
3014                let types = instance.component().types();
3015                let item_size = payload(ty, types)
3016                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3017                    .unwrap_or(0);
3018                let concurrent_state = instance.concurrent_state_mut();
3019                if read_complete {
3020                    let count = u32::try_from(count).unwrap();
3021                    let total = if let Some(Event::StreamRead {
3022                        code: ReturnCode::Completed(old_total),
3023                        ..
3024                    }) = concurrent_state.take_event(read_handle_rep)?
3025                    {
3026                        count + old_total
3027                    } else {
3028                        count
3029                    };
3030
3031                    let code = ReturnCode::completed(ty.kind(), total);
3032
3033                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3034                }
3035
3036                if read_buffer_remaining {
3037                    let transmit = concurrent_state.get_mut(transmit_id)?;
3038                    transmit.read = ReadState::GuestReady {
3039                        ty: read_ty,
3040                        flat_abi: read_flat_abi,
3041                        options: read_options,
3042                        address: read_address + (count * item_size),
3043                        count: read_count - count,
3044                        handle: read_handle,
3045                    };
3046                }
3047
3048                if write_complete {
3049                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3050                } else {
3051                    set_guest_ready(concurrent_state)?;
3052                    ReturnCode::Blocked
3053                }
3054            }
3055
3056            ReadState::HostReady {
3057                consume,
3058                guest_offset,
3059                cancel,
3060                cancel_waker,
3061            } => {
3062                assert!(cancel_waker.is_none());
3063                assert!(!cancel);
3064                assert_eq!(0, guest_offset);
3065
3066                if let TransmitIndex::Future(_) = ty {
3067                    transmit.done = true;
3068                }
3069
3070                set_guest_ready(concurrent_state)?;
3071                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3072            }
3073
3074            ReadState::HostToHost { .. } => unreachable!(),
3075
3076            ReadState::Open => {
3077                set_guest_ready(concurrent_state)?;
3078                ReturnCode::Blocked
3079            }
3080
3081            ReadState::Dropped => {
3082                if let TransmitIndex::Future(_) = ty {
3083                    transmit.done = true;
3084                }
3085
3086                ReturnCode::Dropped(0)
3087            }
3088        };
3089
3090        if result == ReturnCode::Blocked && !options.async_() {
3091            result = self.wait_for_write(store.0, transmit_handle)?;
3092        }
3093
3094        if result != ReturnCode::Blocked {
3095            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3096                TransmitLocalState::Write {
3097                    done: matches!(
3098                        (result, ty),
3099                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3100                    ),
3101                };
3102        }
3103
3104        log::trace!(
3105            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3106        );
3107
3108        Ok(result)
3109    }
3110
3111    /// Handle a host- or guest-initiated write by delivering the item(s) to the
3112    /// `StreamConsumer` for the specified stream or future.
3113    fn consume(
3114        self,
3115        store: &mut dyn VMStore,
3116        kind: TransmitKind,
3117        transmit_id: TableId<TransmitState>,
3118        consume: PollStream,
3119        guest_offset: usize,
3120        cancel: bool,
3121    ) -> Result<ReturnCode> {
3122        let mut future = consume();
3123        self.concurrent_state_mut(store).get_mut(transmit_id)?.read = ReadState::HostReady {
3124            consume,
3125            guest_offset,
3126            cancel,
3127            cancel_waker: None,
3128        };
3129        let poll = self.set_tls(store, || {
3130            future
3131                .as_mut()
3132                .poll(&mut Context::from_waker(&Waker::noop()))
3133        });
3134
3135        Ok(match poll {
3136            Poll::Ready(state) => {
3137                let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3138                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3139                    unreachable!();
3140                };
3141                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3142                transmit.write = WriteState::Open;
3143                code
3144            }
3145            Poll::Pending => {
3146                self.pipe_from_guest(store, kind, transmit_id, future);
3147                ReturnCode::Blocked
3148            }
3149        })
3150    }
3151
3152    /// Read from the specified stream or future from the guest.
3153    pub(super) fn guest_read<T: 'static>(
3154        self,
3155        mut store: StoreContextMut<T>,
3156        ty: TransmitIndex,
3157        options: OptionsIndex,
3158        flat_abi: Option<FlatAbi>,
3159        handle: u32,
3160        address: u32,
3161        count: u32,
3162    ) -> Result<ReturnCode> {
3163        let address = usize::try_from(address).unwrap();
3164        let count = usize::try_from(count).unwrap();
3165        let options = Options::new_index(store.0, self, options);
3166        self.check_bounds(store.0, &options, ty, address, count)?;
3167        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3168        let TransmitLocalState::Read { done } = *state else {
3169            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3170        };
3171
3172        if done {
3173            bail!("cannot read from stream after being notified that the writable end dropped");
3174        }
3175
3176        *state = TransmitLocalState::Busy;
3177        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3178        let concurrent_state = self.concurrent_state_mut(store.0);
3179        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3180        let transmit = concurrent_state.get_mut(transmit_id)?;
3181        log::trace!(
3182            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3183            transmit.write
3184        );
3185
3186        if transmit.done {
3187            bail!("cannot read from future after previous read succeeded");
3188        }
3189
3190        let new_state = if let WriteState::Dropped = &transmit.write {
3191            WriteState::Dropped
3192        } else {
3193            WriteState::Open
3194        };
3195
3196        let set_guest_ready = |me: &mut ConcurrentState| {
3197            let transmit = me.get_mut(transmit_id)?;
3198            assert!(matches!(&transmit.read, ReadState::Open));
3199            transmit.read = ReadState::GuestReady {
3200                ty,
3201                flat_abi,
3202                options,
3203                address,
3204                count,
3205                handle,
3206            };
3207            Ok::<_, crate::Error>(())
3208        };
3209
3210        let mut result = match mem::replace(&mut transmit.write, new_state) {
3211            WriteState::GuestReady {
3212                ty: write_ty,
3213                flat_abi: write_flat_abi,
3214                options: write_options,
3215                address: write_address,
3216                count: write_count,
3217                handle: write_handle,
3218            } => {
3219                assert_eq!(flat_abi, write_flat_abi);
3220
3221                if let TransmitIndex::Future(_) = ty {
3222                    transmit.done = true;
3223                }
3224
3225                let write_handle_rep = transmit.write_handle.rep();
3226
3227                // See the comment in `guest_write` for the
3228                // `ReadState::GuestReady` case concerning zero-length reads and
3229                // writes.
3230
3231                let write_complete = write_count == 0 || count > 0;
3232                let read_complete = write_count > 0;
3233                let write_buffer_remaining = count < write_count;
3234
3235                let count = count.min(write_count);
3236
3237                self.copy(
3238                    store.as_context_mut(),
3239                    flat_abi,
3240                    write_ty,
3241                    &write_options,
3242                    write_address,
3243                    ty,
3244                    &options,
3245                    address,
3246                    count,
3247                    rep,
3248                )?;
3249
3250                let instance = self.id().get_mut(store.0);
3251                let types = instance.component().types();
3252                let item_size = payload(ty, types)
3253                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3254                    .unwrap_or(0);
3255                let concurrent_state = instance.concurrent_state_mut();
3256
3257                if write_complete {
3258                    let count = u32::try_from(count).unwrap();
3259                    let total = if let Some(Event::StreamWrite {
3260                        code: ReturnCode::Completed(old_total),
3261                        ..
3262                    }) = concurrent_state.take_event(write_handle_rep)?
3263                    {
3264                        count + old_total
3265                    } else {
3266                        count
3267                    };
3268
3269                    let code = ReturnCode::completed(ty.kind(), total);
3270
3271                    concurrent_state.send_write_result(
3272                        write_ty,
3273                        transmit_id,
3274                        write_handle,
3275                        code,
3276                    )?;
3277                }
3278
3279                if write_buffer_remaining {
3280                    let transmit = concurrent_state.get_mut(transmit_id)?;
3281                    transmit.write = WriteState::GuestReady {
3282                        ty: write_ty,
3283                        flat_abi: write_flat_abi,
3284                        options: write_options,
3285                        address: write_address + (count * item_size),
3286                        count: write_count - count,
3287                        handle: write_handle,
3288                    };
3289                }
3290
3291                if read_complete {
3292                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3293                } else {
3294                    set_guest_ready(concurrent_state)?;
3295                    ReturnCode::Blocked
3296                }
3297            }
3298
3299            WriteState::HostReady {
3300                produce,
3301                guest_offset,
3302                cancel,
3303                cancel_waker,
3304            } => {
3305                assert!(cancel_waker.is_none());
3306                assert!(!cancel);
3307                assert_eq!(0, guest_offset);
3308
3309                if let TransmitIndex::Future(_) = ty {
3310                    transmit.done = true;
3311                }
3312
3313                set_guest_ready(concurrent_state)?;
3314
3315                self.produce(store.0, ty.kind(), transmit_id, produce, 0, false)?
3316            }
3317
3318            WriteState::Open => {
3319                set_guest_ready(concurrent_state)?;
3320                ReturnCode::Blocked
3321            }
3322
3323            WriteState::Dropped => ReturnCode::Dropped(0),
3324        };
3325
3326        if result == ReturnCode::Blocked && !options.async_() {
3327            result = self.wait_for_read(store.0, transmit_handle)?;
3328        }
3329
3330        if result != ReturnCode::Blocked {
3331            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3332                TransmitLocalState::Read {
3333                    done: matches!(
3334                        (result, ty),
3335                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3336                    ),
3337                };
3338        }
3339
3340        log::trace!(
3341            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3342        );
3343
3344        Ok(result)
3345    }
3346
3347    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
3348    /// for the specified stream or future for items.
3349    fn produce(
3350        self,
3351        store: &mut dyn VMStore,
3352        kind: TransmitKind,
3353        transmit_id: TableId<TransmitState>,
3354        produce: PollStream,
3355        guest_offset: usize,
3356        cancel: bool,
3357    ) -> Result<ReturnCode> {
3358        let mut future = produce();
3359        self.concurrent_state_mut(store).get_mut(transmit_id)?.write = WriteState::HostReady {
3360            produce,
3361            guest_offset,
3362            cancel,
3363            cancel_waker: None,
3364        };
3365        let poll = self.set_tls(store, || {
3366            future
3367                .as_mut()
3368                .poll(&mut Context::from_waker(&Waker::noop()))
3369        });
3370
3371        Ok(match poll {
3372            Poll::Ready(state) => {
3373                let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3374                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3375                    unreachable!();
3376                };
3377                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3378                transmit.read = ReadState::Open;
3379                code
3380            }
3381            Poll::Pending => {
3382                self.pipe_to_guest(store, kind, transmit_id, future);
3383                ReturnCode::Blocked
3384            }
3385        })
3386    }
3387
3388    fn wait_for_write(
3389        self,
3390        store: &mut dyn VMStore,
3391        handle: TableId<TransmitHandle>,
3392    ) -> Result<ReturnCode> {
3393        let waitable = Waitable::Transmit(handle);
3394        self.wait_for_event(store, waitable)?;
3395        let event = waitable.take_event(self.concurrent_state_mut(store))?;
3396        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3397            event
3398        {
3399            waitable.on_delivery(self.id().get_mut(store), event);
3400            Ok(code)
3401        } else {
3402            unreachable!()
3403        }
3404    }
3405
3406    /// Cancel a pending stream or future write.
3407    fn cancel_write(
3408        self,
3409        store: &mut dyn VMStore,
3410        transmit_id: TableId<TransmitState>,
3411        async_: bool,
3412    ) -> Result<ReturnCode> {
3413        let state = self.concurrent_state_mut(store);
3414        let transmit = state.get_mut(transmit_id)?;
3415        log::trace!(
3416            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3417            transmit.read,
3418            transmit.write
3419        );
3420
3421        let code = if let Some(event) =
3422            Waitable::Transmit(transmit.write_handle).take_event(state)?
3423        {
3424            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3425                unreachable!();
3426            };
3427            match (code, event) {
3428                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3429                    ReturnCode::Cancelled(count)
3430                }
3431                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3432                _ => unreachable!(),
3433            }
3434        } else if let ReadState::HostReady {
3435            cancel,
3436            cancel_waker,
3437            ..
3438        } = &mut state.get_mut(transmit_id)?.read
3439        {
3440            *cancel = true;
3441            if let Some(waker) = cancel_waker.take() {
3442                waker.wake();
3443            }
3444
3445            if async_ {
3446                ReturnCode::Blocked
3447            } else {
3448                let handle = self
3449                    .concurrent_state_mut(store)
3450                    .get_mut(transmit_id)?
3451                    .write_handle;
3452                self.wait_for_write(store, handle)?
3453            }
3454        } else {
3455            ReturnCode::Cancelled(0)
3456        };
3457
3458        let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3459
3460        match &transmit.write {
3461            WriteState::GuestReady { .. } => {
3462                transmit.write = WriteState::Open;
3463            }
3464            WriteState::HostReady { .. } => todo!("support host write cancellation"),
3465            WriteState::Open | WriteState::Dropped => {}
3466        }
3467
3468        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3469
3470        Ok(code)
3471    }
3472
3473    fn wait_for_read(
3474        self,
3475        store: &mut dyn VMStore,
3476        handle: TableId<TransmitHandle>,
3477    ) -> Result<ReturnCode> {
3478        let waitable = Waitable::Transmit(handle);
3479        self.wait_for_event(store, waitable)?;
3480        let event = waitable.take_event(self.concurrent_state_mut(store))?;
3481        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3482            event
3483        {
3484            waitable.on_delivery(self.id().get_mut(store), event);
3485            Ok(code)
3486        } else {
3487            unreachable!()
3488        }
3489    }
3490
3491    /// Cancel a pending stream or future read.
3492    fn cancel_read(
3493        self,
3494        store: &mut dyn VMStore,
3495        transmit_id: TableId<TransmitState>,
3496        async_: bool,
3497    ) -> Result<ReturnCode> {
3498        let state = self.concurrent_state_mut(store);
3499        let transmit = state.get_mut(transmit_id)?;
3500        log::trace!(
3501            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3502            transmit.read,
3503            transmit.write
3504        );
3505
3506        let code = if let Some(event) =
3507            Waitable::Transmit(transmit.read_handle).take_event(state)?
3508        {
3509            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3510                unreachable!();
3511            };
3512            match (code, event) {
3513                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3514                    ReturnCode::Cancelled(count)
3515                }
3516                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3517                _ => unreachable!(),
3518            }
3519        } else if let WriteState::HostReady {
3520            cancel,
3521            cancel_waker,
3522            ..
3523        } = &mut state.get_mut(transmit_id)?.write
3524        {
3525            *cancel = true;
3526            if let Some(waker) = cancel_waker.take() {
3527                waker.wake();
3528            }
3529
3530            if async_ {
3531                ReturnCode::Blocked
3532            } else {
3533                let handle = self
3534                    .concurrent_state_mut(store)
3535                    .get_mut(transmit_id)?
3536                    .read_handle;
3537                self.wait_for_read(store, handle)?
3538            }
3539        } else {
3540            ReturnCode::Cancelled(0)
3541        };
3542
3543        let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3544
3545        match &transmit.read {
3546            ReadState::GuestReady { .. } => {
3547                transmit.read = ReadState::Open;
3548            }
3549            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3550                todo!("support host read cancellation")
3551            }
3552            ReadState::Open | ReadState::Dropped => {}
3553        }
3554
3555        log::trace!("cancelled read {transmit_id:?}: {code:?}");
3556
3557        Ok(code)
3558    }
3559
3560    /// Cancel a pending write for the specified stream or future from the guest.
3561    fn guest_cancel_write(
3562        self,
3563        store: &mut dyn VMStore,
3564        ty: TransmitIndex,
3565        async_: bool,
3566        writer: u32,
3567    ) -> Result<ReturnCode> {
3568        let (rep, state) =
3569            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3570        let id = TableId::<TransmitHandle>::new(rep);
3571        log::trace!("guest cancel write {id:?} (handle {writer})");
3572        match state {
3573            TransmitLocalState::Write { .. } => {
3574                bail!("stream or future write cancelled when no write is pending")
3575            }
3576            TransmitLocalState::Read { .. } => {
3577                bail!("passed read end to `{{stream|future}}.cancel-write`")
3578            }
3579            TransmitLocalState::Busy => {}
3580        }
3581        let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3582        let code = self.cancel_write(store, transmit_id, async_)?;
3583        if !matches!(code, ReturnCode::Blocked) {
3584            let state =
3585                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3586                    .1;
3587            if let TransmitLocalState::Busy = state {
3588                *state = TransmitLocalState::Write { done: false };
3589            }
3590        }
3591        Ok(code)
3592    }
3593
3594    /// Cancel a pending read for the specified stream or future from the guest.
3595    fn guest_cancel_read(
3596        self,
3597        store: &mut dyn VMStore,
3598        ty: TransmitIndex,
3599        async_: bool,
3600        reader: u32,
3601    ) -> Result<ReturnCode> {
3602        let (rep, state) =
3603            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3604        let id = TableId::<TransmitHandle>::new(rep);
3605        log::trace!("guest cancel read {id:?} (handle {reader})");
3606        match state {
3607            TransmitLocalState::Read { .. } => {
3608                bail!("stream or future read cancelled when no read is pending")
3609            }
3610            TransmitLocalState::Write { .. } => {
3611                bail!("passed write end to `{{stream|future}}.cancel-read`")
3612            }
3613            TransmitLocalState::Busy => {}
3614        }
3615        let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3616        let code = self.cancel_read(store, transmit_id, async_)?;
3617        if !matches!(code, ReturnCode::Blocked) {
3618            let state =
3619                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3620                    .1;
3621            if let TransmitLocalState::Busy = state {
3622                *state = TransmitLocalState::Read { done: false };
3623            }
3624        }
3625        Ok(code)
3626    }
3627
3628    /// Drop the readable end of the specified stream or future from the guest.
3629    fn guest_drop_readable(
3630        self,
3631        store: &mut dyn VMStore,
3632        ty: TransmitIndex,
3633        reader: u32,
3634    ) -> Result<()> {
3635        let table = self.id().get_mut(store).table_for_transmit(ty);
3636        let (rep, _is_done) = match ty {
3637            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3638            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3639        };
3640        let kind = match ty {
3641            TransmitIndex::Stream(_) => TransmitKind::Stream,
3642            TransmitIndex::Future(_) => TransmitKind::Future,
3643        };
3644        let id = TableId::<TransmitHandle>::new(rep);
3645        log::trace!("guest_drop_readable: drop reader {id:?}");
3646        self.host_drop_reader(store, id, kind)
3647    }
3648
3649    /// Create a new error context for the given component.
3650    pub(crate) fn error_context_new(
3651        self,
3652        store: &mut StoreOpaque,
3653        caller: RuntimeComponentInstanceIndex,
3654        ty: TypeComponentLocalErrorContextTableIndex,
3655        options: OptionsIndex,
3656        debug_msg_address: u32,
3657        debug_msg_len: u32,
3658    ) -> Result<u32> {
3659        self.id().get(store).check_may_leave(caller)?;
3660        let options = Options::new_index(store, self, options);
3661        let lift_ctx = &mut LiftContext::new(store, &options, self);
3662        //  Read string from guest memory
3663        let address = usize::try_from(debug_msg_address)?;
3664        let len = usize::try_from(debug_msg_len)?;
3665        lift_ctx
3666            .memory()
3667            .get(address..)
3668            .and_then(|b| b.get(..len))
3669            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3670        let message = WasmStr::new(address, len, lift_ctx)?;
3671
3672        // Create a new ErrorContext that is tracked along with other concurrent state
3673        let err_ctx = ErrorContextState {
3674            debug_msg: message
3675                .to_str_from_memory(options.memory(store))?
3676                .to_string(),
3677        };
3678        let state = self.concurrent_state_mut(store);
3679        let table_id = state.push(err_ctx)?;
3680        let global_ref_count_idx =
3681            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3682
3683        // Add to the global error context ref counts
3684        let _ = state
3685            .global_error_context_ref_counts
3686            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3687
3688        // Error context are tracked both locally (to a single component instance) and globally
3689        // the counts for both must stay in sync.
3690        //
3691        // Here we reflect the newly created global concurrent error context state into the
3692        // component instance's locally tracked count, along with the appropriate key into the global
3693        // ref tracking data structures to enable later lookup
3694        let local_idx = self
3695            .id()
3696            .get_mut(store)
3697            .table_for_error_context(ty)
3698            .error_context_insert(table_id.rep())?;
3699
3700        Ok(local_idx)
3701    }
3702
3703    /// Retrieve the debug message from the specified error context.
3704    pub(super) fn error_context_debug_message<T>(
3705        self,
3706        store: StoreContextMut<T>,
3707        ty: TypeComponentLocalErrorContextTableIndex,
3708        options: OptionsIndex,
3709        err_ctx_handle: u32,
3710        debug_msg_address: u32,
3711    ) -> Result<()> {
3712        // Retrieve the error context and internal debug message
3713        let handle_table_id_rep = self
3714            .id()
3715            .get_mut(store.0)
3716            .table_for_error_context(ty)
3717            .error_context_rep(err_ctx_handle)?;
3718
3719        let state = self.concurrent_state_mut(store.0);
3720        // Get the state associated with the error context
3721        let ErrorContextState { debug_msg } =
3722            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3723        let debug_msg = debug_msg.clone();
3724
3725        let options = Options::new_index(store.0, self, options);
3726        let types = self.id().get(store.0).component().types().clone();
3727        let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3728        let debug_msg_address = usize::try_from(debug_msg_address)?;
3729        // Lower the string into the component's memory
3730        let offset = lower_cx
3731            .as_slice_mut()
3732            .get(debug_msg_address..)
3733            .and_then(|b| b.get(..debug_msg.bytes().len()))
3734            .map(|_| debug_msg_address)
3735            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3736        debug_msg
3737            .as_str()
3738            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3739
3740        Ok(())
3741    }
3742
3743    /// Implements the `future.cancel-read` intrinsic.
3744    pub(crate) fn future_cancel_read(
3745        self,
3746        store: &mut dyn VMStore,
3747        caller: RuntimeComponentInstanceIndex,
3748        ty: TypeFutureTableIndex,
3749        async_: bool,
3750        reader: u32,
3751    ) -> Result<u32> {
3752        self.id().get(store).check_may_leave(caller)?;
3753        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3754            .map(|v| v.encode())
3755    }
3756
3757    /// Implements the `future.cancel-write` intrinsic.
3758    pub(crate) fn future_cancel_write(
3759        self,
3760        store: &mut dyn VMStore,
3761        caller: RuntimeComponentInstanceIndex,
3762        ty: TypeFutureTableIndex,
3763        async_: bool,
3764        writer: u32,
3765    ) -> Result<u32> {
3766        self.id().get(store).check_may_leave(caller)?;
3767        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3768            .map(|v| v.encode())
3769    }
3770
3771    /// Implements the `stream.cancel-read` intrinsic.
3772    pub(crate) fn stream_cancel_read(
3773        self,
3774        store: &mut dyn VMStore,
3775        caller: RuntimeComponentInstanceIndex,
3776        ty: TypeStreamTableIndex,
3777        async_: bool,
3778        reader: u32,
3779    ) -> Result<u32> {
3780        self.id().get(store).check_may_leave(caller)?;
3781        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3782            .map(|v| v.encode())
3783    }
3784
3785    /// Implements the `stream.cancel-write` intrinsic.
3786    pub(crate) fn stream_cancel_write(
3787        self,
3788        store: &mut dyn VMStore,
3789        caller: RuntimeComponentInstanceIndex,
3790        ty: TypeStreamTableIndex,
3791        async_: bool,
3792        writer: u32,
3793    ) -> Result<u32> {
3794        self.id().get(store).check_may_leave(caller)?;
3795        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3796            .map(|v| v.encode())
3797    }
3798
3799    /// Implements the `future.drop-readable` intrinsic.
3800    pub(crate) fn future_drop_readable(
3801        self,
3802        store: &mut dyn VMStore,
3803        caller: RuntimeComponentInstanceIndex,
3804        ty: TypeFutureTableIndex,
3805        reader: u32,
3806    ) -> Result<()> {
3807        self.id().get(store).check_may_leave(caller)?;
3808        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3809    }
3810
3811    /// Implements the `stream.drop-readable` intrinsic.
3812    pub(crate) fn stream_drop_readable(
3813        self,
3814        store: &mut dyn VMStore,
3815        caller: RuntimeComponentInstanceIndex,
3816        ty: TypeStreamTableIndex,
3817        reader: u32,
3818    ) -> Result<()> {
3819        self.id().get(store).check_may_leave(caller)?;
3820        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3821    }
3822}
3823
3824impl ComponentInstance {
3825    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
3826        let (tables, types) = self.guest_tables();
3827        let runtime_instance = match ty {
3828            TransmitIndex::Stream(ty) => types[ty].instance,
3829            TransmitIndex::Future(ty) => types[ty].instance,
3830        };
3831        &mut tables[runtime_instance]
3832    }
3833
3834    fn table_for_error_context(
3835        self: Pin<&mut Self>,
3836        ty: TypeComponentLocalErrorContextTableIndex,
3837    ) -> &mut HandleTable {
3838        let (tables, types) = self.guest_tables();
3839        let runtime_instance = types[ty].instance;
3840        &mut tables[runtime_instance]
3841    }
3842
3843    fn get_mut_by_index(
3844        self: Pin<&mut Self>,
3845        ty: TransmitIndex,
3846        index: u32,
3847    ) -> Result<(u32, &mut TransmitLocalState)> {
3848        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
3849    }
3850
3851    /// Allocate a new future or stream and grant ownership of both the read and
3852    /// write ends to the (sub-)component instance to which the specified
3853    /// `TransmitIndex` belongs.
3854    fn guest_new(mut self: Pin<&mut Self>, ty: TransmitIndex) -> Result<ResourcePair> {
3855        let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?;
3856
3857        let table = self.as_mut().table_for_transmit(ty);
3858        let (read_handle, write_handle) = match ty {
3859            TransmitIndex::Future(ty) => (
3860                table.future_insert_read(ty, read.rep())?,
3861                table.future_insert_write(ty, write.rep())?,
3862            ),
3863            TransmitIndex::Stream(ty) => (
3864                table.stream_insert_read(ty, read.rep())?,
3865                table.stream_insert_write(ty, write.rep())?,
3866            ),
3867        };
3868
3869        let state = self.as_mut().concurrent_state_mut();
3870        state.get_mut(read)?.common.handle = Some(read_handle);
3871        state.get_mut(write)?.common.handle = Some(write_handle);
3872
3873        Ok(ResourcePair {
3874            write: write_handle,
3875            read: read_handle,
3876        })
3877    }
3878
3879    /// Drop the specified error context.
3880    pub(crate) fn error_context_drop(
3881        mut self: Pin<&mut Self>,
3882        caller: RuntimeComponentInstanceIndex,
3883        ty: TypeComponentLocalErrorContextTableIndex,
3884        error_context: u32,
3885    ) -> Result<()> {
3886        self.check_may_leave(caller)?;
3887
3888        let local_handle_table = self.as_mut().table_for_error_context(ty);
3889
3890        let rep = local_handle_table.error_context_drop(error_context)?;
3891
3892        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3893
3894        let state = self.concurrent_state_mut();
3895        let GlobalErrorContextRefCount(global_ref_count) = state
3896            .global_error_context_ref_counts
3897            .get_mut(&global_ref_count_idx)
3898            .expect("retrieve concurrent state for error context during drop");
3899
3900        // Reduce the component-global ref count, removing tracking if necessary
3901        assert!(*global_ref_count >= 1);
3902        *global_ref_count -= 1;
3903        if *global_ref_count == 0 {
3904            state
3905                .global_error_context_ref_counts
3906                .remove(&global_ref_count_idx);
3907
3908            state
3909                .delete(TableId::<ErrorContextState>::new(rep))
3910                .context("deleting component-global error context data")?;
3911        }
3912
3913        Ok(())
3914    }
3915
3916    /// Transfer ownership of the specified stream or future read end from one
3917    /// guest to another.
3918    fn guest_transfer(
3919        mut self: Pin<&mut Self>,
3920        src_idx: u32,
3921        src: TransmitIndex,
3922        dst: TransmitIndex,
3923    ) -> Result<u32> {
3924        let src_table = self.as_mut().table_for_transmit(src);
3925        let (rep, is_done) = match src {
3926            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3927            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3928        };
3929        if is_done {
3930            bail!("cannot lift after being notified that the writable end dropped");
3931        }
3932        let dst_table = self.as_mut().table_for_transmit(dst);
3933        let handle = match dst {
3934            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3935            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3936        }?;
3937        self.concurrent_state_mut()
3938            .get_mut(TableId::<TransmitHandle>::new(rep))?
3939            .common
3940            .handle = Some(handle);
3941        Ok(handle)
3942    }
3943
3944    /// Implements the `future.new` intrinsic.
3945    pub(crate) fn future_new(
3946        self: Pin<&mut Self>,
3947        caller: RuntimeComponentInstanceIndex,
3948        ty: TypeFutureTableIndex,
3949    ) -> Result<ResourcePair> {
3950        self.check_may_leave(caller)?;
3951        self.guest_new(TransmitIndex::Future(ty))
3952    }
3953
3954    /// Implements the `stream.new` intrinsic.
3955    pub(crate) fn stream_new(
3956        self: Pin<&mut Self>,
3957        caller: RuntimeComponentInstanceIndex,
3958        ty: TypeStreamTableIndex,
3959    ) -> Result<ResourcePair> {
3960        self.check_may_leave(caller)?;
3961        self.guest_new(TransmitIndex::Stream(ty))
3962    }
3963
3964    /// Transfer ownership of the specified future read end from one guest to
3965    /// another.
3966    pub(crate) fn future_transfer(
3967        self: Pin<&mut Self>,
3968        src_idx: u32,
3969        src: TypeFutureTableIndex,
3970        dst: TypeFutureTableIndex,
3971    ) -> Result<u32> {
3972        self.guest_transfer(
3973            src_idx,
3974            TransmitIndex::Future(src),
3975            TransmitIndex::Future(dst),
3976        )
3977    }
3978
3979    /// Transfer ownership of the specified stream read end from one guest to
3980    /// another.
3981    pub(crate) fn stream_transfer(
3982        self: Pin<&mut Self>,
3983        src_idx: u32,
3984        src: TypeStreamTableIndex,
3985        dst: TypeStreamTableIndex,
3986    ) -> Result<u32> {
3987        self.guest_transfer(
3988            src_idx,
3989            TransmitIndex::Stream(src),
3990            TransmitIndex::Stream(dst),
3991        )
3992    }
3993
3994    /// Copy the specified error context from one component to another.
3995    pub(crate) fn error_context_transfer(
3996        mut self: Pin<&mut Self>,
3997        src_idx: u32,
3998        src: TypeComponentLocalErrorContextTableIndex,
3999        dst: TypeComponentLocalErrorContextTableIndex,
4000    ) -> Result<u32> {
4001        let rep = self
4002            .as_mut()
4003            .table_for_error_context(src)
4004            .error_context_rep(src_idx)?;
4005        let dst_idx = self
4006            .as_mut()
4007            .table_for_error_context(dst)
4008            .error_context_insert(rep)?;
4009
4010        // Update the global (cross-subcomponent) count for error contexts
4011        // as the new component has essentially created a new reference that will
4012        // be dropped/handled independently
4013        let global_ref_count = self
4014            .concurrent_state_mut()
4015            .global_error_context_ref_counts
4016            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4017            .context("global ref count present for existing (sub)component error context")?;
4018        global_ref_count.0 += 1;
4019
4020        Ok(dst_idx)
4021    }
4022}
4023
4024impl ConcurrentState {
4025    fn send_write_result(
4026        &mut self,
4027        ty: TransmitIndex,
4028        id: TableId<TransmitState>,
4029        handle: u32,
4030        code: ReturnCode,
4031    ) -> Result<()> {
4032        let write_handle = self.get_mut(id)?.write_handle.rep();
4033        self.set_event(
4034            write_handle,
4035            match ty {
4036                TransmitIndex::Future(ty) => Event::FutureWrite {
4037                    code,
4038                    pending: Some((ty, handle)),
4039                },
4040                TransmitIndex::Stream(ty) => Event::StreamWrite {
4041                    code,
4042                    pending: Some((ty, handle)),
4043                },
4044            },
4045        )
4046    }
4047
4048    fn send_read_result(
4049        &mut self,
4050        ty: TransmitIndex,
4051        id: TableId<TransmitState>,
4052        handle: u32,
4053        code: ReturnCode,
4054    ) -> Result<()> {
4055        let read_handle = self.get_mut(id)?.read_handle.rep();
4056        self.set_event(
4057            read_handle,
4058            match ty {
4059                TransmitIndex::Future(ty) => Event::FutureRead {
4060                    code,
4061                    pending: Some((ty, handle)),
4062                },
4063                TransmitIndex::Stream(ty) => Event::StreamRead {
4064                    code,
4065                    pending: Some((ty, handle)),
4066                },
4067            },
4068        )
4069    }
4070
4071    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4072        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4073    }
4074
4075    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4076        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4077    }
4078
4079    /// Set or update the event for the specified waitable.
4080    ///
4081    /// If there is already an event set for this waitable, we assert that it is
4082    /// of the same variant as the new one and reuse the `ReturnCode` count and
4083    /// the `pending` field if applicable.
4084    // TODO: This is a bit awkward due to how
4085    // `Event::{Stream,Future}{Write,Read}` and
4086    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4087    // Consider updating those representations in a way that allows this
4088    // function to be simplified.
4089    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4090        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4091
4092        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4093            let (ReturnCode::Completed(count)
4094            | ReturnCode::Dropped(count)
4095            | ReturnCode::Cancelled(count)) = old
4096            else {
4097                unreachable!()
4098            };
4099
4100            match new {
4101                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4102                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4103                _ => unreachable!(),
4104            }
4105        }
4106
4107        let event = match (waitable.take_event(self)?, event) {
4108            (None, _) => event,
4109            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4110            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4111            (
4112                Some(Event::StreamWrite {
4113                    code: old_code,
4114                    pending: old_pending,
4115                }),
4116                Event::StreamWrite { code, pending },
4117            ) => Event::StreamWrite {
4118                code: update_code(old_code, code),
4119                pending: old_pending.or(pending),
4120            },
4121            (
4122                Some(Event::StreamRead {
4123                    code: old_code,
4124                    pending: old_pending,
4125                }),
4126                Event::StreamRead { code, pending },
4127            ) => Event::StreamRead {
4128                code: update_code(old_code, code),
4129                pending: old_pending.or(pending),
4130            },
4131            _ => unreachable!(),
4132        };
4133
4134        waitable.set_event(self, Some(event))
4135    }
4136
4137    /// Allocate a new future or stream, including the `TransmitState` and the
4138    /// `TransmitHandle`s corresponding to the read and write ends.
4139    fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4140        let state_id = self.push(TransmitState::default())?;
4141
4142        let write = self.push(TransmitHandle::new(state_id))?;
4143        let read = self.push(TransmitHandle::new(state_id))?;
4144
4145        let state = self.get_mut(state_id)?;
4146        state.write_handle = write;
4147        state.read_handle = read;
4148
4149        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4150
4151        Ok((write, read))
4152    }
4153
4154    /// Delete the specified future or stream, including the read and write ends.
4155    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4156        let state = self.delete(state_id)?;
4157        self.delete(state.write_handle)?;
4158        self.delete(state.read_handle)?;
4159
4160        log::trace!(
4161            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4162            state.write_handle,
4163            state.read_handle,
4164        );
4165
4166        Ok(())
4167    }
4168}
4169
4170pub(crate) struct ResourcePair {
4171    pub(crate) write: u32,
4172    pub(crate) read: u32,
4173}
4174
4175#[cfg(test)]
4176mod tests {
4177    use super::*;
4178    use crate::{Engine, Store};
4179    use core::future::pending;
4180    use core::pin::pin;
4181    use std::sync::LazyLock;
4182
4183    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4184
4185    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4186    where
4187        T: FutureProducer<()>,
4188    {
4189        rx.poll_produce(
4190            &mut Context::from_waker(Waker::noop()),
4191            Store::new(&ENGINE, ()).as_context_mut(),
4192            finish,
4193        )
4194    }
4195
4196    #[test]
4197    fn future_producer() {
4198        let mut fut = pin!(async { anyhow::Ok(()) });
4199        assert!(matches!(
4200            poll_future_producer(fut.as_mut(), false),
4201            Poll::Ready(Ok(Some(()))),
4202        ));
4203
4204        let mut fut = pin!(async { anyhow::Ok(()) });
4205        assert!(matches!(
4206            poll_future_producer(fut.as_mut(), true),
4207            Poll::Ready(Ok(Some(()))),
4208        ));
4209
4210        let mut fut = pin!(pending::<Result<()>>());
4211        assert!(matches!(
4212            poll_future_producer(fut.as_mut(), false),
4213            Poll::Pending,
4214        ));
4215        assert!(matches!(
4216            poll_future_producer(fut.as_mut(), true),
4217            Poll::Ready(Ok(None)),
4218        ));
4219
4220        let (tx, rx) = oneshot::channel();
4221        let mut rx = pin!(rx);
4222        assert!(matches!(
4223            poll_future_producer(rx.as_mut(), false),
4224            Poll::Pending,
4225        ));
4226        assert!(matches!(
4227            poll_future_producer(rx.as_mut(), true),
4228            Poll::Ready(Ok(None)),
4229        ));
4230        tx.send(()).unwrap();
4231        assert!(matches!(
4232            poll_future_producer(rx.as_mut(), true),
4233            Poll::Ready(Ok(Some(()))),
4234        ));
4235
4236        let (tx, rx) = oneshot::channel();
4237        let mut rx = pin!(rx);
4238        tx.send(()).unwrap();
4239        assert!(matches!(
4240            poll_future_producer(rx.as_mut(), false),
4241            Poll::Ready(Ok(Some(()))),
4242        ));
4243
4244        let (tx, rx) = oneshot::channel::<()>();
4245        let mut rx = pin!(rx);
4246        drop(tx);
4247        assert!(matches!(
4248            poll_future_producer(rx.as_mut(), false),
4249            Poll::Ready(Err(..)),
4250        ));
4251
4252        let (tx, rx) = oneshot::channel::<()>();
4253        let mut rx = pin!(rx);
4254        drop(tx);
4255        assert!(matches!(
4256            poll_future_producer(rx.as_mut(), true),
4257            Poll::Ready(Err(..)),
4258        ));
4259    }
4260}