wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext, Options};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Error, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use core::fmt;
15use core::future;
16use core::iter;
17use core::marker::PhantomData;
18use core::mem::{self, MaybeUninit};
19use core::pin::Pin;
20use core::task::{Context, Poll, Waker, ready};
21use futures::channel::oneshot;
22use futures::{FutureExt as _, stream};
23use std::any::{Any, TypeId};
24use std::boxed::Box;
25use std::io::Cursor;
26use std::string::{String, ToString};
27use std::sync::{Arc, Mutex};
28use std::vec::Vec;
29use wasmtime_environ::component::{
30    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
31    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
32    TypeFutureTableIndex, TypeStreamTableIndex,
33};
34
35pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
36
37mod buffers;
38
39/// Enum for distinguishing between a stream or future in functions that handle
40/// both.
41#[derive(Copy, Clone, Debug)]
42pub enum TransmitKind {
43    Stream,
44    Future,
45}
46
47/// Represents `{stream,future}.{read,write}` results.
48#[derive(Copy, Clone, Debug, PartialEq)]
49pub enum ReturnCode {
50    Blocked,
51    Completed(u32),
52    Dropped(u32),
53    Cancelled(u32),
54}
55
56impl ReturnCode {
57    /// Pack `self` into a single 32-bit integer that may be returned to the
58    /// guest.
59    ///
60    /// This corresponds to `pack_copy_result` in the Component Model spec.
61    pub fn encode(&self) -> u32 {
62        const BLOCKED: u32 = 0xffff_ffff;
63        const COMPLETED: u32 = 0x0;
64        const DROPPED: u32 = 0x1;
65        const CANCELLED: u32 = 0x2;
66        match self {
67            ReturnCode::Blocked => BLOCKED,
68            ReturnCode::Completed(n) => {
69                debug_assert!(*n < (1 << 28));
70                (n << 4) | COMPLETED
71            }
72            ReturnCode::Dropped(n) => {
73                debug_assert!(*n < (1 << 28));
74                (n << 4) | DROPPED
75            }
76            ReturnCode::Cancelled(n) => {
77                debug_assert!(*n < (1 << 28));
78                (n << 4) | CANCELLED
79            }
80        }
81    }
82
83    /// Returns `Self::Completed` with the specified count (or zero if
84    /// `matches!(kind, TransmitKind::Future)`)
85    fn completed(kind: TransmitKind, count: u32) -> Self {
86        Self::Completed(if let TransmitKind::Future = kind {
87            0
88        } else {
89            count
90        })
91    }
92}
93
94/// Represents a stream or future type index.
95///
96/// This is useful as a parameter type for functions which operate on either a
97/// future or a stream.
98#[derive(Copy, Clone, Debug)]
99pub enum TransmitIndex {
100    Stream(TypeStreamTableIndex),
101    Future(TypeFutureTableIndex),
102}
103
104impl TransmitIndex {
105    pub fn kind(&self) -> TransmitKind {
106        match self {
107            TransmitIndex::Stream(_) => TransmitKind::Stream,
108            TransmitIndex::Future(_) => TransmitKind::Future,
109        }
110    }
111}
112
113/// Retrieve the payload type of the specified stream or future, or `None` if it
114/// has no payload type.
115fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
116    match ty {
117        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
118        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
119    }
120}
121
122/// Retrieve the host rep and state for the specified guest-visible waitable
123/// handle.
124fn get_mut_by_index_from(
125    handle_table: &mut HandleTable,
126    ty: TransmitIndex,
127    index: u32,
128) -> Result<(u32, &mut TransmitLocalState)> {
129    match ty {
130        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
131        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
132    }
133}
134
135fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
136    mut store: StoreContextMut<U>,
137    instance: Instance,
138    options: &Options,
139    ty: TransmitIndex,
140    address: usize,
141    count: usize,
142    buffer: &mut B,
143) -> Result<()> {
144    let types = instance.id().get(store.0).component().types().clone();
145    let count = buffer.remaining().len().min(count);
146
147    let lower = &mut if T::MAY_REQUIRE_REALLOC {
148        LowerContext::new
149    } else {
150        LowerContext::new_without_realloc
151    }(store.as_context_mut(), options, &types, instance);
152
153    if address % usize::try_from(T::ALIGN32)? != 0 {
154        bail!("read pointer not aligned");
155    }
156    lower
157        .as_slice_mut()
158        .get_mut(address..)
159        .and_then(|b| b.get_mut(..T::SIZE32 * count))
160        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
161
162    if let Some(ty) = payload(ty, &types) {
163        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
164    }
165
166    buffer.skip(count);
167
168    Ok(())
169}
170
171fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
172    lift: &mut LiftContext<'_>,
173    ty: Option<InterfaceType>,
174    buffer: &mut B,
175    address: usize,
176    count: usize,
177) -> Result<()> {
178    let count = count.min(buffer.remaining_capacity());
179    if T::IS_RUST_UNIT_TYPE {
180        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
181        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
182        // is a valid way to populate the zero-sized buffer.
183        buffer.extend(
184            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
185        )
186    } else {
187        let ty = ty.unwrap();
188        if address % usize::try_from(T::ALIGN32)? != 0 {
189            bail!("write pointer not aligned");
190        }
191        lift.memory()
192            .get(address..)
193            .and_then(|b| b.get(..T::SIZE32 * count))
194            .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
195
196        let list = &WasmList::new(address, count, lift, ty)?;
197        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
198    }
199    Ok(())
200}
201
202/// Represents the state associated with an error context
203#[derive(Debug, PartialEq, Eq, PartialOrd)]
204pub(super) struct ErrorContextState {
205    /// Debug message associated with the error context
206    pub(crate) debug_msg: String,
207}
208
209/// Represents the size and alignment for a "flat" Component Model type,
210/// i.e. one containing no pointers or handles.
211#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212pub(super) struct FlatAbi {
213    pub(super) size: u32,
214    pub(super) align: u32,
215}
216
217/// Represents the buffer for a host- or guest-initiated stream read.
218pub struct Destination<'a, T, B> {
219    id: TableId<TransmitState>,
220    buffer: &'a mut B,
221    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
222    _phantom: PhantomData<fn() -> T>,
223}
224
225impl<'a, T, B> Destination<'a, T, B> {
226    /// Reborrow `self` so it can be used again later.
227    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
228        Destination {
229            id: self.id,
230            buffer: &mut *self.buffer,
231            host_buffer: self.host_buffer.as_deref_mut(),
232            _phantom: PhantomData,
233        }
234    }
235
236    /// Take the buffer out of `self`, leaving a default-initialized one in its
237    /// place.
238    ///
239    /// This can be useful for reusing the previously-stored buffer's capacity
240    /// instead of allocating a fresh one.
241    pub fn take_buffer(&mut self) -> B
242    where
243        B: Default,
244    {
245        mem::take(self.buffer)
246    }
247
248    /// Store the specified buffer in `self`.
249    ///
250    /// Any items contained in the buffer will be delivered to the reader after
251    /// the `StreamProducer::poll_produce` call to which this `Destination` was
252    /// passed returns (unless overwritten by another call to `set_buffer`).
253    ///
254    /// If items are stored via this buffer _and_ written via a
255    /// `DirectDestination` view of `self`, then the items in the buffer will be
256    /// delivered after the ones written using `DirectDestination`.
257    pub fn set_buffer(&mut self, buffer: B) {
258        *self.buffer = buffer;
259    }
260
261    /// Return the remaining number of items the current read has capacity to
262    /// accept, if known.
263    ///
264    /// This will return `Some(_)` if the reader is a guest; it will return
265    /// `None` if the reader is the host.
266    ///
267    /// Note that, if this returns `None(0)`, the producer must still attempt to
268    /// produce at least one item if the value of `finish` passed to
269    /// `StreamProducer::poll_produce` is false.  In that case, the reader is
270    /// effectively asking when the producer will be able to produce items
271    /// without blocking (or reach a terminal state such as end-of-stream),
272    /// meaning the next non-zero read must complete without blocking.
273    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
274        let transmit = store
275            .as_context_mut()
276            .0
277            .concurrent_state_mut()
278            .get_mut(self.id)
279            .unwrap();
280
281        if let &ReadState::GuestReady { count, .. } = &transmit.read {
282            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
283                unreachable!()
284            };
285
286            Some(count - guest_offset)
287        } else {
288            None
289        }
290    }
291}
292
293impl<'a, B> Destination<'a, u8, B> {
294    /// Return a `DirectDestination` view of `self`.
295    ///
296    /// If the reader is a guest, this will provide direct access to the guest's
297    /// read buffer.  If the reader is a host, this will provide access to a
298    /// buffer which will be delivered to the host before any items stored using
299    /// `Destination::set_buffer`.
300    ///
301    /// `capacity` will only be used if the reader is a host, in which case it
302    /// will update the length of the buffer, possibly zero-initializing the new
303    /// elements if the new length is larger than the old length.
304    pub fn as_direct<D>(
305        mut self,
306        store: StoreContextMut<'a, D>,
307        capacity: usize,
308    ) -> DirectDestination<'a, D> {
309        if let Some(buffer) = self.host_buffer.as_deref_mut() {
310            buffer.set_position(0);
311            if buffer.get_mut().is_empty() {
312                buffer.get_mut().resize(capacity, 0);
313            }
314        }
315
316        DirectDestination {
317            id: self.id,
318            host_buffer: self.host_buffer,
319            store,
320        }
321    }
322}
323
324/// Represents a read from a `stream<u8>`, providing direct access to the
325/// writer's buffer.
326pub struct DirectDestination<'a, D: 'static> {
327    id: TableId<TransmitState>,
328    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
329    store: StoreContextMut<'a, D>,
330}
331
332impl<D: 'static> DirectDestination<'_, D> {
333    /// Provide direct access to the writer's buffer.
334    pub fn remaining(&mut self) -> &mut [u8] {
335        if let Some(buffer) = self.host_buffer.as_deref_mut() {
336            buffer.get_mut()
337        } else {
338            let transmit = self
339                .store
340                .as_context_mut()
341                .0
342                .concurrent_state_mut()
343                .get_mut(self.id)
344                .unwrap();
345
346            let &ReadState::GuestReady {
347                address,
348                count,
349                options,
350                ..
351            } = &transmit.read
352            else {
353                unreachable!();
354            };
355
356            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
357                unreachable!()
358            };
359
360            options
361                .memory_mut(self.store.0)
362                .get_mut((address + guest_offset)..)
363                .and_then(|b| b.get_mut(..(count - guest_offset)))
364                .unwrap()
365        }
366    }
367
368    /// Mark the specified number of bytes as written to the writer's buffer.
369    ///
370    /// This will panic if the count is larger than the size of the
371    /// buffer returned by `Self::remaining`.
372    pub fn mark_written(&mut self, count: usize) {
373        if let Some(buffer) = self.host_buffer.as_deref_mut() {
374            buffer.set_position(
375                buffer
376                    .position()
377                    .checked_add(u64::try_from(count).unwrap())
378                    .unwrap(),
379            );
380        } else {
381            let transmit = self
382                .store
383                .as_context_mut()
384                .0
385                .concurrent_state_mut()
386                .get_mut(self.id)
387                .unwrap();
388
389            let ReadState::GuestReady {
390                count: read_count, ..
391            } = &transmit.read
392            else {
393                unreachable!();
394            };
395
396            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
397                unreachable!()
398            };
399
400            if *guest_offset + count > *read_count {
401                panic!(
402                    "write count ({count}) must be less than or equal to read count ({read_count})"
403                )
404            } else {
405                *guest_offset += count;
406            }
407        }
408    }
409}
410
411/// Represents the state of a `Stream{Producer,Consumer}`.
412#[derive(Copy, Clone, Debug)]
413pub enum StreamResult {
414    /// The operation completed normally, and the producer or consumer may be
415    /// able to produce or consume more items, respectively.
416    Completed,
417    /// The operation was interrupted (i.e. it wrapped up early after receiving
418    /// a `finish` parameter value of true in a call to `poll_produce` or
419    /// `poll_consume`), and the producer or consumer may be able to produce or
420    /// consume more items, respectively.
421    Cancelled,
422    /// The operation completed normally, but the producer or consumer will
423    /// _not_ able to produce or consume more items, respectively.
424    Dropped,
425}
426
427/// Represents the host-owned write end of a stream.
428pub trait StreamProducer<D>: Send + 'static {
429    /// The payload type of this stream.
430    type Item;
431
432    /// The `WriteBuffer` type to use when delivering items.
433    type Buffer: WriteBuffer<Self::Item> + Default;
434
435    /// Handle a host- or guest-initiated read by delivering zero or more items
436    /// to the specified destination.
437    ///
438    /// This will be called whenever the reader starts a read.
439    ///
440    /// If the implementation is able to produce one or more items immediately,
441    /// it should write them to `destination` and return either
442    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
443    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
444    /// any more items.
445    ///
446    /// If the implementation is unable to produce any items immediately, but
447    /// expects to do so later, and `finish` is _false_, it should store the
448    /// waker from `cx` for later and return `Poll::Pending` without writing
449    /// anything to `destination`.  Later, it should alert the waker when either
450    /// the items arrive, the stream has ended, or an error occurs.
451    ///
452    /// If the implementation is unable to produce any items immediately, but
453    /// expects to do so later, and `finish` is _true_, it should, if possible,
454    /// return `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without
455    /// writing anything to `destination`.  However, that might not be possible
456    /// if an earlier call to `poll_produce` kicked off an asynchronous
457    /// operation which needs to be completed (and possibly interrupted)
458    /// gracefully, in which case the implementation may return `Poll::Pending`
459    /// and later alert the waker as described above.  In other words, when
460    /// `finish` is true, the implementation should prioritize returning a
461    /// result to the reader (even if no items can be produced) rather than wait
462    /// indefinitely for at least one item to arrive.
463    ///
464    /// In all of the above cases, the implementation may alternatively choose
465    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
466    /// the guest (if any) to trap and render the component instance (if any)
467    /// unusable.  The implementation should report errors that _are_
468    /// recoverable by other means (e.g. by writing to a `future`) and return
469    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
470    ///
471    /// Note that the implementation should never return `Poll::Pending` after
472    /// writing one or more items to `destination`; if it does, the caller will
473    /// trap as if `Err(_)` was returned.  Conversely, it should only return
474    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without writing any items to
475    /// `destination` if called with `finish` set to true.  If it does so when
476    /// `finish` is false, the caller will trap.  Additionally, it should only
477    /// return `Poll::Ready(Ok(StreamResult::Completed))` after writing at least
478    /// one item to `destination` if it has capacity to accept that item;
479    /// otherwise, the caller will trap.
480    ///
481    /// If more items are written to `destination` than the reader has immediate
482    /// capacity to accept, they will be retained in memory by the caller and
483    /// used to satisfy future reads, in which case `poll_produce` will only be
484    /// called again once all those items have been delivered.
485    ///
486    /// If this function is called with zero capacity
487    /// (i.e. `Destination::remaining` returns `Some(0)`), the implementation
488    /// should either:
489    ///
490    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
491    /// anything if it expects to be able to produce items immediately
492    /// (i.e. without first returning `Poll::Pending`) the next time
493    /// `poll_produce` is called with non-zero capacity _or_ if that cannot be
494    /// reliably determined.
495    ///
496    /// - Return `Poll::Pending` if the next call to `poll_produce` with
497    /// non-zero capacity is likely to also return `Poll::Pending`.
498    ///
499    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
500    /// `Destination::set_buffer` with one more more items.  Note, however, that
501    /// this creates the hazard that the items will never be received by the
502    /// guest if it decides not to do another non-zero-length read before
503    /// closing the stream.  Moreover, if `Self::Item` is e.g. a `Resource<_>`,
504    /// they may end up leaking in that scenario.
505    fn poll_produce<'a>(
506        self: Pin<&mut Self>,
507        cx: &mut Context<'_>,
508        store: StoreContextMut<'a, D>,
509        destination: Destination<'a, Self::Item, Self::Buffer>,
510        finish: bool,
511    ) -> Poll<Result<StreamResult>>;
512
513    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
514    /// be downcast to the specified type.
515    ///
516    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
517    /// to the specified type is guaranteed to succeed.
518    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
519        Err(me)
520    }
521}
522
523impl<T, D> StreamProducer<D> for iter::Empty<T>
524where
525    T: Send + Sync + 'static,
526{
527    type Item = T;
528    type Buffer = Option<Self::Item>;
529
530    fn poll_produce<'a>(
531        self: Pin<&mut Self>,
532        _: &mut Context<'_>,
533        _: StoreContextMut<'a, D>,
534        _: Destination<'a, Self::Item, Self::Buffer>,
535        _: bool,
536    ) -> Poll<Result<StreamResult>> {
537        Poll::Ready(Ok(StreamResult::Dropped))
538    }
539}
540
541impl<T, D> StreamProducer<D> for stream::Empty<T>
542where
543    T: Send + Sync + 'static,
544{
545    type Item = T;
546    type Buffer = Option<Self::Item>;
547
548    fn poll_produce<'a>(
549        self: Pin<&mut Self>,
550        _: &mut Context<'_>,
551        _: StoreContextMut<'a, D>,
552        _: Destination<'a, Self::Item, Self::Buffer>,
553        _: bool,
554    ) -> Poll<Result<StreamResult>> {
555        Poll::Ready(Ok(StreamResult::Dropped))
556    }
557}
558
559impl<T, D> StreamProducer<D> for Vec<T>
560where
561    T: Unpin + Send + Sync + 'static,
562{
563    type Item = T;
564    type Buffer = VecBuffer<T>;
565
566    fn poll_produce<'a>(
567        self: Pin<&mut Self>,
568        _: &mut Context<'_>,
569        _: StoreContextMut<'a, D>,
570        mut dst: Destination<'a, Self::Item, Self::Buffer>,
571        _: bool,
572    ) -> Poll<Result<StreamResult>> {
573        dst.set_buffer(mem::take(self.get_mut()).into());
574        Poll::Ready(Ok(StreamResult::Dropped))
575    }
576}
577
578impl<T, D> StreamProducer<D> for Box<[T]>
579where
580    T: Unpin + Send + Sync + 'static,
581{
582    type Item = T;
583    type Buffer = VecBuffer<T>;
584
585    fn poll_produce<'a>(
586        self: Pin<&mut Self>,
587        _: &mut Context<'_>,
588        _: StoreContextMut<'a, D>,
589        mut dst: Destination<'a, Self::Item, Self::Buffer>,
590        _: bool,
591    ) -> Poll<Result<StreamResult>> {
592        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
593        Poll::Ready(Ok(StreamResult::Dropped))
594    }
595}
596
597#[cfg(feature = "component-model-async-bytes")]
598impl<D> StreamProducer<D> for bytes::Bytes {
599    type Item = u8;
600    type Buffer = Cursor<Self>;
601
602    fn poll_produce<'a>(
603        mut self: Pin<&mut Self>,
604        _: &mut Context<'_>,
605        mut store: StoreContextMut<'a, D>,
606        mut dst: Destination<'a, Self::Item, Self::Buffer>,
607        _: bool,
608    ) -> Poll<Result<StreamResult>> {
609        let cap = dst.remaining(&mut store);
610        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
611            // on 0-length or host reads, buffer the bytes
612            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
613            return Poll::Ready(Ok(StreamResult::Dropped));
614        };
615        let cap = cap.into();
616        // data does not fit in destination, fill it and buffer the rest
617        dst.set_buffer(Cursor::new(self.split_off(cap)));
618        let mut dst = dst.as_direct(store, cap);
619        dst.remaining().copy_from_slice(&self);
620        dst.mark_written(cap);
621        Poll::Ready(Ok(StreamResult::Dropped))
622    }
623}
624
625#[cfg(feature = "component-model-async-bytes")]
626impl<D> StreamProducer<D> for bytes::BytesMut {
627    type Item = u8;
628    type Buffer = Cursor<Self>;
629
630    fn poll_produce<'a>(
631        mut self: Pin<&mut Self>,
632        _: &mut Context<'_>,
633        mut store: StoreContextMut<'a, D>,
634        mut dst: Destination<'a, Self::Item, Self::Buffer>,
635        _: bool,
636    ) -> Poll<Result<StreamResult>> {
637        let cap = dst.remaining(&mut store);
638        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
639            // on 0-length or host reads, buffer the bytes
640            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
641            return Poll::Ready(Ok(StreamResult::Dropped));
642        };
643        let cap = cap.into();
644        // data does not fit in destination, fill it and buffer the rest
645        dst.set_buffer(Cursor::new(self.split_off(cap)));
646        let mut dst = dst.as_direct(store, cap);
647        dst.remaining().copy_from_slice(&self);
648        dst.mark_written(cap);
649        Poll::Ready(Ok(StreamResult::Dropped))
650    }
651}
652
653/// Represents the buffer for a host- or guest-initiated stream write.
654pub struct Source<'a, T> {
655    instance: Option<Instance>,
656    id: TableId<TransmitState>,
657    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
658}
659
660impl<'a, T> Source<'a, T> {
661    /// Reborrow `self` so it can be used again later.
662    pub fn reborrow(&mut self) -> Source<'_, T> {
663        Source {
664            instance: self.instance,
665            id: self.id,
666            host_buffer: self.host_buffer.as_deref_mut(),
667        }
668    }
669
670    /// Accept zero or more items from the writer.
671    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
672    where
673        T: func::Lift + 'static,
674        B: ReadBuffer<T>,
675    {
676        if let Some(input) = &mut self.host_buffer {
677            let count = input.remaining().len().min(buffer.remaining_capacity());
678            buffer.move_from(*input, count);
679        } else {
680            let store = store.as_context_mut();
681            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
682
683            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
684                unreachable!();
685            };
686
687            let &WriteState::GuestReady {
688                ty,
689                address,
690                count,
691                options,
692                ..
693            } = &transmit.write
694            else {
695                unreachable!()
696            };
697
698            let cx =
699                &mut LiftContext::new(store.0.store_opaque_mut(), &options, self.instance.unwrap());
700            let ty = payload(ty, cx.types);
701            let old_remaining = buffer.remaining_capacity();
702            lift::<T, B, S::Data>(
703                cx,
704                ty,
705                buffer,
706                address + (T::SIZE32 * guest_offset),
707                count - guest_offset,
708            )?;
709
710            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
711
712            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
713                unreachable!();
714            };
715
716            *guest_offset += old_remaining - buffer.remaining_capacity();
717        }
718
719        Ok(())
720    }
721
722    /// Return the number of items remaining to be read from the current write
723    /// operation.
724    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
725    where
726        T: 'static,
727    {
728        let transmit = store
729            .as_context_mut()
730            .0
731            .concurrent_state_mut()
732            .get_mut(self.id)
733            .unwrap();
734
735        if let &WriteState::GuestReady { count, .. } = &transmit.write {
736            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
737                unreachable!()
738            };
739
740            count - guest_offset
741        } else if let Some(host_buffer) = &self.host_buffer {
742            host_buffer.remaining().len()
743        } else {
744            unreachable!()
745        }
746    }
747}
748
749impl<'a> Source<'a, u8> {
750    /// Return a `DirectSource` view of `self`.
751    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
752        DirectSource {
753            id: self.id,
754            host_buffer: self.host_buffer,
755            store,
756        }
757    }
758}
759
760/// Represents a write to a `stream<u8>`, providing direct access to the
761/// writer's buffer.
762pub struct DirectSource<'a, D: 'static> {
763    id: TableId<TransmitState>,
764    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
765    store: StoreContextMut<'a, D>,
766}
767
768impl<D: 'static> DirectSource<'_, D> {
769    /// Provide direct access to the writer's buffer.
770    pub fn remaining(&mut self) -> &[u8] {
771        if let Some(buffer) = self.host_buffer.as_deref_mut() {
772            buffer.remaining()
773        } else {
774            let transmit = self
775                .store
776                .as_context_mut()
777                .0
778                .concurrent_state_mut()
779                .get_mut(self.id)
780                .unwrap();
781
782            let &WriteState::GuestReady {
783                address,
784                count,
785                options,
786                ..
787            } = &transmit.write
788            else {
789                unreachable!()
790            };
791
792            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
793                unreachable!()
794            };
795
796            options
797                .memory(self.store.0)
798                .get((address + guest_offset)..)
799                .and_then(|b| b.get(..(count - guest_offset)))
800                .unwrap()
801        }
802    }
803
804    /// Mark the specified number of bytes as read from the writer's buffer.
805    ///
806    /// This will panic if the count is larger than the size of the buffer
807    /// returned by `Self::remaining`.
808    pub fn mark_read(&mut self, count: usize) {
809        if let Some(buffer) = self.host_buffer.as_deref_mut() {
810            buffer.skip(count);
811        } else {
812            let transmit = self
813                .store
814                .as_context_mut()
815                .0
816                .concurrent_state_mut()
817                .get_mut(self.id)
818                .unwrap();
819
820            let WriteState::GuestReady {
821                count: write_count, ..
822            } = &transmit.write
823            else {
824                unreachable!()
825            };
826
827            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
828                unreachable!()
829            };
830
831            if *guest_offset + count > *write_count {
832                panic!(
833                    "read count ({count}) must be less than or equal to write count ({write_count})"
834                )
835            } else {
836                *guest_offset += count;
837            }
838        }
839    }
840}
841
842/// Represents the host-owned read end of a stream.
843pub trait StreamConsumer<D>: Send + 'static {
844    /// The payload type of this stream.
845    type Item;
846
847    /// Handle a host- or guest-initiated write by accepting zero or more items
848    /// from the specified source.
849    ///
850    /// This will be called whenever the writer starts a write.
851    ///
852    /// If the implementation is able to consume one or more items immediately,
853    /// it should take them from `source` and return either
854    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
855    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
856    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
857    /// indicate that the caller should delay sending a `COMPLETED` event to the
858    /// writer until a later call to this function returns `Poll::Ready(_)`.
859    /// For more about that, see the `Backpressure` section below.
860    ///
861    /// If the implementation cannot consume any items immediately and `finish`
862    /// is _false_, it should store the waker from `cx` for later and return
863    /// `Poll::Pending` without writing anything to `destination`.  Later, it
864    /// should alert the waker when either (1) the items arrive, (2) the stream
865    /// has ended, or (3) an error occurs.
866    ///
867    /// If the implementation cannot consume any items immediately and `finish`
868    /// is _true_, it should, if possible, return
869    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
870    /// anything from `source`.  However, that might not be possible if an
871    /// earlier call to `poll_consume` kicked off an asynchronous operation
872    /// which needs to be completed (and possibly interrupted) gracefully, in
873    /// which case the implementation may return `Poll::Pending` and later alert
874    /// the waker as described above.  In other words, when `finish` is true,
875    /// the implementation should prioritize returning a result to the reader
876    /// (even if no items can be consumed) rather than wait indefinitely for at
877    /// capacity to free up.
878    ///
879    /// In all of the above cases, the implementation may alternatively choose
880    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
881    /// the guest (if any) to trap and render the component instance (if any)
882    /// unusable.  The implementation should report errors that _are_
883    /// recoverable by other means (e.g. by writing to a `future`) and return
884    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
885    ///
886    /// Note that the implementation should only return
887    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
888    /// items from `source` if called with `finish` set to true.  If it does so
889    /// when `finish` is false, the caller will trap.  Additionally, it should
890    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
891    /// least one item from `source` if there is an item available; otherwise,
892    /// the caller will trap.  If `poll_consume` is called with no items in
893    /// `source`, it should only return `Poll::Ready(_)` once it is able to
894    /// accept at least one item during the next call to `poll_consume`.
895    ///
896    /// Note that any items which the implementation of this trait takes from
897    /// `source` become the responsibility of that implementation.  For that
898    /// reason, an implementation which forwards items to an upstream sink
899    /// should reserve capacity in that sink before taking items out of
900    /// `source`, if possible.  Alternatively, it might buffer items which can't
901    /// be forwarded immediately and send them once capacity is freed up.
902    ///
903    /// ## Backpressure
904    ///
905    /// As mentioned above, an implementation might choose to return
906    /// `Poll::Pending` after taking items from `source`, which tells the caller
907    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
908    /// a form of backpressure when the items are forwarded to an upstream sink
909    /// asynchronously.  Note, however, that it's not possible to "put back"
910    /// items into `source` once they've been taken out, so if the upstream sink
911    /// is unable to accept all the items, that cannot be communicated to the
912    /// writer at this level of abstraction.  Just as with application-specific,
913    /// recoverable errors, information about which items could be forwarded and
914    /// which could not must be communicated out-of-band, e.g. by writing to an
915    /// application-specific `future`.
916    ///
917    /// Similarly, if the writer cancels the write after items have been taken
918    /// from `source` but before the items have all been forwarded to an
919    /// upstream sink, `poll_consume` will be called with `finish` set to true,
920    /// and the implementation may either:
921    ///
922    /// - Interrupt the forwarding process gracefully.  This may be preferable
923    /// if there is an out-of-band channel for communicating to the writer how
924    /// many items were forwarded before being interrupted.
925    ///
926    /// - Allow the forwarding to complete without interrupting it.  This is
927    /// usually preferable if there's no out-of-band channel for reporting back
928    /// to the writer how many items were forwarded.
929    fn poll_consume(
930        self: Pin<&mut Self>,
931        cx: &mut Context<'_>,
932        store: StoreContextMut<D>,
933        source: Source<'_, Self::Item>,
934        finish: bool,
935    ) -> Poll<Result<StreamResult>>;
936}
937
938/// Represents a host-owned write end of a future.
939pub trait FutureProducer<D>: Send + 'static {
940    /// The payload type of this future.
941    type Item;
942
943    /// Handle a host- or guest-initiated read by producing a value.
944    ///
945    /// This is equivalent to `StreamProducer::poll_produce`, but with a
946    /// simplified interface for futures.
947    ///
948    /// If `finish` is true, the implementation may return
949    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
950    /// could produce a value.  Otherwise, it must either return
951    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
952    fn poll_produce(
953        self: Pin<&mut Self>,
954        cx: &mut Context<'_>,
955        store: StoreContextMut<D>,
956        finish: bool,
957    ) -> Poll<Result<Option<Self::Item>>>;
958}
959
960impl<T, E, D, Fut> FutureProducer<D> for Fut
961where
962    E: Into<Error>,
963    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
964{
965    type Item = T;
966
967    fn poll_produce<'a>(
968        self: Pin<&mut Self>,
969        cx: &mut Context<'_>,
970        _: StoreContextMut<'a, D>,
971        finish: bool,
972    ) -> Poll<Result<Option<T>>> {
973        match self.poll(cx) {
974            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
975            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
976            Poll::Pending if finish => Poll::Ready(Ok(None)),
977            Poll::Pending => Poll::Pending,
978        }
979    }
980}
981
982/// Represents a host-owned read end of a future.
983pub trait FutureConsumer<D>: Send + 'static {
984    /// The payload type of this future.
985    type Item;
986
987    /// Handle a host- or guest-initiated write by consuming a value.
988    ///
989    /// This is equivalent to `StreamProducer::poll_produce`, but with a
990    /// simplified interface for futures.
991    ///
992    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
993    /// without taking the item from `source`, which indicates the operation was
994    /// canceled before it could consume the value.  Otherwise, it must either
995    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
996    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
997    /// the item).
998    fn poll_consume(
999        self: Pin<&mut Self>,
1000        cx: &mut Context<'_>,
1001        store: StoreContextMut<D>,
1002        source: Source<'_, Self::Item>,
1003        finish: bool,
1004    ) -> Poll<Result<()>>;
1005}
1006
1007/// Represents the readable end of a Component Model `future`.
1008///
1009/// Note that `FutureReader` instances must be disposed of using either `pipe`
1010/// or `close`; otherwise the in-store representation will leak and the writer
1011/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
1012/// ensure that disposal happens automatically.
1013pub struct FutureReader<T> {
1014    id: TableId<TransmitHandle>,
1015    _phantom: PhantomData<T>,
1016}
1017
1018impl<T> FutureReader<T> {
1019    /// Create a new future with the specified producer.
1020    pub fn new<S: AsContextMut>(
1021        mut store: S,
1022        producer: impl FutureProducer<S::Data, Item = T>,
1023    ) -> Self
1024    where
1025        T: func::Lower + func::Lift + Send + Sync + 'static,
1026    {
1027        struct Producer<P>(P);
1028
1029        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1030            for Producer<P>
1031        {
1032            type Item = P::Item;
1033            type Buffer = Option<P::Item>;
1034
1035            fn poll_produce<'a>(
1036                self: Pin<&mut Self>,
1037                cx: &mut Context<'_>,
1038                store: StoreContextMut<D>,
1039                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1040                finish: bool,
1041            ) -> Poll<Result<StreamResult>> {
1042                // SAFETY: This is a standard pin-projection, and we never move
1043                // out of `self`.
1044                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1045
1046                Poll::Ready(Ok(
1047                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1048                        destination.set_buffer(Some(value));
1049
1050                        // Here we return `StreamResult::Completed` even though
1051                        // we've produced the last item we'll ever produce.
1052                        // That's because the ABI expects
1053                        // `ReturnCode::Completed(1)` rather than
1054                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1055                        // called again since the future will have resolved.
1056                        StreamResult::Completed
1057                    } else {
1058                        StreamResult::Cancelled
1059                    },
1060                ))
1061            }
1062        }
1063
1064        Self::new_(
1065            store
1066                .as_context_mut()
1067                .new_transmit(TransmitKind::Future, Producer(producer)),
1068        )
1069    }
1070
1071    fn new_(id: TableId<TransmitHandle>) -> Self {
1072        Self {
1073            id,
1074            _phantom: PhantomData,
1075        }
1076    }
1077
1078    /// Set the consumer that accepts the result of this future.
1079    pub fn pipe<S: AsContextMut>(
1080        self,
1081        mut store: S,
1082        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1083    ) where
1084        T: func::Lift + 'static,
1085    {
1086        struct Consumer<C>(C);
1087
1088        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1089            for Consumer<C>
1090        {
1091            type Item = T;
1092
1093            fn poll_consume(
1094                self: Pin<&mut Self>,
1095                cx: &mut Context<'_>,
1096                mut store: StoreContextMut<D>,
1097                mut source: Source<Self::Item>,
1098                finish: bool,
1099            ) -> Poll<Result<StreamResult>> {
1100                // SAFETY: This is a standard pin-projection, and we never move
1101                // out of `self`.
1102                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1103
1104                ready!(consumer.poll_consume(
1105                    cx,
1106                    store.as_context_mut(),
1107                    source.reborrow(),
1108                    finish
1109                ))?;
1110
1111                Poll::Ready(Ok(if source.remaining(store) == 0 {
1112                    // Here we return `StreamResult::Completed` even though
1113                    // we've consumed the last item we'll ever consume.  That's
1114                    // because the ABI expects `ReturnCode::Completed(1)` rather
1115                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1116                    // called again since the future will have resolved.
1117                    StreamResult::Completed
1118                } else {
1119                    StreamResult::Cancelled
1120                }))
1121            }
1122        }
1123
1124        store
1125            .as_context_mut()
1126            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1127    }
1128
1129    /// Convert this `FutureReader` into a [`Val`].
1130    // See TODO comment for `FutureAny`; this is prone to handle leakage.
1131    pub fn into_val(self) -> Val {
1132        Val::Future(FutureAny(self.id.rep()))
1133    }
1134
1135    /// Attempt to convert the specified [`Val`] to a `FutureReader`.
1136    pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1137        let Val::Future(FutureAny(rep)) = value else {
1138            bail!("expected `future`; got `{}`", value.desc());
1139        };
1140        let store = store.as_context_mut();
1141        let id = TableId::<TransmitHandle>::new(*rep);
1142        store.0.concurrent_state_mut().get_mut(id)?; // Just make sure it's present
1143        Ok(Self::new_(id))
1144    }
1145
1146    /// Transfer ownership of the read end of a future from a guest to the host.
1147    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1148        match ty {
1149            InterfaceType::Future(src) => {
1150                let handle_table = cx
1151                    .instance_mut()
1152                    .table_for_transmit(TransmitIndex::Future(src));
1153                let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1154                if is_done {
1155                    bail!("cannot lift future after being notified that the writable end dropped");
1156                }
1157                let id = TableId::<TransmitHandle>::new(rep);
1158                let concurrent_state = cx.concurrent_state_mut();
1159                let future = concurrent_state.get_mut(id)?;
1160                future.common.handle = None;
1161                let state = future.state;
1162
1163                if concurrent_state.get_mut(state)?.done {
1164                    bail!("cannot lift future after previous read succeeded");
1165                }
1166
1167                Ok(Self::new_(id))
1168            }
1169            _ => func::bad_type_info(),
1170        }
1171    }
1172
1173    /// Close this `FutureReader`, writing the default value.
1174    ///
1175    /// # Panics
1176    ///
1177    /// Panics if the store that the [`Accessor`] is derived from does not own
1178    /// this future. Usage of this future after calling `close` will also cause
1179    /// a panic.
1180    pub fn close(&mut self, mut store: impl AsContextMut) {
1181        // `self` should never be used again, but leave an invalid handle there just in case.
1182        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1183        store
1184            .as_context_mut()
1185            .0
1186            .host_drop_reader(id, TransmitKind::Future)
1187            .unwrap();
1188    }
1189
1190    /// Convenience method around [`Self::close`].
1191    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1192        accessor.as_accessor().with(|access| self.close(access))
1193    }
1194
1195    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1196    /// drop and clean it up from the store.
1197    ///
1198    /// Note that the `accessor` provided must own this future and is
1199    /// additionally transferred to the `GuardedFutureReader` return value.
1200    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1201    where
1202        A: AsAccessor,
1203    {
1204        GuardedFutureReader::new(accessor, self)
1205    }
1206}
1207
1208impl<T> fmt::Debug for FutureReader<T> {
1209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1210        f.debug_struct("FutureReader")
1211            .field("id", &self.id)
1212            .finish()
1213    }
1214}
1215
1216/// Transfer ownership of the read end of a future from the host to a guest.
1217pub(crate) fn lower_future_to_index<U>(
1218    rep: u32,
1219    cx: &mut LowerContext<'_, U>,
1220    ty: InterfaceType,
1221) -> Result<u32> {
1222    match ty {
1223        InterfaceType::Future(dst) => {
1224            let concurrent_state = cx.store.0.concurrent_state_mut();
1225            let id = TableId::<TransmitHandle>::new(rep);
1226            let state = concurrent_state.get_mut(id)?.state;
1227            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1228
1229            let handle = cx
1230                .instance_mut()
1231                .table_for_transmit(TransmitIndex::Future(dst))
1232                .future_insert_read(dst, rep)?;
1233
1234            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1235
1236            Ok(handle)
1237        }
1238        _ => func::bad_type_info(),
1239    }
1240}
1241
1242// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1243// safe and correct since we lift and lower future handles as `u32`s.
1244unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1245    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1246
1247    type Lower = <u32 as func::ComponentType>::Lower;
1248
1249    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1250        match ty {
1251            InterfaceType::Future(_) => Ok(()),
1252            other => bail!("expected `future`, found `{}`", func::desc(other)),
1253        }
1254    }
1255}
1256
1257// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1258unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1259    fn linear_lower_to_flat<U>(
1260        &self,
1261        cx: &mut LowerContext<'_, U>,
1262        ty: InterfaceType,
1263        dst: &mut MaybeUninit<Self::Lower>,
1264    ) -> Result<()> {
1265        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1266            cx,
1267            InterfaceType::U32,
1268            dst,
1269        )
1270    }
1271
1272    fn linear_lower_to_memory<U>(
1273        &self,
1274        cx: &mut LowerContext<'_, U>,
1275        ty: InterfaceType,
1276        offset: usize,
1277    ) -> Result<()> {
1278        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1279            cx,
1280            InterfaceType::U32,
1281            offset,
1282        )
1283    }
1284}
1285
1286// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1287unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1288    fn linear_lift_from_flat(
1289        cx: &mut LiftContext<'_>,
1290        ty: InterfaceType,
1291        src: &Self::Lower,
1292    ) -> Result<Self> {
1293        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1294        Self::lift_from_index(cx, ty, index)
1295    }
1296
1297    fn linear_lift_from_memory(
1298        cx: &mut LiftContext<'_>,
1299        ty: InterfaceType,
1300        bytes: &[u8],
1301    ) -> Result<Self> {
1302        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1303        Self::lift_from_index(cx, ty, index)
1304    }
1305}
1306
1307/// A [`FutureReader`] paired with an [`Accessor`].
1308///
1309/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
1310/// when dropped. This can be created through [`GuardedFutureReader::new`] or
1311/// [`FutureReader::guard`].
1312pub struct GuardedFutureReader<T, A>
1313where
1314    A: AsAccessor,
1315{
1316    // This field is `None` to implement the conversion from this guard back to
1317    // `FutureReader`. When `None` is seen in the destructor it will cause the
1318    // destructor to do nothing.
1319    reader: Option<FutureReader<T>>,
1320    accessor: A,
1321}
1322
1323impl<T, A> GuardedFutureReader<T, A>
1324where
1325    A: AsAccessor,
1326{
1327    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1328    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1329        Self {
1330            reader: Some(reader),
1331            accessor,
1332        }
1333    }
1334
1335    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1336    /// back.
1337    pub fn into_future(self) -> FutureReader<T> {
1338        self.into()
1339    }
1340}
1341
1342impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1343where
1344    A: AsAccessor,
1345{
1346    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1347        guard.reader.take().unwrap()
1348    }
1349}
1350
1351impl<T, A> Drop for GuardedFutureReader<T, A>
1352where
1353    A: AsAccessor,
1354{
1355    fn drop(&mut self) {
1356        if let Some(reader) = &mut self.reader {
1357            reader.close_with(&self.accessor)
1358        }
1359    }
1360}
1361
1362/// Represents the readable end of a Component Model `stream`.
1363///
1364/// Note that `StreamReader` instances must be disposed of using `close`;
1365/// otherwise the in-store representation will leak and the writer end will hang
1366/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1367/// disposal happens automatically.
1368pub struct StreamReader<T> {
1369    id: TableId<TransmitHandle>,
1370    _phantom: PhantomData<T>,
1371}
1372
1373impl<T> StreamReader<T> {
1374    /// Create a new stream with the specified producer.
1375    pub fn new<S: AsContextMut>(
1376        mut store: S,
1377        producer: impl StreamProducer<S::Data, Item = T>,
1378    ) -> Self
1379    where
1380        T: func::Lower + func::Lift + Send + Sync + 'static,
1381    {
1382        Self::new_(
1383            store
1384                .as_context_mut()
1385                .new_transmit(TransmitKind::Stream, producer),
1386        )
1387    }
1388
1389    fn new_(id: TableId<TransmitHandle>) -> Self {
1390        Self {
1391            id,
1392            _phantom: PhantomData,
1393        }
1394    }
1395
1396    /// Attempt to consume this object by converting it into the specified type.
1397    ///
1398    /// This can be useful for "short-circuiting" host-to-host streams,
1399    /// bypassing the guest entirely.  For example, if a guest task returns a
1400    /// host-created stream and then exits, this function may be used to
1401    /// retrieve the write end, after which the guest instance and store may be
1402    /// disposed of if no longer needed.
1403    ///
1404    /// This will return `Ok(_)` if and only if the following conditions are
1405    /// met:
1406    ///
1407    /// - The stream was created by the host (i.e. not by the guest).
1408    ///
1409    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1410    /// producer provided to `StreamReader::new` when the stream was created,
1411    /// along with `TypeId::of::<V>()`.
1412    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1413        let store = store.as_context_mut();
1414        let state = store.0.concurrent_state_mut();
1415        let id = state.get_mut(self.id).unwrap().state;
1416        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1417            match try_into(TypeId::of::<V>()) {
1418                Some(result) => {
1419                    self.close(store);
1420                    Ok(*result.downcast::<V>().unwrap())
1421                }
1422                None => Err(self),
1423            }
1424        } else {
1425            Err(self)
1426        }
1427    }
1428
1429    /// Set the consumer that accepts the items delivered to this stream.
1430    pub fn pipe<S: AsContextMut>(
1431        self,
1432        mut store: S,
1433        consumer: impl StreamConsumer<S::Data, Item = T>,
1434    ) where
1435        T: 'static,
1436    {
1437        store
1438            .as_context_mut()
1439            .set_consumer(self.id, TransmitKind::Stream, consumer);
1440    }
1441
1442    /// Convert this `StreamReader` into a [`Val`].
1443    // See TODO comment for `StreamAny`; this is prone to handle leakage.
1444    pub fn into_val(self) -> Val {
1445        Val::Stream(StreamAny(self.id.rep()))
1446    }
1447
1448    /// Attempt to convert the specified [`Val`] to a `StreamReader`.
1449    pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1450        let Val::Stream(StreamAny(rep)) = value else {
1451            bail!("expected `stream`; got `{}`", value.desc());
1452        };
1453        let store = store.as_context_mut();
1454        let id = TableId::<TransmitHandle>::new(*rep);
1455        store.0.concurrent_state_mut().get_mut(id)?; // Just make sure it's present
1456        Ok(Self::new_(id))
1457    }
1458
1459    /// Transfer ownership of the read end of a stream from a guest to the host.
1460    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1461        match ty {
1462            InterfaceType::Stream(src) => {
1463                let handle_table = cx
1464                    .instance_mut()
1465                    .table_for_transmit(TransmitIndex::Stream(src));
1466                let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1467                if is_done {
1468                    bail!("cannot lift stream after being notified that the writable end dropped");
1469                }
1470                let id = TableId::<TransmitHandle>::new(rep);
1471                cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1472                Ok(Self::new_(id))
1473            }
1474            _ => func::bad_type_info(),
1475        }
1476    }
1477
1478    /// Close this `StreamReader`, writing the default value.
1479    ///
1480    /// # Panics
1481    ///
1482    /// Panics if the store that the [`Accessor`] is derived from does not own
1483    /// this future. Usage of this future after calling `close` will also cause
1484    /// a panic.
1485    pub fn close(&mut self, mut store: impl AsContextMut) {
1486        // `self` should never be used again, but leave an invalid handle there just in case.
1487        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1488        store
1489            .as_context_mut()
1490            .0
1491            .host_drop_reader(id, TransmitKind::Stream)
1492            .unwrap()
1493    }
1494
1495    /// Convenience method around [`Self::close`].
1496    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1497        accessor.as_accessor().with(|access| self.close(access))
1498    }
1499
1500    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1501    /// drop and clean it up from the store.
1502    ///
1503    /// Note that the `accessor` provided must own this future and is
1504    /// additionally transferred to the `GuardedStreamReader` return value.
1505    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1506    where
1507        A: AsAccessor,
1508    {
1509        GuardedStreamReader::new(accessor, self)
1510    }
1511}
1512
1513impl<T> fmt::Debug for StreamReader<T> {
1514    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1515        f.debug_struct("StreamReader")
1516            .field("id", &self.id)
1517            .finish()
1518    }
1519}
1520
1521/// Transfer ownership of the read end of a stream from the host to a guest.
1522pub(crate) fn lower_stream_to_index<U>(
1523    rep: u32,
1524    cx: &mut LowerContext<'_, U>,
1525    ty: InterfaceType,
1526) -> Result<u32> {
1527    match ty {
1528        InterfaceType::Stream(dst) => {
1529            let concurrent_state = cx.store.0.concurrent_state_mut();
1530            let id = TableId::<TransmitHandle>::new(rep);
1531            let state = concurrent_state.get_mut(id)?.state;
1532            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1533
1534            let handle = cx
1535                .instance_mut()
1536                .table_for_transmit(TransmitIndex::Stream(dst))
1537                .stream_insert_read(dst, rep)?;
1538
1539            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1540
1541            Ok(handle)
1542        }
1543        _ => func::bad_type_info(),
1544    }
1545}
1546
1547// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1548// safe and correct since we lift and lower stream handles as `u32`s.
1549unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1550    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1551
1552    type Lower = <u32 as func::ComponentType>::Lower;
1553
1554    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1555        match ty {
1556            InterfaceType::Stream(_) => Ok(()),
1557            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1558        }
1559    }
1560}
1561
1562// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1563unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1564    fn linear_lower_to_flat<U>(
1565        &self,
1566        cx: &mut LowerContext<'_, U>,
1567        ty: InterfaceType,
1568        dst: &mut MaybeUninit<Self::Lower>,
1569    ) -> Result<()> {
1570        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1571            cx,
1572            InterfaceType::U32,
1573            dst,
1574        )
1575    }
1576
1577    fn linear_lower_to_memory<U>(
1578        &self,
1579        cx: &mut LowerContext<'_, U>,
1580        ty: InterfaceType,
1581        offset: usize,
1582    ) -> Result<()> {
1583        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1584            cx,
1585            InterfaceType::U32,
1586            offset,
1587        )
1588    }
1589}
1590
1591// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1592unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1593    fn linear_lift_from_flat(
1594        cx: &mut LiftContext<'_>,
1595        ty: InterfaceType,
1596        src: &Self::Lower,
1597    ) -> Result<Self> {
1598        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1599        Self::lift_from_index(cx, ty, index)
1600    }
1601
1602    fn linear_lift_from_memory(
1603        cx: &mut LiftContext<'_>,
1604        ty: InterfaceType,
1605        bytes: &[u8],
1606    ) -> Result<Self> {
1607        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1608        Self::lift_from_index(cx, ty, index)
1609    }
1610}
1611
1612/// A [`StreamReader`] paired with an [`Accessor`].
1613///
1614/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1615/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1616/// [`StreamReader::guard`].
1617pub struct GuardedStreamReader<T, A>
1618where
1619    A: AsAccessor,
1620{
1621    // This field is `None` to implement the conversion from this guard back to
1622    // `StreamReader`. When `None` is seen in the destructor it will cause the
1623    // destructor to do nothing.
1624    reader: Option<StreamReader<T>>,
1625    accessor: A,
1626}
1627
1628impl<T, A> GuardedStreamReader<T, A>
1629where
1630    A: AsAccessor,
1631{
1632    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1633    /// `reader`.
1634    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1635        Self {
1636            reader: Some(reader),
1637            accessor,
1638        }
1639    }
1640
1641    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1642    /// back.
1643    pub fn into_stream(self) -> StreamReader<T> {
1644        self.into()
1645    }
1646}
1647
1648impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1649where
1650    A: AsAccessor,
1651{
1652    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1653        guard.reader.take().unwrap()
1654    }
1655}
1656
1657impl<T, A> Drop for GuardedStreamReader<T, A>
1658where
1659    A: AsAccessor,
1660{
1661    fn drop(&mut self) {
1662        if let Some(reader) = &mut self.reader {
1663            reader.close_with(&self.accessor)
1664        }
1665    }
1666}
1667
1668/// Represents a Component Model `error-context`.
1669pub struct ErrorContext {
1670    rep: u32,
1671}
1672
1673impl ErrorContext {
1674    pub(crate) fn new(rep: u32) -> Self {
1675        Self { rep }
1676    }
1677
1678    /// Convert this `ErrorContext` into a [`Val`].
1679    pub fn into_val(self) -> Val {
1680        Val::ErrorContext(ErrorContextAny(self.rep))
1681    }
1682
1683    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1684    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1685        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1686            bail!("expected `error-context`; got `{}`", value.desc());
1687        };
1688        Ok(Self::new(*rep))
1689    }
1690
1691    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1692        match ty {
1693            InterfaceType::ErrorContext(src) => {
1694                let rep = cx
1695                    .instance_mut()
1696                    .table_for_error_context(src)
1697                    .error_context_rep(index)?;
1698
1699                Ok(Self { rep })
1700            }
1701            _ => func::bad_type_info(),
1702        }
1703    }
1704}
1705
1706pub(crate) fn lower_error_context_to_index<U>(
1707    rep: u32,
1708    cx: &mut LowerContext<'_, U>,
1709    ty: InterfaceType,
1710) -> Result<u32> {
1711    match ty {
1712        InterfaceType::ErrorContext(dst) => {
1713            let tbl = cx.instance_mut().table_for_error_context(dst);
1714            tbl.error_context_insert(rep)
1715        }
1716        _ => func::bad_type_info(),
1717    }
1718}
1719// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1720// safe and correct since we lift and lower future handles as `u32`s.
1721unsafe impl func::ComponentType for ErrorContext {
1722    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1723
1724    type Lower = <u32 as func::ComponentType>::Lower;
1725
1726    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1727        match ty {
1728            InterfaceType::ErrorContext(_) => Ok(()),
1729            other => bail!("expected `error`, found `{}`", func::desc(other)),
1730        }
1731    }
1732}
1733
1734// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1735unsafe impl func::Lower for ErrorContext {
1736    fn linear_lower_to_flat<T>(
1737        &self,
1738        cx: &mut LowerContext<'_, T>,
1739        ty: InterfaceType,
1740        dst: &mut MaybeUninit<Self::Lower>,
1741    ) -> Result<()> {
1742        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1743            cx,
1744            InterfaceType::U32,
1745            dst,
1746        )
1747    }
1748
1749    fn linear_lower_to_memory<T>(
1750        &self,
1751        cx: &mut LowerContext<'_, T>,
1752        ty: InterfaceType,
1753        offset: usize,
1754    ) -> Result<()> {
1755        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1756            cx,
1757            InterfaceType::U32,
1758            offset,
1759        )
1760    }
1761}
1762
1763// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1764unsafe impl func::Lift for ErrorContext {
1765    fn linear_lift_from_flat(
1766        cx: &mut LiftContext<'_>,
1767        ty: InterfaceType,
1768        src: &Self::Lower,
1769    ) -> Result<Self> {
1770        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1771        Self::lift_from_index(cx, ty, index)
1772    }
1773
1774    fn linear_lift_from_memory(
1775        cx: &mut LiftContext<'_>,
1776        ty: InterfaceType,
1777        bytes: &[u8],
1778    ) -> Result<Self> {
1779        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1780        Self::lift_from_index(cx, ty, index)
1781    }
1782}
1783
1784/// Represents the read or write end of a stream or future.
1785pub(super) struct TransmitHandle {
1786    pub(super) common: WaitableCommon,
1787    /// See `TransmitState`
1788    state: TableId<TransmitState>,
1789}
1790
1791impl TransmitHandle {
1792    fn new(state: TableId<TransmitState>) -> Self {
1793        Self {
1794            common: WaitableCommon::default(),
1795            state,
1796        }
1797    }
1798}
1799
1800impl TableDebug for TransmitHandle {
1801    fn type_name() -> &'static str {
1802        "TransmitHandle"
1803    }
1804}
1805
1806/// Represents the state of a stream or future.
1807struct TransmitState {
1808    /// The write end of the stream or future.
1809    write_handle: TableId<TransmitHandle>,
1810    /// The read end of the stream or future.
1811    read_handle: TableId<TransmitHandle>,
1812    /// See `WriteState`
1813    write: WriteState,
1814    /// See `ReadState`
1815    read: ReadState,
1816    /// Whether futher values may be transmitted via this stream or future.
1817    done: bool,
1818}
1819
1820impl Default for TransmitState {
1821    fn default() -> Self {
1822        Self {
1823            write_handle: TableId::new(u32::MAX),
1824            read_handle: TableId::new(u32::MAX),
1825            read: ReadState::Open,
1826            write: WriteState::Open,
1827            done: false,
1828        }
1829    }
1830}
1831
1832impl TableDebug for TransmitState {
1833    fn type_name() -> &'static str {
1834        "TransmitState"
1835    }
1836}
1837
1838type PollStream = Box<
1839    dyn Fn(Option<Instance>) -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>
1840        + Send
1841        + Sync,
1842>;
1843
1844type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
1845
1846/// Represents the state of the write end of a stream or future.
1847enum WriteState {
1848    /// The write end is open, but no write is pending.
1849    Open,
1850    /// The write end is owned by a guest task and a write is pending.
1851    GuestReady {
1852        instance: Instance,
1853        ty: TransmitIndex,
1854        flat_abi: Option<FlatAbi>,
1855        options: Options,
1856        address: usize,
1857        count: usize,
1858        handle: u32,
1859    },
1860    /// The write end is owned by the host, which is ready to produce items.
1861    HostReady {
1862        produce: PollStream,
1863        try_into: TryInto,
1864        guest_offset: usize,
1865        cancel: bool,
1866        cancel_waker: Option<Waker>,
1867    },
1868    /// The write end has been dropped.
1869    Dropped,
1870}
1871
1872impl fmt::Debug for WriteState {
1873    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1874        match self {
1875            Self::Open => f.debug_tuple("Open").finish(),
1876            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1877            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1878            Self::Dropped => f.debug_tuple("Dropped").finish(),
1879        }
1880    }
1881}
1882
1883/// Represents the state of the read end of a stream or future.
1884enum ReadState {
1885    /// The read end is open, but no read is pending.
1886    Open,
1887    /// The read end is owned by a guest task and a read is pending.
1888    GuestReady {
1889        ty: TransmitIndex,
1890        flat_abi: Option<FlatAbi>,
1891        options: Options,
1892        address: usize,
1893        count: usize,
1894        handle: u32,
1895    },
1896    /// The read end is owned by a host task, and it is ready to consume items.
1897    HostReady {
1898        consume: PollStream,
1899        guest_offset: usize,
1900        cancel: bool,
1901        cancel_waker: Option<Waker>,
1902    },
1903    /// Both the read and write ends are owned by the host.
1904    HostToHost {
1905        accept: Box<
1906            dyn for<'a> Fn(
1907                    &'a mut UntypedWriteBuffer<'a>,
1908                )
1909                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
1910                + Send
1911                + Sync,
1912        >,
1913        buffer: Vec<u8>,
1914        limit: usize,
1915    },
1916    /// The read end has been dropped.
1917    Dropped,
1918}
1919
1920impl fmt::Debug for ReadState {
1921    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1922        match self {
1923            Self::Open => f.debug_tuple("Open").finish(),
1924            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1925            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1926            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1927            Self::Dropped => f.debug_tuple("Dropped").finish(),
1928        }
1929    }
1930}
1931
1932fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
1933    let count = guest_offset.try_into().unwrap();
1934    match state {
1935        StreamResult::Dropped => ReturnCode::Dropped(count),
1936        StreamResult::Completed => ReturnCode::completed(kind, count),
1937        StreamResult::Cancelled => ReturnCode::Cancelled(count),
1938    }
1939}
1940
1941impl StoreOpaque {
1942    fn pipe_from_guest(
1943        &mut self,
1944        kind: TransmitKind,
1945        id: TableId<TransmitState>,
1946        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
1947    ) {
1948        let future = async move {
1949            let stream_state = future.await?;
1950            tls::get(|store| {
1951                let state = store.concurrent_state_mut();
1952                let transmit = state.get_mut(id)?;
1953                let ReadState::HostReady {
1954                    consume,
1955                    guest_offset,
1956                    ..
1957                } = mem::replace(&mut transmit.read, ReadState::Open)
1958                else {
1959                    unreachable!();
1960                };
1961                let code = return_code(kind, stream_state, guest_offset);
1962                transmit.read = match stream_state {
1963                    StreamResult::Dropped => ReadState::Dropped,
1964                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
1965                        consume,
1966                        guest_offset: 0,
1967                        cancel: false,
1968                        cancel_waker: None,
1969                    },
1970                };
1971                let WriteState::GuestReady { ty, handle, .. } =
1972                    mem::replace(&mut transmit.write, WriteState::Open)
1973                else {
1974                    unreachable!();
1975                };
1976                state.send_write_result(ty, id, handle, code)?;
1977                Ok(())
1978            })
1979        };
1980
1981        self.concurrent_state_mut().push_future(future.boxed());
1982    }
1983
1984    fn pipe_to_guest(
1985        &mut self,
1986        kind: TransmitKind,
1987        id: TableId<TransmitState>,
1988        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
1989    ) {
1990        let future = async move {
1991            let stream_state = future.await?;
1992            tls::get(|store| {
1993                let state = store.concurrent_state_mut();
1994                let transmit = state.get_mut(id)?;
1995                let WriteState::HostReady {
1996                    produce,
1997                    try_into,
1998                    guest_offset,
1999                    ..
2000                } = mem::replace(&mut transmit.write, WriteState::Open)
2001                else {
2002                    unreachable!();
2003                };
2004                let code = return_code(kind, stream_state, guest_offset);
2005                transmit.write = match stream_state {
2006                    StreamResult::Dropped => WriteState::Dropped,
2007                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2008                        produce,
2009                        try_into,
2010                        guest_offset: 0,
2011                        cancel: false,
2012                        cancel_waker: None,
2013                    },
2014                };
2015                let ReadState::GuestReady { ty, handle, .. } =
2016                    mem::replace(&mut transmit.read, ReadState::Open)
2017                else {
2018                    unreachable!();
2019                };
2020                state.send_read_result(ty, id, handle, code)?;
2021                Ok(())
2022            })
2023        };
2024
2025        self.concurrent_state_mut().push_future(future.boxed());
2026    }
2027
2028    /// Drop the read end of a stream or future read from the host.
2029    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2030        let state = self.concurrent_state_mut();
2031        let transmit_id = state.get_mut(id)?.state;
2032        let transmit = state
2033            .get_mut(transmit_id)
2034            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2035        log::trace!(
2036            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2037            transmit.read,
2038            transmit.write
2039        );
2040
2041        transmit.read = ReadState::Dropped;
2042
2043        // If the write end is already dropped, it should stay dropped,
2044        // otherwise, it should be opened.
2045        let new_state = if let WriteState::Dropped = &transmit.write {
2046            WriteState::Dropped
2047        } else {
2048            WriteState::Open
2049        };
2050
2051        let write_handle = transmit.write_handle;
2052
2053        match mem::replace(&mut transmit.write, new_state) {
2054            // If a guest is waiting to write, notify it that the read end has
2055            // been dropped.
2056            WriteState::GuestReady { ty, handle, .. } => {
2057                state.update_event(
2058                    write_handle.rep(),
2059                    match ty {
2060                        TransmitIndex::Future(ty) => Event::FutureWrite {
2061                            code: ReturnCode::Dropped(0),
2062                            pending: Some((ty, handle)),
2063                        },
2064                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2065                            code: ReturnCode::Dropped(0),
2066                            pending: Some((ty, handle)),
2067                        },
2068                    },
2069                )?;
2070            }
2071
2072            WriteState::HostReady { .. } => {}
2073
2074            WriteState::Open => {
2075                state.update_event(
2076                    write_handle.rep(),
2077                    match kind {
2078                        TransmitKind::Future => Event::FutureWrite {
2079                            code: ReturnCode::Dropped(0),
2080                            pending: None,
2081                        },
2082                        TransmitKind::Stream => Event::StreamWrite {
2083                            code: ReturnCode::Dropped(0),
2084                            pending: None,
2085                        },
2086                    },
2087                )?;
2088            }
2089
2090            WriteState::Dropped => {
2091                log::trace!("host_drop_reader delete {transmit_id:?}");
2092                state.delete_transmit(transmit_id)?;
2093            }
2094        }
2095        Ok(())
2096    }
2097
2098    /// Drop the write end of a stream or future read from the host.
2099    fn host_drop_writer(
2100        &mut self,
2101        id: TableId<TransmitHandle>,
2102        on_drop_open: Option<fn() -> Result<()>>,
2103    ) -> Result<()> {
2104        let state = self.concurrent_state_mut();
2105        let transmit_id = state.get_mut(id)?.state;
2106        let transmit = state
2107            .get_mut(transmit_id)
2108            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2109        log::trace!(
2110            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2111            transmit.read,
2112            transmit.write
2113        );
2114
2115        // Existing queued transmits must be updated with information for the impending writer closure
2116        match &mut transmit.write {
2117            WriteState::GuestReady { .. } => {
2118                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2119            }
2120            WriteState::HostReady { .. } => {}
2121            v @ WriteState::Open => {
2122                if let (Some(on_drop_open), false) = (
2123                    on_drop_open,
2124                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2125                ) {
2126                    on_drop_open()?;
2127                } else {
2128                    *v = WriteState::Dropped;
2129                }
2130            }
2131            WriteState::Dropped => unreachable!("write state is already dropped"),
2132        }
2133
2134        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2135
2136        // If the existing read state is dropped, then there's nothing to read
2137        // and we can keep it that way.
2138        //
2139        // If the read state was any other state, then we must set the new state to open
2140        // to indicate that there *is* data to be read
2141        let new_state = if let ReadState::Dropped = &transmit.read {
2142            ReadState::Dropped
2143        } else {
2144            ReadState::Open
2145        };
2146
2147        let read_handle = transmit.read_handle;
2148
2149        // Swap in the new read state
2150        match mem::replace(&mut transmit.read, new_state) {
2151            // If the guest was ready to read, then we cannot drop the reader (or writer);
2152            // we must deliver the event, and update the state associated with the handle to
2153            // represent that a read must be performed
2154            ReadState::GuestReady { ty, handle, .. } => {
2155                // Ensure the final read of the guest is queued, with appropriate closure indicator
2156                self.concurrent_state_mut().update_event(
2157                    read_handle.rep(),
2158                    match ty {
2159                        TransmitIndex::Future(ty) => Event::FutureRead {
2160                            code: ReturnCode::Dropped(0),
2161                            pending: Some((ty, handle)),
2162                        },
2163                        TransmitIndex::Stream(ty) => Event::StreamRead {
2164                            code: ReturnCode::Dropped(0),
2165                            pending: Some((ty, handle)),
2166                        },
2167                    },
2168                )?;
2169            }
2170
2171            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2172
2173            // If the read state is open, then there are no registered readers of the stream/future
2174            ReadState::Open => {
2175                self.concurrent_state_mut().update_event(
2176                    read_handle.rep(),
2177                    match on_drop_open {
2178                        Some(_) => Event::FutureRead {
2179                            code: ReturnCode::Dropped(0),
2180                            pending: None,
2181                        },
2182                        None => Event::StreamRead {
2183                            code: ReturnCode::Dropped(0),
2184                            pending: None,
2185                        },
2186                    },
2187                )?;
2188            }
2189
2190            // If the read state was already dropped, then we can remove the transmit state completely
2191            // (both writer and reader have been dropped)
2192            ReadState::Dropped => {
2193                log::trace!("host_drop_writer delete {transmit_id:?}");
2194                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2195            }
2196        }
2197        Ok(())
2198    }
2199}
2200
2201impl<T> StoreContextMut<'_, T> {
2202    fn new_transmit<P: StreamProducer<T>>(
2203        mut self,
2204        kind: TransmitKind,
2205        producer: P,
2206    ) -> TableId<TransmitHandle>
2207    where
2208        P::Item: func::Lower,
2209    {
2210        let token = StoreToken::new(self.as_context_mut());
2211        let state = self.0.concurrent_state_mut();
2212        let (_, read) = state.new_transmit().unwrap();
2213        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2214        let id = state.get_mut(read).unwrap().state;
2215        let produce = Box::new({
2216            let producer = producer.clone();
2217            move |instance| {
2218                let producer = producer.clone();
2219                async move {
2220                    let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2221
2222                    let (result, cancelled) = if buffer.remaining().is_empty() {
2223                        future::poll_fn(|cx| {
2224                        tls::get(|store| {
2225                            let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2226
2227                            let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2228                                unreachable!();
2229                            };
2230
2231                            let mut host_buffer =
2232                                if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2233                                    Some(Cursor::new(mem::take(buffer)))
2234                                } else {
2235                                    None
2236                                };
2237
2238                            let poll = mine.as_mut().poll_produce(
2239                                cx,
2240                                token.as_context_mut(store),
2241                                Destination {
2242                                    id,
2243                                    buffer: &mut buffer,
2244                                    host_buffer: host_buffer.as_mut(),
2245                                    _phantom: PhantomData,
2246                                },
2247                                cancel,
2248                            );
2249
2250                            let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2251
2252                            let host_offset = if let (
2253                                Some(host_buffer),
2254                                ReadState::HostToHost { buffer, limit, .. },
2255                            ) = (host_buffer, &mut transmit.read)
2256                            {
2257                                *limit = usize::try_from(host_buffer.position()).unwrap();
2258                                *buffer = host_buffer.into_inner();
2259                                *limit
2260                            } else {
2261                                0
2262                            };
2263
2264                            {
2265                                let WriteState::HostReady {
2266                                    guest_offset,
2267                                    cancel,
2268                                    cancel_waker,
2269                                    ..
2270                                } = &mut transmit.write
2271                                else {
2272                                    unreachable!();
2273                                };
2274
2275                                if poll.is_pending() {
2276                                    if !buffer.remaining().is_empty()
2277                                        || *guest_offset > 0
2278                                        || host_offset > 0
2279                                    {
2280                                        return Poll::Ready(Err(anyhow!(
2281                                            "StreamProducer::poll_produce returned Poll::Pending \
2282                                             after producing at least one item"
2283                                        )));
2284                                    }
2285                                    *cancel_waker = Some(cx.waker().clone());
2286                                } else {
2287                                    *cancel_waker = None;
2288                                    *cancel = false;
2289                                }
2290                            }
2291
2292                            poll.map(|v| v.map(|result| (result, cancel)))
2293                        })
2294                    })
2295                    .await?
2296                    } else {
2297                        (StreamResult::Completed, false)
2298                    };
2299
2300                    let (guest_offset, host_offset, count) = tls::get(|store| {
2301                        let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2302                        let (count, host_offset) = match &transmit.read {
2303                            &ReadState::GuestReady { count, .. } => (count, 0),
2304                            &ReadState::HostToHost { limit, .. } => (1, limit),
2305                            _ => unreachable!(),
2306                        };
2307                        let guest_offset = match &transmit.write {
2308                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
2309                            _ => unreachable!(),
2310                        };
2311                        (guest_offset, host_offset, count)
2312                    });
2313
2314                    match result {
2315                        StreamResult::Completed => {
2316                            if count > 1
2317                                && buffer.remaining().is_empty()
2318                                && guest_offset == 0
2319                                && host_offset == 0
2320                            {
2321                                bail!(
2322                                    "StreamProducer::poll_produce returned StreamResult::Completed \
2323                                     without producing any items"
2324                                );
2325                            }
2326                        }
2327                        StreamResult::Cancelled => {
2328                            if !cancelled {
2329                                bail!(
2330                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
2331                                     without being given a `finish` parameter value of true"
2332                                );
2333                            }
2334                        }
2335                        StreamResult::Dropped => {}
2336                    }
2337
2338                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2339
2340                    *producer.lock().unwrap() = Some((mine, buffer));
2341
2342                    if write_buffer {
2343                        write(instance, token, id, producer, kind).await?;
2344                    }
2345
2346                    Ok(result)
2347                }
2348                .boxed()
2349            }
2350        });
2351        let try_into = Box::new(move |ty| {
2352            let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2353            match P::try_into(mine, ty) {
2354                Ok(value) => Some(value),
2355                Err(mine) => {
2356                    *producer.lock().unwrap() = Some((mine, buffer));
2357                    None
2358                }
2359            }
2360        });
2361        state.get_mut(id).unwrap().write = WriteState::HostReady {
2362            produce,
2363            try_into,
2364            guest_offset: 0,
2365            cancel: false,
2366            cancel_waker: None,
2367        };
2368        read
2369    }
2370
2371    fn set_consumer<C: StreamConsumer<T>>(
2372        mut self,
2373        id: TableId<TransmitHandle>,
2374        kind: TransmitKind,
2375        consumer: C,
2376    ) {
2377        let token = StoreToken::new(self.as_context_mut());
2378        let state = self.0.concurrent_state_mut();
2379        let id = state.get_mut(id).unwrap().state;
2380        let transmit = state.get_mut(id).unwrap();
2381        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2382        let consume_with_buffer = {
2383            let consumer = consumer.clone();
2384            async move |instance, mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2385                let mut mine = consumer.lock().unwrap().take().unwrap();
2386
2387                let host_buffer_remaining_before =
2388                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2389
2390                let (result, cancelled) = future::poll_fn(|cx| {
2391                    tls::get(|store| {
2392                        let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2393                            &ReadState::HostReady { cancel, .. } => cancel,
2394                            ReadState::Open => false,
2395                            _ => unreachable!(),
2396                        };
2397
2398                        let poll = mine.as_mut().poll_consume(
2399                            cx,
2400                            token.as_context_mut(store),
2401                            Source {
2402                                instance,
2403                                id,
2404                                host_buffer: host_buffer.as_deref_mut(),
2405                            },
2406                            cancel,
2407                        );
2408
2409                        if let ReadState::HostReady {
2410                            cancel_waker,
2411                            cancel,
2412                            ..
2413                        } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2414                        {
2415                            if poll.is_pending() {
2416                                *cancel_waker = Some(cx.waker().clone());
2417                            } else {
2418                                *cancel_waker = None;
2419                                *cancel = false;
2420                            }
2421                        }
2422
2423                        poll.map(|v| v.map(|result| (result, cancel)))
2424                    })
2425                })
2426                .await?;
2427
2428                let (guest_offset, count) = tls::get(|store| {
2429                    let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2430                    (
2431                        match &transmit.read {
2432                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2433                            ReadState::Open => 0,
2434                            _ => unreachable!(),
2435                        },
2436                        match &transmit.write {
2437                            &WriteState::GuestReady { count, .. } => count,
2438                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2439                            _ => unreachable!(),
2440                        },
2441                    )
2442                });
2443
2444                match result {
2445                    StreamResult::Completed => {
2446                        if count > 0
2447                            && guest_offset == 0
2448                            && host_buffer_remaining_before
2449                                .zip(host_buffer.map(|v| v.remaining().len()))
2450                                .map(|(before, after)| before == after)
2451                                .unwrap_or(false)
2452                        {
2453                            bail!(
2454                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2455                                 without consuming any items"
2456                            );
2457                        }
2458
2459                        if let TransmitKind::Future = kind {
2460                            tls::get(|store| {
2461                                store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2462                            });
2463                        }
2464                    }
2465                    StreamResult::Cancelled => {
2466                        if !cancelled {
2467                            bail!(
2468                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2469                                 without being given a `finish` parameter value of true"
2470                            );
2471                        }
2472                    }
2473                    StreamResult::Dropped => {}
2474                }
2475
2476                *consumer.lock().unwrap() = Some(mine);
2477
2478                Ok(result)
2479            }
2480        };
2481        let consume = {
2482            let consume = consume_with_buffer.clone();
2483            Box::new(move |instance| {
2484                let consume = consume.clone();
2485                async move { consume(instance, None).await }.boxed()
2486            })
2487        };
2488
2489        match &transmit.write {
2490            WriteState::Open => {
2491                transmit.read = ReadState::HostReady {
2492                    consume,
2493                    guest_offset: 0,
2494                    cancel: false,
2495                    cancel_waker: None,
2496                };
2497            }
2498            &WriteState::GuestReady { instance, .. } => {
2499                let future = consume(Some(instance));
2500                transmit.read = ReadState::HostReady {
2501                    consume,
2502                    guest_offset: 0,
2503                    cancel: false,
2504                    cancel_waker: None,
2505                };
2506                self.0.pipe_from_guest(kind, id, future);
2507            }
2508            WriteState::HostReady { .. } => {
2509                let WriteState::HostReady { produce, .. } = mem::replace(
2510                    &mut transmit.write,
2511                    WriteState::HostReady {
2512                        produce: Box::new(|_| unreachable!()),
2513                        try_into: Box::new(|_| unreachable!()),
2514                        guest_offset: 0,
2515                        cancel: false,
2516                        cancel_waker: None,
2517                    },
2518                ) else {
2519                    unreachable!();
2520                };
2521
2522                transmit.read = ReadState::HostToHost {
2523                    accept: Box::new(move |input| {
2524                        let consume = consume_with_buffer.clone();
2525                        async move { consume(None, Some(input.get_mut::<C::Item>())).await }.boxed()
2526                    }),
2527                    buffer: Vec::new(),
2528                    limit: 0,
2529                };
2530
2531                let future = async move {
2532                    loop {
2533                        if tls::get(|store| {
2534                            anyhow::Ok(matches!(
2535                                store.concurrent_state_mut().get_mut(id)?.read,
2536                                ReadState::Dropped
2537                            ))
2538                        })? {
2539                            break Ok(());
2540                        }
2541
2542                        match produce(None).await? {
2543                            StreamResult::Completed | StreamResult::Cancelled => {}
2544                            StreamResult::Dropped => break Ok(()),
2545                        }
2546
2547                        if let TransmitKind::Future = kind {
2548                            break Ok(());
2549                        }
2550                    }
2551                }
2552                .map(move |result| {
2553                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2554                    result
2555                });
2556
2557                state.push_future(Box::pin(future));
2558            }
2559            WriteState::Dropped => {
2560                let reader = transmit.read_handle;
2561                self.0.host_drop_reader(reader, kind).unwrap();
2562            }
2563        }
2564    }
2565}
2566
2567async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2568    instance: Option<Instance>,
2569    token: StoreToken<D>,
2570    id: TableId<TransmitState>,
2571    pair: Arc<Mutex<Option<(P, B)>>>,
2572    kind: TransmitKind,
2573) -> Result<()> {
2574    let (read, guest_offset) = tls::get(|store| {
2575        let transmit = store.concurrent_state_mut().get_mut(id)?;
2576
2577        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2578            Some(guest_offset)
2579        } else {
2580            None
2581        };
2582
2583        anyhow::Ok((
2584            mem::replace(&mut transmit.read, ReadState::Open),
2585            guest_offset,
2586        ))
2587    })?;
2588
2589    match read {
2590        ReadState::GuestReady {
2591            ty,
2592            flat_abi,
2593            options,
2594            address,
2595            count,
2596            handle,
2597        } => {
2598            let guest_offset = guest_offset.unwrap();
2599
2600            if let TransmitKind::Future = kind {
2601                tls::get(|store| {
2602                    store.concurrent_state_mut().get_mut(id)?.done = true;
2603                    anyhow::Ok(())
2604                })?;
2605            }
2606
2607            let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2608            let accept = {
2609                let pair = pair.clone();
2610                move |mut store: StoreContextMut<D>| {
2611                    lower::<T, B, D>(
2612                        store.as_context_mut(),
2613                        instance.unwrap(),
2614                        &options,
2615                        ty,
2616                        address + (T::SIZE32 * guest_offset),
2617                        count - guest_offset,
2618                        &mut pair.lock().unwrap().as_mut().unwrap().1,
2619                    )?;
2620                    anyhow::Ok(())
2621                }
2622            };
2623
2624            if guest_offset < count {
2625                if T::MAY_REQUIRE_REALLOC {
2626                    // For payloads which may require a realloc call, use a
2627                    // oneshot::channel and background task.  This is
2628                    // necessary because calling the guest while there are
2629                    // host embedder frames on the stack is unsound.
2630                    let (tx, rx) = oneshot::channel();
2631                    tls::get(move |store| {
2632                        store
2633                            .concurrent_state_mut()
2634                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2635                                move |store| {
2636                                    _ = tx.send(accept(token.as_context_mut(store))?);
2637                                    Ok(())
2638                                },
2639                            ))))
2640                    });
2641                    rx.await?
2642                } else {
2643                    // Optimize flat payloads (i.e. those which do not
2644                    // require calling the guest's realloc function) by
2645                    // lowering directly instead of using a oneshot::channel
2646                    // and background task.
2647                    tls::get(|store| accept(token.as_context_mut(store)))?
2648                };
2649            }
2650
2651            tls::get(|store| {
2652                let count =
2653                    old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2654
2655                let transmit = store.concurrent_state_mut().get_mut(id)?;
2656
2657                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2658                    unreachable!();
2659                };
2660
2661                *guest_offset += count;
2662
2663                transmit.read = ReadState::GuestReady {
2664                    ty,
2665                    flat_abi,
2666                    options,
2667                    address,
2668                    count,
2669                    handle,
2670                };
2671
2672                anyhow::Ok(())
2673            })?;
2674
2675            Ok(())
2676        }
2677
2678        ReadState::HostToHost {
2679            accept,
2680            mut buffer,
2681            limit,
2682        } => {
2683            let mut state = StreamResult::Completed;
2684            let mut position = 0;
2685
2686            while !matches!(state, StreamResult::Dropped) && position < limit {
2687                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2688                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2689                (buffer, position, _) = slice_buffer.into_parts();
2690            }
2691
2692            {
2693                let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2694
2695                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2696                    state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2697                }
2698
2699                *pair.lock().unwrap() = Some((mine, buffer));
2700            }
2701
2702            tls::get(|store| {
2703                store.concurrent_state_mut().get_mut(id)?.read = match state {
2704                    StreamResult::Dropped => ReadState::Dropped,
2705                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2706                        accept,
2707                        buffer,
2708                        limit: 0,
2709                    },
2710                };
2711
2712                anyhow::Ok(())
2713            })?;
2714            Ok(())
2715        }
2716
2717        _ => unreachable!(),
2718    }
2719}
2720
2721impl Instance {
2722    /// Handle a host- or guest-initiated write by delivering the item(s) to the
2723    /// `StreamConsumer` for the specified stream or future.
2724    fn consume(
2725        self,
2726        store: &mut dyn VMStore,
2727        kind: TransmitKind,
2728        transmit_id: TableId<TransmitState>,
2729        consume: PollStream,
2730        guest_offset: usize,
2731        cancel: bool,
2732    ) -> Result<ReturnCode> {
2733        let mut future = consume(Some(self));
2734        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2735            consume,
2736            guest_offset,
2737            cancel,
2738            cancel_waker: None,
2739        };
2740        let poll = tls::set(store, || {
2741            future
2742                .as_mut()
2743                .poll(&mut Context::from_waker(&Waker::noop()))
2744        });
2745
2746        Ok(match poll {
2747            Poll::Ready(state) => {
2748                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2749                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2750                    unreachable!();
2751                };
2752                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2753                transmit.write = WriteState::Open;
2754                code
2755            }
2756            Poll::Pending => {
2757                store.pipe_from_guest(kind, transmit_id, future);
2758                ReturnCode::Blocked
2759            }
2760        })
2761    }
2762
2763    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
2764    /// for the specified stream or future for items.
2765    fn produce(
2766        self,
2767        store: &mut dyn VMStore,
2768        kind: TransmitKind,
2769        transmit_id: TableId<TransmitState>,
2770        produce: PollStream,
2771        try_into: TryInto,
2772        guest_offset: usize,
2773        cancel: bool,
2774    ) -> Result<ReturnCode> {
2775        let mut future = produce(Some(self));
2776        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2777            produce,
2778            try_into,
2779            guest_offset,
2780            cancel,
2781            cancel_waker: None,
2782        };
2783        let poll = tls::set(store, || {
2784            future
2785                .as_mut()
2786                .poll(&mut Context::from_waker(&Waker::noop()))
2787        });
2788
2789        Ok(match poll {
2790            Poll::Ready(state) => {
2791                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2792                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2793                    unreachable!();
2794                };
2795                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2796                transmit.read = ReadState::Open;
2797                code
2798            }
2799            Poll::Pending => {
2800                store.pipe_to_guest(kind, transmit_id, future);
2801                ReturnCode::Blocked
2802            }
2803        })
2804    }
2805
2806    /// Drop the writable end of the specified stream or future from the guest.
2807    pub(super) fn guest_drop_writable(
2808        self,
2809        store: &mut StoreOpaque,
2810        ty: TransmitIndex,
2811        writer: u32,
2812    ) -> Result<()> {
2813        let table = self.id().get_mut(store).table_for_transmit(ty);
2814        let transmit_rep = match ty {
2815            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2816            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2817        };
2818
2819        let id = TableId::<TransmitHandle>::new(transmit_rep);
2820        log::trace!("guest_drop_writable: drop writer {id:?}");
2821        match ty {
2822            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
2823            TransmitIndex::Future(_) => store.host_drop_writer(
2824                id,
2825                Some(|| {
2826                    Err(anyhow!(
2827                        "cannot drop future write end without first writing a value"
2828                    ))
2829                }),
2830            ),
2831        }
2832    }
2833
2834    /// Copy `count` items from `read_address` to `write_address` for the
2835    /// specified stream or future.
2836    fn copy<T: 'static>(
2837        self,
2838        mut store: StoreContextMut<T>,
2839        flat_abi: Option<FlatAbi>,
2840        write_ty: TransmitIndex,
2841        write_options: &Options,
2842        write_address: usize,
2843        read_ty: TransmitIndex,
2844        read_options: &Options,
2845        read_address: usize,
2846        count: usize,
2847        rep: u32,
2848    ) -> Result<()> {
2849        let types = self.id().get(store.0).component().types().clone();
2850        match (write_ty, read_ty) {
2851            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
2852                assert_eq!(count, 1);
2853
2854                let val = types[types[write_ty].ty]
2855                    .payload
2856                    .map(|ty| {
2857                        let abi = types.canonical_abi(&ty);
2858                        // FIXME: needs to read an i64 for memory64
2859                        if write_address % usize::try_from(abi.align32)? != 0 {
2860                            bail!("write pointer not aligned");
2861                        }
2862
2863                        let lift =
2864                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2865                        let bytes = lift
2866                            .memory()
2867                            .get(write_address..)
2868                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2869                            .ok_or_else(|| {
2870                                anyhow::anyhow!("write pointer out of bounds of memory")
2871                            })?;
2872
2873                        Val::load(lift, ty, bytes)
2874                    })
2875                    .transpose()?;
2876
2877                if let Some(val) = val {
2878                    let lower =
2879                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2880                    let ty = types[types[read_ty].ty].payload.unwrap();
2881                    let ptr = func::validate_inbounds_dynamic(
2882                        types.canonical_abi(&ty),
2883                        lower.as_slice_mut(),
2884                        &ValRaw::u32(read_address.try_into().unwrap()),
2885                    )?;
2886                    val.store(lower, ty, ptr)?;
2887                }
2888            }
2889            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
2890                if let Some(flat_abi) = flat_abi {
2891                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
2892                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2893                    if length_in_bytes > 0 {
2894                        if write_address % usize::try_from(flat_abi.align)? != 0 {
2895                            bail!("write pointer not aligned");
2896                        }
2897                        if read_address % usize::try_from(flat_abi.align)? != 0 {
2898                            bail!("read pointer not aligned");
2899                        }
2900
2901                        let store_opaque = store.0.store_opaque_mut();
2902
2903                        {
2904                            let src = write_options
2905                                .memory(store_opaque)
2906                                .get(write_address..)
2907                                .and_then(|b| b.get(..length_in_bytes))
2908                                .ok_or_else(|| {
2909                                    anyhow::anyhow!("write pointer out of bounds of memory")
2910                                })?
2911                                .as_ptr();
2912                            let dst = read_options
2913                                .memory_mut(store_opaque)
2914                                .get_mut(read_address..)
2915                                .and_then(|b| b.get_mut(..length_in_bytes))
2916                                .ok_or_else(|| {
2917                                    anyhow::anyhow!("read pointer out of bounds of memory")
2918                                })?
2919                                .as_mut_ptr();
2920                            // SAFETY: Both `src` and `dst` have been validated
2921                            // above.
2922                            unsafe { src.copy_to(dst, length_in_bytes) };
2923                        }
2924                    }
2925                } else {
2926                    let store_opaque = store.0.store_opaque_mut();
2927                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
2928                    let ty = types[types[write_ty].ty].payload.unwrap();
2929                    let abi = lift.types.canonical_abi(&ty);
2930                    let size = usize::try_from(abi.size32).unwrap();
2931                    if write_address % usize::try_from(abi.align32)? != 0 {
2932                        bail!("write pointer not aligned");
2933                    }
2934                    let bytes = lift
2935                        .memory()
2936                        .get(write_address..)
2937                        .and_then(|b| b.get(..size * count))
2938                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
2939
2940                    let values = (0..count)
2941                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
2942                        .collect::<Result<Vec<_>>>()?;
2943
2944                    let id = TableId::<TransmitHandle>::new(rep);
2945                    log::trace!("copy values {values:?} for {id:?}");
2946
2947                    let lower =
2948                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
2949                    let ty = types[types[read_ty].ty].payload.unwrap();
2950                    let abi = lower.types.canonical_abi(&ty);
2951                    if read_address % usize::try_from(abi.align32)? != 0 {
2952                        bail!("read pointer not aligned");
2953                    }
2954                    let size = usize::try_from(abi.size32).unwrap();
2955                    lower
2956                        .as_slice_mut()
2957                        .get_mut(read_address..)
2958                        .and_then(|b| b.get_mut(..size * count))
2959                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
2960                    let mut ptr = read_address;
2961                    for value in values {
2962                        value.store(lower, ty, ptr)?;
2963                        ptr += size
2964                    }
2965                }
2966            }
2967            _ => unreachable!(),
2968        }
2969
2970        Ok(())
2971    }
2972
2973    fn check_bounds(
2974        self,
2975        store: &StoreOpaque,
2976        options: &Options,
2977        ty: TransmitIndex,
2978        address: usize,
2979        count: usize,
2980    ) -> Result<()> {
2981        let types = self.id().get(store).component().types().clone();
2982        let size = usize::try_from(
2983            match ty {
2984                TransmitIndex::Future(ty) => types[types[ty].ty]
2985                    .payload
2986                    .map(|ty| types.canonical_abi(&ty).size32),
2987                TransmitIndex::Stream(ty) => types[types[ty].ty]
2988                    .payload
2989                    .map(|ty| types.canonical_abi(&ty).size32),
2990            }
2991            .unwrap_or(0),
2992        )
2993        .unwrap();
2994
2995        if count > 0 && size > 0 {
2996            options
2997                .memory(store)
2998                .get(address..)
2999                .and_then(|b| b.get(..(size * count)))
3000                .map(drop)
3001                .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
3002        } else {
3003            Ok(())
3004        }
3005    }
3006
3007    /// Write to the specified stream or future from the guest.
3008    pub(super) fn guest_write<T: 'static>(
3009        self,
3010        mut store: StoreContextMut<T>,
3011        ty: TransmitIndex,
3012        options: OptionsIndex,
3013        flat_abi: Option<FlatAbi>,
3014        handle: u32,
3015        address: u32,
3016        count: u32,
3017    ) -> Result<ReturnCode> {
3018        let address = usize::try_from(address).unwrap();
3019        let count = usize::try_from(count).unwrap();
3020        let options = Options::new_index(store.0, self, options);
3021        self.check_bounds(store.0, &options, ty, address, count)?;
3022        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3023        let TransmitLocalState::Write { done } = *state else {
3024            bail!(
3025                "invalid handle {handle}; expected `Write`; got {:?}",
3026                *state
3027            );
3028        };
3029
3030        if done {
3031            bail!("cannot write to stream after being notified that the readable end dropped");
3032        }
3033
3034        *state = TransmitLocalState::Busy;
3035        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3036        let concurrent_state = store.0.concurrent_state_mut();
3037        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3038        let transmit = concurrent_state.get_mut(transmit_id)?;
3039        log::trace!(
3040            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3041            transmit.read
3042        );
3043
3044        if transmit.done {
3045            bail!("cannot write to future after previous write succeeded or readable end dropped");
3046        }
3047
3048        let new_state = if let ReadState::Dropped = &transmit.read {
3049            ReadState::Dropped
3050        } else {
3051            ReadState::Open
3052        };
3053
3054        let set_guest_ready = |me: &mut ConcurrentState| {
3055            let transmit = me.get_mut(transmit_id)?;
3056            assert!(matches!(&transmit.write, WriteState::Open));
3057            transmit.write = WriteState::GuestReady {
3058                instance: self,
3059                ty,
3060                flat_abi,
3061                options,
3062                address,
3063                count,
3064                handle,
3065            };
3066            Ok::<_, crate::Error>(())
3067        };
3068
3069        let mut result = match mem::replace(&mut transmit.read, new_state) {
3070            ReadState::GuestReady {
3071                ty: read_ty,
3072                flat_abi: read_flat_abi,
3073                options: read_options,
3074                address: read_address,
3075                count: read_count,
3076                handle: read_handle,
3077            } => {
3078                assert_eq!(flat_abi, read_flat_abi);
3079
3080                if let TransmitIndex::Future(_) = ty {
3081                    transmit.done = true;
3082                }
3083
3084                // Note that zero-length reads and writes are handling specially
3085                // by the spec to allow each end to signal readiness to the
3086                // other.  Quoting the spec:
3087                //
3088                // ```
3089                // The meaning of a read or write when the length is 0 is that
3090                // the caller is querying the "readiness" of the other
3091                // side. When a 0-length read/write rendezvous with a
3092                // non-0-length read/write, only the 0-length read/write
3093                // completes; the non-0-length read/write is kept pending (and
3094                // ready for a subsequent rendezvous).
3095                //
3096                // In the corner case where a 0-length read and write
3097                // rendezvous, only the writer is notified of readiness. To
3098                // avoid livelock, the Canonical ABI requires that a writer must
3099                // (eventually) follow a completed 0-length write with a
3100                // non-0-length write that is allowed to block (allowing the
3101                // reader end to run and rendezvous with its own non-0-length
3102                // read).
3103                // ```
3104
3105                let write_complete = count == 0 || read_count > 0;
3106                let read_complete = count > 0;
3107                let read_buffer_remaining = count < read_count;
3108
3109                let read_handle_rep = transmit.read_handle.rep();
3110
3111                let count = count.min(read_count);
3112
3113                self.copy(
3114                    store.as_context_mut(),
3115                    flat_abi,
3116                    ty,
3117                    &options,
3118                    address,
3119                    read_ty,
3120                    &read_options,
3121                    read_address,
3122                    count,
3123                    rep,
3124                )?;
3125
3126                let instance = self.id().get_mut(store.0);
3127                let types = instance.component().types();
3128                let item_size = payload(ty, types)
3129                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3130                    .unwrap_or(0);
3131                let concurrent_state = store.0.concurrent_state_mut();
3132                if read_complete {
3133                    let count = u32::try_from(count).unwrap();
3134                    let total = if let Some(Event::StreamRead {
3135                        code: ReturnCode::Completed(old_total),
3136                        ..
3137                    }) = concurrent_state.take_event(read_handle_rep)?
3138                    {
3139                        count + old_total
3140                    } else {
3141                        count
3142                    };
3143
3144                    let code = ReturnCode::completed(ty.kind(), total);
3145
3146                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3147                }
3148
3149                if read_buffer_remaining {
3150                    let transmit = concurrent_state.get_mut(transmit_id)?;
3151                    transmit.read = ReadState::GuestReady {
3152                        ty: read_ty,
3153                        flat_abi: read_flat_abi,
3154                        options: read_options,
3155                        address: read_address + (count * item_size),
3156                        count: read_count - count,
3157                        handle: read_handle,
3158                    };
3159                }
3160
3161                if write_complete {
3162                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3163                } else {
3164                    set_guest_ready(concurrent_state)?;
3165                    ReturnCode::Blocked
3166                }
3167            }
3168
3169            ReadState::HostReady {
3170                consume,
3171                guest_offset,
3172                cancel,
3173                cancel_waker,
3174            } => {
3175                assert!(cancel_waker.is_none());
3176                assert!(!cancel);
3177                assert_eq!(0, guest_offset);
3178
3179                if let TransmitIndex::Future(_) = ty {
3180                    transmit.done = true;
3181                }
3182
3183                set_guest_ready(concurrent_state)?;
3184                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3185            }
3186
3187            ReadState::HostToHost { .. } => unreachable!(),
3188
3189            ReadState::Open => {
3190                set_guest_ready(concurrent_state)?;
3191                ReturnCode::Blocked
3192            }
3193
3194            ReadState::Dropped => {
3195                if let TransmitIndex::Future(_) = ty {
3196                    transmit.done = true;
3197                }
3198
3199                ReturnCode::Dropped(0)
3200            }
3201        };
3202
3203        if result == ReturnCode::Blocked && !options.async_() {
3204            result = self.wait_for_write(store.0, transmit_handle)?;
3205        }
3206
3207        if result != ReturnCode::Blocked {
3208            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3209                TransmitLocalState::Write {
3210                    done: matches!(
3211                        (result, ty),
3212                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3213                    ),
3214                };
3215        }
3216
3217        log::trace!(
3218            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3219        );
3220
3221        Ok(result)
3222    }
3223
3224    /// Read from the specified stream or future from the guest.
3225    pub(super) fn guest_read<T: 'static>(
3226        self,
3227        mut store: StoreContextMut<T>,
3228        ty: TransmitIndex,
3229        options: OptionsIndex,
3230        flat_abi: Option<FlatAbi>,
3231        handle: u32,
3232        address: u32,
3233        count: u32,
3234    ) -> Result<ReturnCode> {
3235        let address = usize::try_from(address).unwrap();
3236        let count = usize::try_from(count).unwrap();
3237        let options = Options::new_index(store.0, self, options);
3238        self.check_bounds(store.0, &options, ty, address, count)?;
3239        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3240        let TransmitLocalState::Read { done } = *state else {
3241            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3242        };
3243
3244        if done {
3245            bail!("cannot read from stream after being notified that the writable end dropped");
3246        }
3247
3248        *state = TransmitLocalState::Busy;
3249        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3250        let concurrent_state = store.0.concurrent_state_mut();
3251        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3252        let transmit = concurrent_state.get_mut(transmit_id)?;
3253        log::trace!(
3254            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3255            transmit.write
3256        );
3257
3258        if transmit.done {
3259            bail!("cannot read from future after previous read succeeded");
3260        }
3261
3262        let new_state = if let WriteState::Dropped = &transmit.write {
3263            WriteState::Dropped
3264        } else {
3265            WriteState::Open
3266        };
3267
3268        let set_guest_ready = |me: &mut ConcurrentState| {
3269            let transmit = me.get_mut(transmit_id)?;
3270            assert!(matches!(&transmit.read, ReadState::Open));
3271            transmit.read = ReadState::GuestReady {
3272                ty,
3273                flat_abi,
3274                options,
3275                address,
3276                count,
3277                handle,
3278            };
3279            Ok::<_, crate::Error>(())
3280        };
3281
3282        let mut result = match mem::replace(&mut transmit.write, new_state) {
3283            WriteState::GuestReady {
3284                instance: _,
3285                ty: write_ty,
3286                flat_abi: write_flat_abi,
3287                options: write_options,
3288                address: write_address,
3289                count: write_count,
3290                handle: write_handle,
3291            } => {
3292                assert_eq!(flat_abi, write_flat_abi);
3293
3294                if let TransmitIndex::Future(_) = ty {
3295                    transmit.done = true;
3296                }
3297
3298                let write_handle_rep = transmit.write_handle.rep();
3299
3300                // See the comment in `guest_write` for the
3301                // `ReadState::GuestReady` case concerning zero-length reads and
3302                // writes.
3303
3304                let write_complete = write_count == 0 || count > 0;
3305                let read_complete = write_count > 0;
3306                let write_buffer_remaining = count < write_count;
3307
3308                let count = count.min(write_count);
3309
3310                self.copy(
3311                    store.as_context_mut(),
3312                    flat_abi,
3313                    write_ty,
3314                    &write_options,
3315                    write_address,
3316                    ty,
3317                    &options,
3318                    address,
3319                    count,
3320                    rep,
3321                )?;
3322
3323                let instance = self.id().get_mut(store.0);
3324                let types = instance.component().types();
3325                let item_size = payload(ty, types)
3326                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3327                    .unwrap_or(0);
3328                let concurrent_state = store.0.concurrent_state_mut();
3329
3330                if write_complete {
3331                    let count = u32::try_from(count).unwrap();
3332                    let total = if let Some(Event::StreamWrite {
3333                        code: ReturnCode::Completed(old_total),
3334                        ..
3335                    }) = concurrent_state.take_event(write_handle_rep)?
3336                    {
3337                        count + old_total
3338                    } else {
3339                        count
3340                    };
3341
3342                    let code = ReturnCode::completed(ty.kind(), total);
3343
3344                    concurrent_state.send_write_result(
3345                        write_ty,
3346                        transmit_id,
3347                        write_handle,
3348                        code,
3349                    )?;
3350                }
3351
3352                if write_buffer_remaining {
3353                    let transmit = concurrent_state.get_mut(transmit_id)?;
3354                    transmit.write = WriteState::GuestReady {
3355                        instance: self,
3356                        ty: write_ty,
3357                        flat_abi: write_flat_abi,
3358                        options: write_options,
3359                        address: write_address + (count * item_size),
3360                        count: write_count - count,
3361                        handle: write_handle,
3362                    };
3363                }
3364
3365                if read_complete {
3366                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3367                } else {
3368                    set_guest_ready(concurrent_state)?;
3369                    ReturnCode::Blocked
3370                }
3371            }
3372
3373            WriteState::HostReady {
3374                produce,
3375                try_into,
3376                guest_offset,
3377                cancel,
3378                cancel_waker,
3379            } => {
3380                assert!(cancel_waker.is_none());
3381                assert!(!cancel);
3382                assert_eq!(0, guest_offset);
3383
3384                if let TransmitIndex::Future(_) = ty {
3385                    transmit.done = true;
3386                }
3387
3388                set_guest_ready(concurrent_state)?;
3389
3390                self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?
3391            }
3392
3393            WriteState::Open => {
3394                set_guest_ready(concurrent_state)?;
3395                ReturnCode::Blocked
3396            }
3397
3398            WriteState::Dropped => ReturnCode::Dropped(0),
3399        };
3400
3401        if result == ReturnCode::Blocked && !options.async_() {
3402            result = self.wait_for_read(store.0, transmit_handle)?;
3403        }
3404
3405        if result != ReturnCode::Blocked {
3406            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3407                TransmitLocalState::Read {
3408                    done: matches!(
3409                        (result, ty),
3410                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3411                    ),
3412                };
3413        }
3414
3415        log::trace!(
3416            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3417        );
3418
3419        Ok(result)
3420    }
3421
3422    fn wait_for_write(
3423        self,
3424        store: &mut StoreOpaque,
3425        handle: TableId<TransmitHandle>,
3426    ) -> Result<ReturnCode> {
3427        let waitable = Waitable::Transmit(handle);
3428        store.wait_for_event(waitable)?;
3429        let event = waitable.take_event(store.concurrent_state_mut())?;
3430        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3431            event
3432        {
3433            waitable.on_delivery(self.id().get_mut(store), event);
3434            Ok(code)
3435        } else {
3436            unreachable!()
3437        }
3438    }
3439
3440    /// Cancel a pending stream or future write.
3441    fn cancel_write(
3442        self,
3443        store: &mut StoreOpaque,
3444        transmit_id: TableId<TransmitState>,
3445        async_: bool,
3446    ) -> Result<ReturnCode> {
3447        let state = store.concurrent_state_mut();
3448        let transmit = state.get_mut(transmit_id)?;
3449        log::trace!(
3450            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3451            transmit.read,
3452            transmit.write
3453        );
3454
3455        let code = if let Some(event) =
3456            Waitable::Transmit(transmit.write_handle).take_event(state)?
3457        {
3458            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3459                unreachable!();
3460            };
3461            match (code, event) {
3462                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3463                    ReturnCode::Cancelled(count)
3464                }
3465                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3466                _ => unreachable!(),
3467            }
3468        } else if let ReadState::HostReady {
3469            cancel,
3470            cancel_waker,
3471            ..
3472        } = &mut state.get_mut(transmit_id)?.read
3473        {
3474            *cancel = true;
3475            if let Some(waker) = cancel_waker.take() {
3476                waker.wake();
3477            }
3478
3479            if async_ {
3480                ReturnCode::Blocked
3481            } else {
3482                let handle = store
3483                    .concurrent_state_mut()
3484                    .get_mut(transmit_id)?
3485                    .write_handle;
3486                self.wait_for_write(store, handle)?
3487            }
3488        } else {
3489            ReturnCode::Cancelled(0)
3490        };
3491
3492        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3493
3494        match &transmit.write {
3495            WriteState::GuestReady { .. } => {
3496                transmit.write = WriteState::Open;
3497            }
3498            WriteState::HostReady { .. } => todo!("support host write cancellation"),
3499            WriteState::Open | WriteState::Dropped => {}
3500        }
3501
3502        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3503
3504        Ok(code)
3505    }
3506
3507    fn wait_for_read(
3508        self,
3509        store: &mut StoreOpaque,
3510        handle: TableId<TransmitHandle>,
3511    ) -> Result<ReturnCode> {
3512        let waitable = Waitable::Transmit(handle);
3513        store.wait_for_event(waitable)?;
3514        let event = waitable.take_event(store.concurrent_state_mut())?;
3515        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3516            event
3517        {
3518            waitable.on_delivery(self.id().get_mut(store), event);
3519            Ok(code)
3520        } else {
3521            unreachable!()
3522        }
3523    }
3524
3525    /// Cancel a pending stream or future read.
3526    fn cancel_read(
3527        self,
3528        store: &mut StoreOpaque,
3529        transmit_id: TableId<TransmitState>,
3530        async_: bool,
3531    ) -> Result<ReturnCode> {
3532        let state = store.concurrent_state_mut();
3533        let transmit = state.get_mut(transmit_id)?;
3534        log::trace!(
3535            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3536            transmit.read,
3537            transmit.write
3538        );
3539
3540        let code = if let Some(event) =
3541            Waitable::Transmit(transmit.read_handle).take_event(state)?
3542        {
3543            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3544                unreachable!();
3545            };
3546            match (code, event) {
3547                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3548                    ReturnCode::Cancelled(count)
3549                }
3550                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3551                _ => unreachable!(),
3552            }
3553        } else if let WriteState::HostReady {
3554            cancel,
3555            cancel_waker,
3556            ..
3557        } = &mut state.get_mut(transmit_id)?.write
3558        {
3559            *cancel = true;
3560            if let Some(waker) = cancel_waker.take() {
3561                waker.wake();
3562            }
3563
3564            if async_ {
3565                ReturnCode::Blocked
3566            } else {
3567                let handle = store
3568                    .concurrent_state_mut()
3569                    .get_mut(transmit_id)?
3570                    .read_handle;
3571                self.wait_for_read(store, handle)?
3572            }
3573        } else {
3574            ReturnCode::Cancelled(0)
3575        };
3576
3577        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3578
3579        match &transmit.read {
3580            ReadState::GuestReady { .. } => {
3581                transmit.read = ReadState::Open;
3582            }
3583            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3584                todo!("support host read cancellation")
3585            }
3586            ReadState::Open | ReadState::Dropped => {}
3587        }
3588
3589        log::trace!("cancelled read {transmit_id:?}: {code:?}");
3590
3591        Ok(code)
3592    }
3593
3594    /// Cancel a pending write for the specified stream or future from the guest.
3595    fn guest_cancel_write(
3596        self,
3597        store: &mut StoreOpaque,
3598        ty: TransmitIndex,
3599        async_: bool,
3600        writer: u32,
3601    ) -> Result<ReturnCode> {
3602        let (rep, state) =
3603            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3604        let id = TableId::<TransmitHandle>::new(rep);
3605        log::trace!("guest cancel write {id:?} (handle {writer})");
3606        match state {
3607            TransmitLocalState::Write { .. } => {
3608                bail!("stream or future write cancelled when no write is pending")
3609            }
3610            TransmitLocalState::Read { .. } => {
3611                bail!("passed read end to `{{stream|future}}.cancel-write`")
3612            }
3613            TransmitLocalState::Busy => {}
3614        }
3615        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3616        let code = self.cancel_write(store, transmit_id, async_)?;
3617        if !matches!(code, ReturnCode::Blocked) {
3618            let state =
3619                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3620                    .1;
3621            if let TransmitLocalState::Busy = state {
3622                *state = TransmitLocalState::Write { done: false };
3623            }
3624        }
3625        Ok(code)
3626    }
3627
3628    /// Cancel a pending read for the specified stream or future from the guest.
3629    fn guest_cancel_read(
3630        self,
3631        store: &mut StoreOpaque,
3632        ty: TransmitIndex,
3633        async_: bool,
3634        reader: u32,
3635    ) -> Result<ReturnCode> {
3636        let (rep, state) =
3637            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3638        let id = TableId::<TransmitHandle>::new(rep);
3639        log::trace!("guest cancel read {id:?} (handle {reader})");
3640        match state {
3641            TransmitLocalState::Read { .. } => {
3642                bail!("stream or future read cancelled when no read is pending")
3643            }
3644            TransmitLocalState::Write { .. } => {
3645                bail!("passed write end to `{{stream|future}}.cancel-read`")
3646            }
3647            TransmitLocalState::Busy => {}
3648        }
3649        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3650        let code = self.cancel_read(store, transmit_id, async_)?;
3651        if !matches!(code, ReturnCode::Blocked) {
3652            let state =
3653                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3654                    .1;
3655            if let TransmitLocalState::Busy = state {
3656                *state = TransmitLocalState::Read { done: false };
3657            }
3658        }
3659        Ok(code)
3660    }
3661
3662    /// Drop the readable end of the specified stream or future from the guest.
3663    fn guest_drop_readable(
3664        self,
3665        store: &mut StoreOpaque,
3666        ty: TransmitIndex,
3667        reader: u32,
3668    ) -> Result<()> {
3669        let table = self.id().get_mut(store).table_for_transmit(ty);
3670        let (rep, _is_done) = match ty {
3671            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3672            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3673        };
3674        let kind = match ty {
3675            TransmitIndex::Stream(_) => TransmitKind::Stream,
3676            TransmitIndex::Future(_) => TransmitKind::Future,
3677        };
3678        let id = TableId::<TransmitHandle>::new(rep);
3679        log::trace!("guest_drop_readable: drop reader {id:?}");
3680        store.host_drop_reader(id, kind)
3681    }
3682
3683    /// Create a new error context for the given component.
3684    pub(crate) fn error_context_new(
3685        self,
3686        store: &mut StoreOpaque,
3687        caller: RuntimeComponentInstanceIndex,
3688        ty: TypeComponentLocalErrorContextTableIndex,
3689        options: OptionsIndex,
3690        debug_msg_address: u32,
3691        debug_msg_len: u32,
3692    ) -> Result<u32> {
3693        self.id().get(store).check_may_leave(caller)?;
3694        let options = Options::new_index(store, self, options);
3695        let lift_ctx = &mut LiftContext::new(store, &options, self);
3696        //  Read string from guest memory
3697        let address = usize::try_from(debug_msg_address)?;
3698        let len = usize::try_from(debug_msg_len)?;
3699        lift_ctx
3700            .memory()
3701            .get(address..)
3702            .and_then(|b| b.get(..len))
3703            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3704        let message = WasmStr::new(address, len, lift_ctx)?;
3705
3706        // Create a new ErrorContext that is tracked along with other concurrent state
3707        let err_ctx = ErrorContextState {
3708            debug_msg: message
3709                .to_str_from_memory(options.memory(store))?
3710                .to_string(),
3711        };
3712        let state = store.concurrent_state_mut();
3713        let table_id = state.push(err_ctx)?;
3714        let global_ref_count_idx =
3715            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3716
3717        // Add to the global error context ref counts
3718        let _ = state
3719            .global_error_context_ref_counts
3720            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3721
3722        // Error context are tracked both locally (to a single component instance) and globally
3723        // the counts for both must stay in sync.
3724        //
3725        // Here we reflect the newly created global concurrent error context state into the
3726        // component instance's locally tracked count, along with the appropriate key into the global
3727        // ref tracking data structures to enable later lookup
3728        let local_idx = self
3729            .id()
3730            .get_mut(store)
3731            .table_for_error_context(ty)
3732            .error_context_insert(table_id.rep())?;
3733
3734        Ok(local_idx)
3735    }
3736
3737    /// Retrieve the debug message from the specified error context.
3738    pub(super) fn error_context_debug_message<T>(
3739        self,
3740        store: StoreContextMut<T>,
3741        ty: TypeComponentLocalErrorContextTableIndex,
3742        options: OptionsIndex,
3743        err_ctx_handle: u32,
3744        debug_msg_address: u32,
3745    ) -> Result<()> {
3746        // Retrieve the error context and internal debug message
3747        let handle_table_id_rep = self
3748            .id()
3749            .get_mut(store.0)
3750            .table_for_error_context(ty)
3751            .error_context_rep(err_ctx_handle)?;
3752
3753        let state = store.0.concurrent_state_mut();
3754        // Get the state associated with the error context
3755        let ErrorContextState { debug_msg } =
3756            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3757        let debug_msg = debug_msg.clone();
3758
3759        let options = Options::new_index(store.0, self, options);
3760        let types = self.id().get(store.0).component().types().clone();
3761        let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3762        let debug_msg_address = usize::try_from(debug_msg_address)?;
3763        // Lower the string into the component's memory
3764        let offset = lower_cx
3765            .as_slice_mut()
3766            .get(debug_msg_address..)
3767            .and_then(|b| b.get(..debug_msg.bytes().len()))
3768            .map(|_| debug_msg_address)
3769            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3770        debug_msg
3771            .as_str()
3772            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3773
3774        Ok(())
3775    }
3776
3777    /// Implements the `future.cancel-read` intrinsic.
3778    pub(crate) fn future_cancel_read(
3779        self,
3780        store: &mut StoreOpaque,
3781        caller: RuntimeComponentInstanceIndex,
3782        ty: TypeFutureTableIndex,
3783        async_: bool,
3784        reader: u32,
3785    ) -> Result<u32> {
3786        self.id().get(store).check_may_leave(caller)?;
3787        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3788            .map(|v| v.encode())
3789    }
3790
3791    /// Implements the `future.cancel-write` intrinsic.
3792    pub(crate) fn future_cancel_write(
3793        self,
3794        store: &mut StoreOpaque,
3795        caller: RuntimeComponentInstanceIndex,
3796        ty: TypeFutureTableIndex,
3797        async_: bool,
3798        writer: u32,
3799    ) -> Result<u32> {
3800        self.id().get(store).check_may_leave(caller)?;
3801        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3802            .map(|v| v.encode())
3803    }
3804
3805    /// Implements the `stream.cancel-read` intrinsic.
3806    pub(crate) fn stream_cancel_read(
3807        self,
3808        store: &mut StoreOpaque,
3809        caller: RuntimeComponentInstanceIndex,
3810        ty: TypeStreamTableIndex,
3811        async_: bool,
3812        reader: u32,
3813    ) -> Result<u32> {
3814        self.id().get(store).check_may_leave(caller)?;
3815        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3816            .map(|v| v.encode())
3817    }
3818
3819    /// Implements the `stream.cancel-write` intrinsic.
3820    pub(crate) fn stream_cancel_write(
3821        self,
3822        store: &mut StoreOpaque,
3823        caller: RuntimeComponentInstanceIndex,
3824        ty: TypeStreamTableIndex,
3825        async_: bool,
3826        writer: u32,
3827    ) -> Result<u32> {
3828        self.id().get(store).check_may_leave(caller)?;
3829        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3830            .map(|v| v.encode())
3831    }
3832
3833    /// Implements the `future.drop-readable` intrinsic.
3834    pub(crate) fn future_drop_readable(
3835        self,
3836        store: &mut StoreOpaque,
3837        caller: RuntimeComponentInstanceIndex,
3838        ty: TypeFutureTableIndex,
3839        reader: u32,
3840    ) -> Result<()> {
3841        self.id().get(store).check_may_leave(caller)?;
3842        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3843    }
3844
3845    /// Implements the `stream.drop-readable` intrinsic.
3846    pub(crate) fn stream_drop_readable(
3847        self,
3848        store: &mut StoreOpaque,
3849        caller: RuntimeComponentInstanceIndex,
3850        ty: TypeStreamTableIndex,
3851        reader: u32,
3852    ) -> Result<()> {
3853        self.id().get(store).check_may_leave(caller)?;
3854        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3855    }
3856
3857    /// Allocate a new future or stream and grant ownership of both the read and
3858    /// write ends to the (sub-)component instance to which the specified
3859    /// `TransmitIndex` belongs.
3860    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
3861        let (write, read) = store.concurrent_state_mut().new_transmit()?;
3862
3863        let table = self.id().get_mut(store).table_for_transmit(ty);
3864        let (read_handle, write_handle) = match ty {
3865            TransmitIndex::Future(ty) => (
3866                table.future_insert_read(ty, read.rep())?,
3867                table.future_insert_write(ty, write.rep())?,
3868            ),
3869            TransmitIndex::Stream(ty) => (
3870                table.stream_insert_read(ty, read.rep())?,
3871                table.stream_insert_write(ty, write.rep())?,
3872            ),
3873        };
3874
3875        let state = store.concurrent_state_mut();
3876        state.get_mut(read)?.common.handle = Some(read_handle);
3877        state.get_mut(write)?.common.handle = Some(write_handle);
3878
3879        Ok(ResourcePair {
3880            write: write_handle,
3881            read: read_handle,
3882        })
3883    }
3884
3885    /// Drop the specified error context.
3886    pub(crate) fn error_context_drop(
3887        self,
3888        store: &mut StoreOpaque,
3889        caller: RuntimeComponentInstanceIndex,
3890        ty: TypeComponentLocalErrorContextTableIndex,
3891        error_context: u32,
3892    ) -> Result<()> {
3893        let instance = self.id().get_mut(store);
3894        instance.check_may_leave(caller)?;
3895
3896        let local_handle_table = instance.table_for_error_context(ty);
3897
3898        let rep = local_handle_table.error_context_drop(error_context)?;
3899
3900        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3901
3902        let state = store.concurrent_state_mut();
3903        let GlobalErrorContextRefCount(global_ref_count) = state
3904            .global_error_context_ref_counts
3905            .get_mut(&global_ref_count_idx)
3906            .expect("retrieve concurrent state for error context during drop");
3907
3908        // Reduce the component-global ref count, removing tracking if necessary
3909        assert!(*global_ref_count >= 1);
3910        *global_ref_count -= 1;
3911        if *global_ref_count == 0 {
3912            state
3913                .global_error_context_ref_counts
3914                .remove(&global_ref_count_idx);
3915
3916            state
3917                .delete(TableId::<ErrorContextState>::new(rep))
3918                .context("deleting component-global error context data")?;
3919        }
3920
3921        Ok(())
3922    }
3923
3924    /// Transfer ownership of the specified stream or future read end from one
3925    /// guest to another.
3926    fn guest_transfer(
3927        self,
3928        store: &mut StoreOpaque,
3929        src_idx: u32,
3930        src: TransmitIndex,
3931        dst: TransmitIndex,
3932    ) -> Result<u32> {
3933        let mut instance = self.id().get_mut(store);
3934        let src_table = instance.as_mut().table_for_transmit(src);
3935        let (rep, is_done) = match src {
3936            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3937            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3938        };
3939        if is_done {
3940            bail!("cannot lift after being notified that the writable end dropped");
3941        }
3942        let dst_table = instance.table_for_transmit(dst);
3943        let handle = match dst {
3944            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3945            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3946        }?;
3947        store
3948            .concurrent_state_mut()
3949            .get_mut(TableId::<TransmitHandle>::new(rep))?
3950            .common
3951            .handle = Some(handle);
3952        Ok(handle)
3953    }
3954
3955    /// Implements the `future.new` intrinsic.
3956    pub(crate) fn future_new(
3957        self,
3958        store: &mut StoreOpaque,
3959        caller: RuntimeComponentInstanceIndex,
3960        ty: TypeFutureTableIndex,
3961    ) -> Result<ResourcePair> {
3962        self.id().get(store).check_may_leave(caller)?;
3963        self.guest_new(store, TransmitIndex::Future(ty))
3964    }
3965
3966    /// Implements the `stream.new` intrinsic.
3967    pub(crate) fn stream_new(
3968        self,
3969        store: &mut StoreOpaque,
3970        caller: RuntimeComponentInstanceIndex,
3971        ty: TypeStreamTableIndex,
3972    ) -> Result<ResourcePair> {
3973        self.id().get(store).check_may_leave(caller)?;
3974        self.guest_new(store, TransmitIndex::Stream(ty))
3975    }
3976
3977    /// Transfer ownership of the specified future read end from one guest to
3978    /// another.
3979    pub(crate) fn future_transfer(
3980        self,
3981        store: &mut StoreOpaque,
3982        src_idx: u32,
3983        src: TypeFutureTableIndex,
3984        dst: TypeFutureTableIndex,
3985    ) -> Result<u32> {
3986        self.guest_transfer(
3987            store,
3988            src_idx,
3989            TransmitIndex::Future(src),
3990            TransmitIndex::Future(dst),
3991        )
3992    }
3993
3994    /// Transfer ownership of the specified stream read end from one guest to
3995    /// another.
3996    pub(crate) fn stream_transfer(
3997        self,
3998        store: &mut StoreOpaque,
3999        src_idx: u32,
4000        src: TypeStreamTableIndex,
4001        dst: TypeStreamTableIndex,
4002    ) -> Result<u32> {
4003        self.guest_transfer(
4004            store,
4005            src_idx,
4006            TransmitIndex::Stream(src),
4007            TransmitIndex::Stream(dst),
4008        )
4009    }
4010
4011    /// Copy the specified error context from one component to another.
4012    pub(crate) fn error_context_transfer(
4013        self,
4014        store: &mut StoreOpaque,
4015        src_idx: u32,
4016        src: TypeComponentLocalErrorContextTableIndex,
4017        dst: TypeComponentLocalErrorContextTableIndex,
4018    ) -> Result<u32> {
4019        let mut instance = self.id().get_mut(store);
4020        let rep = instance
4021            .as_mut()
4022            .table_for_error_context(src)
4023            .error_context_rep(src_idx)?;
4024        let dst_idx = instance
4025            .table_for_error_context(dst)
4026            .error_context_insert(rep)?;
4027
4028        // Update the global (cross-subcomponent) count for error contexts
4029        // as the new component has essentially created a new reference that will
4030        // be dropped/handled independently
4031        let global_ref_count = store
4032            .concurrent_state_mut()
4033            .global_error_context_ref_counts
4034            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4035            .context("global ref count present for existing (sub)component error context")?;
4036        global_ref_count.0 += 1;
4037
4038        Ok(dst_idx)
4039    }
4040}
4041
4042impl ComponentInstance {
4043    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4044        let (tables, types) = self.guest_tables();
4045        let runtime_instance = match ty {
4046            TransmitIndex::Stream(ty) => types[ty].instance,
4047            TransmitIndex::Future(ty) => types[ty].instance,
4048        };
4049        &mut tables[runtime_instance]
4050    }
4051
4052    fn table_for_error_context(
4053        self: Pin<&mut Self>,
4054        ty: TypeComponentLocalErrorContextTableIndex,
4055    ) -> &mut HandleTable {
4056        let (tables, types) = self.guest_tables();
4057        let runtime_instance = types[ty].instance;
4058        &mut tables[runtime_instance]
4059    }
4060
4061    fn get_mut_by_index(
4062        self: Pin<&mut Self>,
4063        ty: TransmitIndex,
4064        index: u32,
4065    ) -> Result<(u32, &mut TransmitLocalState)> {
4066        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4067    }
4068}
4069
4070impl ConcurrentState {
4071    fn send_write_result(
4072        &mut self,
4073        ty: TransmitIndex,
4074        id: TableId<TransmitState>,
4075        handle: u32,
4076        code: ReturnCode,
4077    ) -> Result<()> {
4078        let write_handle = self.get_mut(id)?.write_handle.rep();
4079        self.set_event(
4080            write_handle,
4081            match ty {
4082                TransmitIndex::Future(ty) => Event::FutureWrite {
4083                    code,
4084                    pending: Some((ty, handle)),
4085                },
4086                TransmitIndex::Stream(ty) => Event::StreamWrite {
4087                    code,
4088                    pending: Some((ty, handle)),
4089                },
4090            },
4091        )
4092    }
4093
4094    fn send_read_result(
4095        &mut self,
4096        ty: TransmitIndex,
4097        id: TableId<TransmitState>,
4098        handle: u32,
4099        code: ReturnCode,
4100    ) -> Result<()> {
4101        let read_handle = self.get_mut(id)?.read_handle.rep();
4102        self.set_event(
4103            read_handle,
4104            match ty {
4105                TransmitIndex::Future(ty) => Event::FutureRead {
4106                    code,
4107                    pending: Some((ty, handle)),
4108                },
4109                TransmitIndex::Stream(ty) => Event::StreamRead {
4110                    code,
4111                    pending: Some((ty, handle)),
4112                },
4113            },
4114        )
4115    }
4116
4117    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4118        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4119    }
4120
4121    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4122        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4123    }
4124
4125    /// Set or update the event for the specified waitable.
4126    ///
4127    /// If there is already an event set for this waitable, we assert that it is
4128    /// of the same variant as the new one and reuse the `ReturnCode` count and
4129    /// the `pending` field if applicable.
4130    // TODO: This is a bit awkward due to how
4131    // `Event::{Stream,Future}{Write,Read}` and
4132    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4133    // Consider updating those representations in a way that allows this
4134    // function to be simplified.
4135    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4136        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4137
4138        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4139            let (ReturnCode::Completed(count)
4140            | ReturnCode::Dropped(count)
4141            | ReturnCode::Cancelled(count)) = old
4142            else {
4143                unreachable!()
4144            };
4145
4146            match new {
4147                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4148                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4149                _ => unreachable!(),
4150            }
4151        }
4152
4153        let event = match (waitable.take_event(self)?, event) {
4154            (None, _) => event,
4155            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4156            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4157            (
4158                Some(Event::StreamWrite {
4159                    code: old_code,
4160                    pending: old_pending,
4161                }),
4162                Event::StreamWrite { code, pending },
4163            ) => Event::StreamWrite {
4164                code: update_code(old_code, code),
4165                pending: old_pending.or(pending),
4166            },
4167            (
4168                Some(Event::StreamRead {
4169                    code: old_code,
4170                    pending: old_pending,
4171                }),
4172                Event::StreamRead { code, pending },
4173            ) => Event::StreamRead {
4174                code: update_code(old_code, code),
4175                pending: old_pending.or(pending),
4176            },
4177            _ => unreachable!(),
4178        };
4179
4180        waitable.set_event(self, Some(event))
4181    }
4182
4183    /// Allocate a new future or stream, including the `TransmitState` and the
4184    /// `TransmitHandle`s corresponding to the read and write ends.
4185    fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4186        let state_id = self.push(TransmitState::default())?;
4187
4188        let write = self.push(TransmitHandle::new(state_id))?;
4189        let read = self.push(TransmitHandle::new(state_id))?;
4190
4191        let state = self.get_mut(state_id)?;
4192        state.write_handle = write;
4193        state.read_handle = read;
4194
4195        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4196
4197        Ok((write, read))
4198    }
4199
4200    /// Delete the specified future or stream, including the read and write ends.
4201    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4202        let state = self.delete(state_id)?;
4203        self.delete(state.write_handle)?;
4204        self.delete(state.read_handle)?;
4205
4206        log::trace!(
4207            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4208            state.write_handle,
4209            state.read_handle,
4210        );
4211
4212        Ok(())
4213    }
4214}
4215
4216pub(crate) struct ResourcePair {
4217    pub(crate) write: u32,
4218    pub(crate) read: u32,
4219}
4220
4221#[cfg(test)]
4222mod tests {
4223    use super::*;
4224    use crate::{Engine, Store};
4225    use core::future::pending;
4226    use core::pin::pin;
4227    use std::sync::LazyLock;
4228
4229    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4230
4231    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4232    where
4233        T: FutureProducer<()>,
4234    {
4235        rx.poll_produce(
4236            &mut Context::from_waker(Waker::noop()),
4237            Store::new(&ENGINE, ()).as_context_mut(),
4238            finish,
4239        )
4240    }
4241
4242    #[test]
4243    fn future_producer() {
4244        let mut fut = pin!(async { anyhow::Ok(()) });
4245        assert!(matches!(
4246            poll_future_producer(fut.as_mut(), false),
4247            Poll::Ready(Ok(Some(()))),
4248        ));
4249
4250        let mut fut = pin!(async { anyhow::Ok(()) });
4251        assert!(matches!(
4252            poll_future_producer(fut.as_mut(), true),
4253            Poll::Ready(Ok(Some(()))),
4254        ));
4255
4256        let mut fut = pin!(pending::<Result<()>>());
4257        assert!(matches!(
4258            poll_future_producer(fut.as_mut(), false),
4259            Poll::Pending,
4260        ));
4261        assert!(matches!(
4262            poll_future_producer(fut.as_mut(), true),
4263            Poll::Ready(Ok(None)),
4264        ));
4265
4266        let (tx, rx) = oneshot::channel();
4267        let mut rx = pin!(rx);
4268        assert!(matches!(
4269            poll_future_producer(rx.as_mut(), false),
4270            Poll::Pending,
4271        ));
4272        assert!(matches!(
4273            poll_future_producer(rx.as_mut(), true),
4274            Poll::Ready(Ok(None)),
4275        ));
4276        tx.send(()).unwrap();
4277        assert!(matches!(
4278            poll_future_producer(rx.as_mut(), true),
4279            Poll::Ready(Ok(Some(()))),
4280        ));
4281
4282        let (tx, rx) = oneshot::channel();
4283        let mut rx = pin!(rx);
4284        tx.send(()).unwrap();
4285        assert!(matches!(
4286            poll_future_producer(rx.as_mut(), false),
4287            Poll::Ready(Ok(Some(()))),
4288        ));
4289
4290        let (tx, rx) = oneshot::channel::<()>();
4291        let mut rx = pin!(rx);
4292        drop(tx);
4293        assert!(matches!(
4294            poll_future_producer(rx.as_mut(), false),
4295            Poll::Ready(Err(..)),
4296        ));
4297
4298        let (tx, rx) = oneshot::channel::<()>();
4299        let mut rx = pin!(rx);
4300        drop(tx);
4301        assert!(matches!(
4302            poll_future_producer(rx.as_mut(), true),
4303            Poll::Ready(Err(..)),
4304        ));
4305    }
4306}