// wasmtime/runtime/component/concurrent/futures_and_streams.rs

use super::table::{TableDebug, TableId};
use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
use crate::component::func::{self, LiftContext, LowerContext, Options};
use crate::component::matching::InstanceType;
use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr};
use crate::store::{StoreOpaque, StoreToken};
use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
use crate::vm::{AlwaysMut, VMStore};
use crate::{AsContextMut, StoreContextMut, ValRaw};
use anyhow::{Context as _, Result, anyhow, bail};
use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
use futures::FutureExt;
use futures::channel::oneshot;
use std::boxed::Box;
use std::fmt;
use std::future;
use std::io::Cursor;
use std::iter;
use std::marker::PhantomData;
use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::string::{String, ToString};
use std::sync::{Arc, Mutex};
use std::task::{self, Context, Poll, Waker};
use std::vec::Vec;
use wasmtime_environ::component::{
    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
    TypeFutureTableIndex, TypeStreamTableIndex,
};

pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};

mod buffers;
37
/// Enum for distinguishing between a stream or future in functions that handle
/// both.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The operation concerns a stream.
    Stream,
    /// The operation concerns a future.
    Future,
}
45
/// Represents `{stream,future}.{read,write}` results.
///
/// Every variant except `Blocked` carries a `u32` count which
/// `ReturnCode::encode` packs into the upper 28 bits of the value returned to
/// the guest.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// Carries no count; encoded as the sentinel `0xffff_ffff`.
    Blocked,
    /// Encoded with status `0x0` in the low four bits.
    Completed(u32),
    /// Encoded with status `0x1` in the low four bits.
    Dropped(u32),
    /// Encoded with status `0x2` in the low four bits.
    Cancelled(u32),
}
54
55impl ReturnCode {
56    /// Pack `self` into a single 32-bit integer that may be returned to the
57    /// guest.
58    ///
59    /// This corresponds to `pack_copy_result` in the Component Model spec.
60    pub fn encode(&self) -> u32 {
61        const BLOCKED: u32 = 0xffff_ffff;
62        const COMPLETED: u32 = 0x0;
63        const DROPPED: u32 = 0x1;
64        const CANCELLED: u32 = 0x2;
65        match self {
66            ReturnCode::Blocked => BLOCKED,
67            ReturnCode::Completed(n) => {
68                debug_assert!(*n < (1 << 28));
69                (n << 4) | COMPLETED
70            }
71            ReturnCode::Dropped(n) => {
72                debug_assert!(*n < (1 << 28));
73                (n << 4) | DROPPED
74            }
75            ReturnCode::Cancelled(n) => {
76                debug_assert!(*n < (1 << 28));
77                (n << 4) | CANCELLED
78            }
79        }
80    }
81
82    /// Returns `Self::Completed` with the specified count (or zero if
83    /// `matches!(kind, TransmitKind::Future)`)
84    fn completed(kind: TransmitKind, count: u32) -> Self {
85        Self::Completed(if let TransmitKind::Future = kind {
86            0
87        } else {
88            count
89        })
90    }
91}
92
93/// Represents a stream or future type index.
94///
95/// This is useful as a parameter type for functions which operate on either a
96/// future or a stream.
97#[derive(Copy, Clone, Debug)]
98pub enum TransmitIndex {
99    Stream(TypeStreamTableIndex),
100    Future(TypeFutureTableIndex),
101}
102
103impl TransmitIndex {
104    pub fn kind(&self) -> TransmitKind {
105        match self {
106            TransmitIndex::Stream(_) => TransmitKind::Stream,
107            TransmitIndex::Future(_) => TransmitKind::Future,
108        }
109    }
110}
111
112/// Retrieve the payload type of the specified stream or future, or `None` if it
113/// has no payload type.
114fn payload(ty: TransmitIndex, types: &Arc<ComponentTypes>) -> Option<InterfaceType> {
115    match ty {
116        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
117        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
118    }
119}
120
121/// Retrieve the host rep and state for the specified guest-visible waitable
122/// handle.
123fn get_mut_by_index_from(
124    handle_table: &mut HandleTable,
125    ty: TransmitIndex,
126    index: u32,
127) -> Result<(u32, &mut TransmitLocalState)> {
128    match ty {
129        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
130        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
131    }
132}
133
134fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
135    mut store: StoreContextMut<U>,
136    instance: Instance,
137    options: &Options,
138    ty: TransmitIndex,
139    address: usize,
140    count: usize,
141    buffer: &mut B,
142) -> Result<()> {
143    let types = instance.id().get(store.0).component().types().clone();
144    let count = buffer.remaining().len().min(count);
145
146    let lower = &mut if T::MAY_REQUIRE_REALLOC {
147        LowerContext::new
148    } else {
149        LowerContext::new_without_realloc
150    }(store.as_context_mut(), options, &types, instance);
151
152    if address % usize::try_from(T::ALIGN32)? != 0 {
153        bail!("read pointer not aligned");
154    }
155    lower
156        .as_slice_mut()
157        .get_mut(address..)
158        .and_then(|b| b.get_mut(..T::SIZE32 * count))
159        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
160
161    if let Some(ty) = payload(ty, &types) {
162        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
163    }
164
165    buffer.skip(count);
166
167    Ok(())
168}
169
170fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>, U>(
171    lift: &mut LiftContext<'_>,
172    ty: Option<InterfaceType>,
173    buffer: &mut B,
174    address: usize,
175    count: usize,
176) -> Result<()> {
177    let count = count.min(buffer.remaining_capacity());
178    if T::IS_RUST_UNIT_TYPE {
179        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
180        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
181        // is a valid way to populate the zero-sized buffer.
182        buffer.extend(
183            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
184        )
185    } else {
186        let ty = ty.unwrap();
187        if address % usize::try_from(T::ALIGN32)? != 0 {
188            bail!("write pointer not aligned");
189        }
190        lift.memory()
191            .get(address..)
192            .and_then(|b| b.get(..T::SIZE32 * count))
193            .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
194
195        let list = &WasmList::new(address, count, lift, ty)?;
196        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
197    }
198    Ok(())
199}
200
201/// Represents the state associated with an error context
202#[derive(Debug, PartialEq, Eq, PartialOrd)]
203pub(super) struct ErrorContextState {
204    /// Debug message associated with the error context
205    pub(crate) debug_msg: String,
206}
207
208/// Represents the size and alignment for a "flat" Component Model type,
209/// i.e. one containing no pointers or handles.
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211pub(super) struct FlatAbi {
212    pub(super) size: u32,
213    pub(super) align: u32,
214}
215
216/// Represents the buffer for a host- or guest-initiated stream read.
217pub struct Destination<'a, T, B> {
218    instance: Instance,
219    id: TableId<TransmitState>,
220    buffer: &'a mut B,
221    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
222    _phantom: PhantomData<fn() -> T>,
223}
224
225impl<'a, T, B> Destination<'a, T, B> {
226    /// Reborrow `self` so it can be used again later.
227    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
228        Destination {
229            instance: self.instance,
230            id: self.id,
231            buffer: &mut *self.buffer,
232            host_buffer: self.host_buffer.as_deref_mut(),
233            _phantom: PhantomData,
234        }
235    }
236
237    /// Take the buffer out of `self`, leaving a default-initialized one in its
238    /// place.
239    ///
240    /// This can be useful for reusing the previously-stored buffer's capacity
241    /// instead of allocating a fresh one.
242    pub fn take_buffer(&mut self) -> B
243    where
244        B: Default,
245    {
246        mem::take(self.buffer)
247    }
248
249    /// Store the specified buffer in `self`.
250    ///
251    /// Any items contained in the buffer will be delivered to the reader after
252    /// the `StreamProducer::poll_produce` call to which this `Destination` was
253    /// passed returns (unless overwritten by another call to `set_buffer`).
254    ///
255    /// If items are stored via this buffer _and_ written via a
256    /// `DirectDestination` view of `self`, then the items in the buffer will be
257    /// delivered after the ones written using `DirectDestination`.
258    pub fn set_buffer(&mut self, buffer: B) {
259        *self.buffer = buffer;
260    }
261
262    /// Return the remaining number of items the current read has capacity to
263    /// accept, if known.
264    ///
265    /// This will return `Some(_)` if the reader is a guest; it will return
266    /// `None` if the reader is the host.
267    ///
268    /// Note that, if this returns `None(0)`, the producer must still attempt to
269    /// produce at least one item if the value of `finish` passed to
270    /// `StreamProducer::poll_produce` is false.  In that case, the reader is
271    /// effectively asking when the producer will be able to produce items
272    /// without blocking (or reach a terminal state such as end-of-stream),
273    /// meaning the next non-zero read must complete without blocking.
274    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
275        let transmit = self
276            .instance
277            .concurrent_state_mut(store.as_context_mut().0)
278            .get_mut(self.id)
279            .unwrap();
280
281        if let &ReadState::GuestReady { count, .. } = &transmit.read {
282            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
283                unreachable!()
284            };
285
286            Some(count - guest_offset)
287        } else {
288            None
289        }
290    }
291}
292
293impl<'a, B> Destination<'a, u8, B> {
294    /// Return a `DirectDestination` view of `self`.
295    ///
296    /// If the reader is a guest, this will provide direct access to the guest's
297    /// read buffer.  If the reader is a host, this will provide access to a
298    /// buffer which will be delivered to the host before any items stored using
299    /// `Destination::set_buffer`.
300    ///
301    /// `capacity` will only be used if the reader is a host, in which case it
302    /// will update the length of the buffer, possibly zero-initializing the new
303    /// elements if the new length is larger than the old length.
304    pub fn as_direct<D>(
305        mut self,
306        store: StoreContextMut<'a, D>,
307        capacity: usize,
308    ) -> DirectDestination<'a, D> {
309        if let Some(buffer) = self.host_buffer.as_deref_mut() {
310            buffer.set_position(0);
311            if buffer.get_mut().is_empty() {
312                buffer.get_mut().resize(capacity, 0);
313            }
314        }
315
316        DirectDestination {
317            instance: self.instance,
318            id: self.id,
319            host_buffer: self.host_buffer,
320            store,
321        }
322    }
323}
324
325/// Represents a read from a `stream<u8>`, providing direct access to the
326/// writer's buffer.
327pub struct DirectDestination<'a, D: 'static> {
328    instance: Instance,
329    id: TableId<TransmitState>,
330    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
331    store: StoreContextMut<'a, D>,
332}
333
334impl<D: 'static> DirectDestination<'_, D> {
335    /// Provide direct access to the writer's buffer.
336    pub fn remaining(&mut self) -> &mut [u8] {
337        if let Some(buffer) = self.host_buffer.as_deref_mut() {
338            buffer.get_mut()
339        } else {
340            let transmit = self
341                .instance
342                .concurrent_state_mut(self.store.as_context_mut().0)
343                .get_mut(self.id)
344                .unwrap();
345
346            let &ReadState::GuestReady {
347                address,
348                count,
349                options,
350                ..
351            } = &transmit.read
352            else {
353                unreachable!();
354            };
355
356            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
357                unreachable!()
358            };
359
360            options
361                .memory_mut(self.store.0)
362                .get_mut((address + guest_offset)..)
363                .and_then(|b| b.get_mut(..(count - guest_offset)))
364                .unwrap()
365        }
366    }
367
368    /// Mark the specified number of bytes as written to the writer's buffer.
369    ///
370    /// This will panic if the count is larger than the size of the
371    /// buffer returned by `Self::remaining`.
372    pub fn mark_written(&mut self, count: usize) {
373        if let Some(buffer) = self.host_buffer.as_deref_mut() {
374            buffer.set_position(
375                buffer
376                    .position()
377                    .checked_add(u64::try_from(count).unwrap())
378                    .unwrap(),
379            );
380        } else {
381            let transmit = self
382                .instance
383                .concurrent_state_mut(self.store.as_context_mut().0)
384                .get_mut(self.id)
385                .unwrap();
386
387            let ReadState::GuestReady {
388                count: read_count, ..
389            } = &transmit.read
390            else {
391                unreachable!();
392            };
393
394            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
395                unreachable!()
396            };
397
398            if *guest_offset + count > *read_count {
399                panic!(
400                    "write count ({count}) must be less than or equal to read count ({read_count})"
401                )
402            } else {
403                *guest_offset += count;
404            }
405        }
406    }
407}
408
/// Represents the state of a `Stream{Producer,Consumer}`.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed normally, and the producer or consumer may be
    /// able to produce or consume more items, respectively.
    Completed,
    /// The operation was interrupted (i.e. it wrapped up early after receiving
    /// a `finish` parameter value of true in a call to `poll_produce` or
    /// `poll_consume`), and the producer or consumer may be able to produce or
    /// consume more items, respectively.
    Cancelled,
    /// The operation completed normally, but the producer or consumer will
    /// _not_ be able to produce or consume more items, respectively.
    Dropped,
}
424
425/// Represents the host-owned write end of a stream.
426pub trait StreamProducer<D>: Send + 'static {
427    /// The payload type of this stream.
428    type Item;
429
430    /// The `WriteBuffer` type to use when delivering items.
431    type Buffer: WriteBuffer<Self::Item> + Default;
432
433    /// Handle a host- or guest-initiated read by delivering zero or more items
434    /// to the specified destination.
435    ///
436    /// This will be called whenever the reader starts a read.
437    ///
438    /// If the implementation is able to produce one or more items immediately,
439    /// it should write them to `destination` and return either
440    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
441    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
442    /// any more items.
443    ///
444    /// If the implementation is unable to produce any items immediately, but
445    /// expects to do so later, and `finish` is _false_, it should store the
446    /// waker from `cx` for later and return `Poll::Pending` without writing
447    /// anything to `destination`.  Later, it should alert the waker when either
448    /// the items arrive, the stream has ended, or an error occurs.
449    ///
450    /// If the implementation is unable to produce any items immediately, but
451    /// expects to do so later, and `finish` is _true_, it should, if possible,
452    /// return `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without
453    /// writing anything to `destination`.  However, that might not be possible
454    /// if an earlier call to `poll_produce` kicked off an asynchronous
455    /// operation which needs to be completed (and possibly interrupted)
456    /// gracefully, in which case the implementation may return `Poll::Pending`
457    /// and later alert the waker as described above.  In other words, when
458    /// `finish` is true, the implementation should prioritize returning a
459    /// result to the reader (even if no items can be produced) rather than wait
460    /// indefinitely for at least one item to arrive.
461    ///
462    /// In all of the above cases, the implementation may alternatively choose
463    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
464    /// the guest (if any) to trap and render the component instance (if any)
465    /// unusable.  The implementation should report errors that _are_
466    /// recoverable by other means (e.g. by writing to a `future`) and return
467    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
468    ///
469    /// Note that the implementation should never return `Poll::Pending` after
470    /// writing one or more items to `destination`; if it does, the caller will
471    /// trap as if `Err(_)` was returned.  Conversely, it should only return
472    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without writing any items to
473    /// `destination` if called with `finish` set to true.  If it does so when
474    /// `finish` is false, the caller will trap.  Additionally, it should only
475    /// return `Poll::Ready(Ok(StreamResult::Completed))` after writing at least
476    /// one item to `destination` if it has capacity to accept that item;
477    /// otherwise, the caller will trap.
478    ///
479    /// If more items are written to `destination` than the reader has immediate
480    /// capacity to accept, they will be retained in memory by the caller and
481    /// used to satisfy future reads, in which case `poll_produce` will only be
482    /// called again once all those items have been delivered.
483    ///
484    /// If this function is called with zero capacity
485    /// (i.e. `Destination::remaining` returns `Some(0)`), the implementation
486    /// should either:
487    ///
488    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
489    /// anything if it expects to be able to produce items immediately
490    /// (i.e. without first returning `Poll::Pending`) the next time
491    /// `poll_produce` is called with non-zero capacity _or_ if that cannot be
492    /// reliably determined.
493    ///
494    /// - Return `Poll::Pending` if the next call to `poll_produce` with
495    /// non-zero capacity is likely to also return `Poll::Pending`.
496    ///
497    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
498    /// `Destination::set_buffer` with one more more items.  Note, however, that
499    /// this creates the hazard that the items will never be received by the
500    /// guest if it decides not to do another non-zero-length read before
501    /// closing the stream.  Moreover, if `Self::Item` is e.g. a `Resource<_>`,
502    /// they may end up leaking in that scenario.
503    fn poll_produce<'a>(
504        self: Pin<&mut Self>,
505        cx: &mut Context<'_>,
506        store: StoreContextMut<'a, D>,
507        destination: Destination<'a, Self::Item, Self::Buffer>,
508        finish: bool,
509    ) -> Poll<Result<StreamResult>>;
510}
511
512/// Represents the buffer for a host- or guest-initiated stream write.
513pub struct Source<'a, T> {
514    instance: Instance,
515    id: TableId<TransmitState>,
516    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
517}
518
519impl<'a, T> Source<'a, T> {
520    /// Reborrow `self` so it can be used again later.
521    pub fn reborrow(&mut self) -> Source<'_, T> {
522        Source {
523            instance: self.instance,
524            id: self.id,
525            host_buffer: self.host_buffer.as_deref_mut(),
526        }
527    }
528
529    /// Accept zero or more items from the writer.
530    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
531    where
532        T: func::Lift + 'static,
533        B: ReadBuffer<T>,
534    {
535        if let Some(input) = &mut self.host_buffer {
536            let count = input.remaining().len().min(buffer.remaining_capacity());
537            buffer.move_from(*input, count);
538        } else {
539            let store = store.as_context_mut();
540            let transmit = self
541                .instance
542                .concurrent_state_mut(store.0)
543                .get_mut(self.id)?;
544
545            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
546                unreachable!();
547            };
548
549            let &WriteState::GuestReady {
550                ty,
551                address,
552                count,
553                options,
554                ..
555            } = &transmit.write
556            else {
557                unreachable!()
558            };
559
560            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self.instance);
561            let ty = payload(ty, cx.types);
562            let old_remaining = buffer.remaining_capacity();
563            lift::<T, B, S::Data>(
564                cx,
565                ty,
566                buffer,
567                address + (T::SIZE32 * guest_offset),
568                count - guest_offset,
569            )?;
570
571            let transmit = self
572                .instance
573                .concurrent_state_mut(store.0)
574                .get_mut(self.id)?;
575
576            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
577                unreachable!();
578            };
579
580            *guest_offset += old_remaining - buffer.remaining_capacity();
581        }
582
583        Ok(())
584    }
585
586    /// Return the number of items remaining to be read from the current write
587    /// operation.
588    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
589    where
590        T: 'static,
591    {
592        let transmit = self
593            .instance
594            .concurrent_state_mut(store.as_context_mut().0)
595            .get_mut(self.id)
596            .unwrap();
597
598        if let &WriteState::GuestReady { count, .. } = &transmit.write {
599            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
600                unreachable!()
601            };
602
603            count - guest_offset
604        } else if let Some(host_buffer) = &self.host_buffer {
605            host_buffer.remaining().len()
606        } else {
607            unreachable!()
608        }
609    }
610}
611
612impl<'a> Source<'a, u8> {
613    /// Return a `DirectSource` view of `self`.
614    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
615        DirectSource {
616            instance: self.instance,
617            id: self.id,
618            host_buffer: self.host_buffer,
619            store,
620        }
621    }
622}
623
624/// Represents a write to a `stream<u8>`, providing direct access to the
625/// writer's buffer.
626pub struct DirectSource<'a, D: 'static> {
627    instance: Instance,
628    id: TableId<TransmitState>,
629    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
630    store: StoreContextMut<'a, D>,
631}
632
633impl<D: 'static> DirectSource<'_, D> {
634    /// Provide direct access to the writer's buffer.
635    pub fn remaining(&mut self) -> &[u8] {
636        if let Some(buffer) = self.host_buffer.as_deref_mut() {
637            buffer.remaining()
638        } else {
639            let transmit = self
640                .instance
641                .concurrent_state_mut(self.store.as_context_mut().0)
642                .get_mut(self.id)
643                .unwrap();
644
645            let &WriteState::GuestReady {
646                address,
647                count,
648                options,
649                ..
650            } = &transmit.write
651            else {
652                unreachable!()
653            };
654
655            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
656                unreachable!()
657            };
658
659            options
660                .memory(self.store.0)
661                .get((address + guest_offset)..)
662                .and_then(|b| b.get(..(count - guest_offset)))
663                .unwrap()
664        }
665    }
666
667    /// Mark the specified number of bytes as read from the writer's buffer.
668    ///
669    /// This will panic if the count is larger than the size of the buffer
670    /// returned by `Self::remaining`.
671    pub fn mark_read(&mut self, count: usize) {
672        if let Some(buffer) = self.host_buffer.as_deref_mut() {
673            buffer.skip(count);
674        } else {
675            let transmit = self
676                .instance
677                .concurrent_state_mut(self.store.as_context_mut().0)
678                .get_mut(self.id)
679                .unwrap();
680
681            let WriteState::GuestReady {
682                count: write_count, ..
683            } = &transmit.write
684            else {
685                unreachable!()
686            };
687
688            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
689                unreachable!()
690            };
691
692            if *guest_offset + count > *write_count {
693                panic!(
694                    "read count ({count}) must be less than or equal to write count ({write_count})"
695                )
696            } else {
697                *guest_offset += count;
698            }
699        }
700    }
701}
702
703/// Represents the host-owned read end of a stream.
704pub trait StreamConsumer<D>: Send + 'static {
705    /// The payload type of this stream.
706    type Item;
707
708    /// Handle a host- or guest-initiated write by accepting zero or more items
709    /// from the specified source.
710    ///
711    /// This will be called whenever the writer starts a write.
712    ///
713    /// If the implementation is able to consume one or more items immediately,
714    /// it should take them from `source` and return either
715    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
716    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
717    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
718    /// indicate that the caller should delay sending a `COMPLETED` event to the
719    /// writer until a later call to this function returns `Poll::Ready(_)`.
720    /// For more about that, see the `Backpressure` section below.
721    ///
722    /// If the implementation cannot consume any items immediately and `finish`
723    /// is _false_, it should store the waker from `cx` for later and return
724    /// `Poll::Pending` without writing anything to `destination`.  Later, it
725    /// should alert the waker when either (1) the items arrive, (2) the stream
726    /// has ended, or (3) an error occurs.
727    ///
728    /// If the implementation cannot consume any items immediately and `finish`
729    /// is _true_, it should, if possible, return
730    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
731    /// anything from `source`.  However, that might not be possible if an
732    /// earlier call to `poll_consume` kicked off an asynchronous operation
733    /// which needs to be completed (and possibly interrupted) gracefully, in
734    /// which case the implementation may return `Poll::Pending` and later alert
735    /// the waker as described above.  In other words, when `finish` is true,
736    /// the implementation should prioritize returning a result to the reader
737    /// (even if no items can be consumed) rather than wait indefinitely for at
738    /// capacity to free up.
739    ///
740    /// In all of the above cases, the implementation may alternatively choose
741    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
742    /// the guest (if any) to trap and render the component instance (if any)
743    /// unusable.  The implementation should report errors that _are_
744    /// recoverable by other means (e.g. by writing to a `future`) and return
745    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
746    ///
747    /// Note that the implementation should only return
748    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
749    /// items from `source` if called with `finish` set to true.  If it does so
750    /// when `finish` is false, the caller will trap.  Additionally, it should
751    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
752    /// least one item from `source` if there is an item available; otherwise,
753    /// the caller will trap.  If `poll_consume` is called with no items in
754    /// `source`, it should only return `Poll::Ready(_)` once it is able to
755    /// accept at least one item during the next call to `poll_consume`.
756    ///
757    /// Note that any items which the implementation of this trait takes from
758    /// `source` become the responsibility of that implementation.  For that
759    /// reason, an implementation which forwards items to an upstream sink
760    /// should reserve capacity in that sink before taking items out of
761    /// `source`, if possible.  Alternatively, it might buffer items which can't
762    /// be forwarded immediately and send them once capacity is freed up.
763    ///
764    /// ## Backpressure
765    ///
766    /// As mentioned above, an implementation might choose to return
767    /// `Poll::Pending` after taking items from `source`, which tells the caller
768    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
769    /// a form of backpressure when the items are forwarded to an upstream sink
770    /// asynchronously.  Note, however, that it's not possible to "put back"
771    /// items into `source` once they've been taken out, so if the upstream sink
772    /// is unable to accept all the items, that cannot be communicated to the
773    /// writer at this level of abstraction.  Just as with application-specific,
774    /// recoverable errors, information about which items could be forwarded and
775    /// which could not must be communicated out-of-band, e.g. by writing to an
776    /// application-specific `future`.
777    ///
778    /// Similarly, if the writer cancels the write after items have been taken
779    /// from `source` but before the items have all been forwarded to an
780    /// upstream sink, `poll_consume` will be called with `finish` set to true,
781    /// and the implementation may either:
782    ///
783    /// - Interrupt the forwarding process gracefully.  This may be preferable
784    /// if there is an out-of-band channel for communicating to the writer how
785    /// many items were forwarded before being interrupted.
786    ///
787    /// - Allow the forwarding to complete without interrupting it.  This is
788    /// usually preferable if there's no out-of-band channel for reporting back
789    /// to the writer how many items were forwarded.
790    fn poll_consume(
791        self: Pin<&mut Self>,
792        cx: &mut Context<'_>,
793        store: StoreContextMut<D>,
794        source: Source<'_, Self::Item>,
795        finish: bool,
796    ) -> Poll<Result<StreamResult>>;
797}
798
/// Represents a host-owned write end of a future.
///
/// Implementations are passed to `FutureReader::new`, which wraps them in a
/// private adapter implementing `StreamProducer`.
pub trait FutureProducer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated read by producing a value.
    ///
    /// This is equivalent to `StreamProducer::poll_produce`, but with a
    /// simplified interface for futures.
    ///
    /// If `finish` is true, the implementation may return
    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
    /// could produce a value.  Otherwise, it must either return
    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
    ///
    /// The adapter in `FutureReader::new` reports `Ok(Some(value))` as
    /// `StreamResult::Completed` (after buffering `value` in the destination)
    /// and `Ok(None)` as `StreamResult::Cancelled`.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
820
/// Represents a host-owned read end of a future.
///
/// Implementations are passed to `FutureReader::pipe`, which wraps them in a
/// private adapter implementing `StreamConsumer`.
pub trait FutureConsumer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated write by consuming a value.
    ///
    /// This is equivalent to `StreamConsumer::poll_consume`, but with a
    /// simplified interface for futures.
    ///
    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
    /// without taking the item from `source`, which indicates the operation was
    /// canceled before it could consume the value.  Otherwise, it must either
    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
    /// the item).
    ///
    /// The adapter in `FutureReader::pipe` decides between
    /// `StreamResult::Completed` and `StreamResult::Cancelled` by checking
    /// whether any item remains in `source` after this method returns
    /// `Poll::Ready(Ok(()))`.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
845
/// Represents the readable end of a Component Model `future`.
///
/// Note that `FutureReader` instances must be disposed of using either `pipe`
/// or `close`; otherwise the in-store representation will leak and the writer
/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
/// ensure that disposal happens automatically.
pub struct FutureReader<T> {
    // Instance through which this reader's operations (`set_consumer`,
    // `host_drop_reader`, ...) are performed.
    instance: Instance,
    // Table id of the read-end `TransmitHandle`.  `close` replaces it with
    // `u32::MAX` so that a closed reader no longer refers to a live entry.
    id: TableId<TransmitHandle>,
    // Carries the payload type `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
857
858impl<T> FutureReader<T> {
859    /// Create a new future with the specified producer.
860    pub fn new<S: AsContextMut>(
861        instance: Instance,
862        mut store: S,
863        producer: impl FutureProducer<S::Data, Item = T>,
864    ) -> Self
865    where
866        T: func::Lower + func::Lift + Send + Sync + 'static,
867    {
868        struct Producer<P>(P);
869
870        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
871            for Producer<P>
872        {
873            type Item = P::Item;
874            type Buffer = Option<P::Item>;
875
876            fn poll_produce<'a>(
877                self: Pin<&mut Self>,
878                cx: &mut Context<'_>,
879                store: StoreContextMut<D>,
880                mut destination: Destination<'a, Self::Item, Self::Buffer>,
881                finish: bool,
882            ) -> Poll<Result<StreamResult>> {
883                // SAFETY: This is a standard pin-projection, and we never move
884                // out of `self`.
885                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
886
887                Poll::Ready(Ok(
888                    if let Some(value) = task::ready!(producer.poll_produce(cx, store, finish))? {
889                        destination.set_buffer(Some(value));
890
891                        // Here we return `StreamResult::Completed` even though
892                        // we've produced the last item we'll ever produce.
893                        // That's because the ABI expects
894                        // `ReturnCode::Completed(1)` rather than
895                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
896                        // called again since the future will have resolved.
897                        StreamResult::Completed
898                    } else {
899                        StreamResult::Cancelled
900                    },
901                ))
902            }
903        }
904
905        Self::new_(
906            instance.new_transmit(
907                store.as_context_mut(),
908                TransmitKind::Future,
909                Producer(producer),
910            ),
911            instance,
912        )
913    }
914
915    fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
916        Self {
917            instance,
918            id,
919            _phantom: PhantomData,
920        }
921    }
922
923    /// Set the consumer that accepts the result of this future.
924    pub fn pipe<S: AsContextMut>(
925        self,
926        store: S,
927        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
928    ) where
929        T: func::Lift + 'static,
930    {
931        struct Consumer<C>(C);
932
933        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
934            for Consumer<C>
935        {
936            type Item = T;
937
938            fn poll_consume(
939                self: Pin<&mut Self>,
940                cx: &mut Context<'_>,
941                mut store: StoreContextMut<D>,
942                mut source: Source<Self::Item>,
943                finish: bool,
944            ) -> Poll<Result<StreamResult>> {
945                // SAFETY: This is a standard pin-projection, and we never move
946                // out of `self`.
947                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
948
949                task::ready!(consumer.poll_consume(
950                    cx,
951                    store.as_context_mut(),
952                    source.reborrow(),
953                    finish
954                ))?;
955
956                Poll::Ready(Ok(if source.remaining(store) == 0 {
957                    // Here we return `StreamResult::Completed` even though
958                    // we've consumed the last item we'll ever consume.  That's
959                    // because the ABI expects `ReturnCode::Completed(1)` rather
960                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
961                    // called again since the future will have resolved.
962                    StreamResult::Completed
963                } else {
964                    StreamResult::Cancelled
965                }))
966            }
967        }
968
969        self.instance
970            .set_consumer(store, self.id, TransmitKind::Future, Consumer(consumer));
971    }
972
973    /// Convert this `FutureReader` into a [`Val`].
974    // See TODO comment for `FutureAny`; this is prone to handle leakage.
975    pub fn into_val(self) -> Val {
976        Val::Future(FutureAny(self.id.rep()))
977    }
978
979    /// Attempt to convert the specified [`Val`] to a `FutureReader`.
980    pub fn from_val(
981        mut store: impl AsContextMut<Data: Send>,
982        instance: Instance,
983        value: &Val,
984    ) -> Result<Self> {
985        let Val::Future(FutureAny(rep)) = value else {
986            bail!("expected `future`; got `{}`", value.desc());
987        };
988        let store = store.as_context_mut();
989        let id = TableId::<TransmitHandle>::new(*rep);
990        instance.concurrent_state_mut(store.0).get_mut(id)?; // Just make sure it's present
991        Ok(Self::new_(id, instance))
992    }
993
994    /// Transfer ownership of the read end of a future from a guest to the host.
995    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
996        match ty {
997            InterfaceType::Future(src) => {
998                let handle_table = cx
999                    .instance_mut()
1000                    .table_for_transmit(TransmitIndex::Future(src));
1001                let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1002                if is_done {
1003                    bail!("cannot lift future after being notified that the writable end dropped");
1004                }
1005                let id = TableId::<TransmitHandle>::new(rep);
1006                let concurrent_state = cx.instance_mut().concurrent_state_mut();
1007                let future = concurrent_state.get_mut(id)?;
1008                future.common.handle = None;
1009                let state = future.state;
1010
1011                if concurrent_state.get_mut(state)?.done {
1012                    bail!("cannot lift future after previous read succeeded");
1013                }
1014
1015                Ok(Self::new_(id, cx.instance_handle()))
1016            }
1017            _ => func::bad_type_info(),
1018        }
1019    }
1020
1021    /// Close this `FutureReader`, writing the default value.
1022    ///
1023    /// # Panics
1024    ///
1025    /// Panics if the store that the [`Accessor`] is derived from does not own
1026    /// this future. Usage of this future after calling `close` will also cause
1027    /// a panic.
1028    pub fn close(&mut self, mut store: impl AsContextMut) {
1029        // `self` should never be used again, but leave an invalid handle there just in case.
1030        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1031        self.instance
1032            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Future)
1033            .unwrap();
1034    }
1035
1036    /// Convenience method around [`Self::close`].
1037    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1038        accessor.as_accessor().with(|access| self.close(access))
1039    }
1040
1041    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1042    /// drop and clean it up from the store.
1043    ///
1044    /// Note that the `accessor` provided must own this future and is
1045    /// additionally transferred to the `GuardedFutureReader` return value.
1046    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1047    where
1048        A: AsAccessor,
1049    {
1050        GuardedFutureReader::new(accessor, self)
1051    }
1052}
1053
1054impl<T> fmt::Debug for FutureReader<T> {
1055    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1056        f.debug_struct("FutureReader")
1057            .field("id", &self.id)
1058            .field("instance", &self.instance)
1059            .finish()
1060    }
1061}
1062
1063/// Transfer ownership of the read end of a future from the host to a guest.
1064pub(crate) fn lower_future_to_index<U>(
1065    rep: u32,
1066    cx: &mut LowerContext<'_, U>,
1067    ty: InterfaceType,
1068) -> Result<u32> {
1069    match ty {
1070        InterfaceType::Future(dst) => {
1071            let concurrent_state = cx.instance_mut().concurrent_state_mut();
1072            let id = TableId::<TransmitHandle>::new(rep);
1073            let state = concurrent_state.get_mut(id)?.state;
1074            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1075
1076            let handle = cx
1077                .instance_mut()
1078                .table_for_transmit(TransmitIndex::Future(dst))
1079                .future_insert_read(dst, rep)?;
1080
1081            cx.instance_mut()
1082                .concurrent_state_mut()
1083                .get_mut(id)?
1084                .common
1085                .handle = Some(handle);
1086
1087            Ok(handle)
1088        }
1089        _ => func::bad_type_info(),
1090    }
1091}
1092
1093// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1094// safe and correct since we lift and lower future handles as `u32`s.
1095unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1096    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1097
1098    type Lower = <u32 as func::ComponentType>::Lower;
1099
1100    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1101        match ty {
1102            InterfaceType::Future(_) => Ok(()),
1103            other => bail!("expected `future`, found `{}`", func::desc(other)),
1104        }
1105    }
1106}
1107
1108// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1109unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1110    fn linear_lower_to_flat<U>(
1111        &self,
1112        cx: &mut LowerContext<'_, U>,
1113        ty: InterfaceType,
1114        dst: &mut MaybeUninit<Self::Lower>,
1115    ) -> Result<()> {
1116        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1117            cx,
1118            InterfaceType::U32,
1119            dst,
1120        )
1121    }
1122
1123    fn linear_lower_to_memory<U>(
1124        &self,
1125        cx: &mut LowerContext<'_, U>,
1126        ty: InterfaceType,
1127        offset: usize,
1128    ) -> Result<()> {
1129        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1130            cx,
1131            InterfaceType::U32,
1132            offset,
1133        )
1134    }
1135}
1136
1137// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1138unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1139    fn linear_lift_from_flat(
1140        cx: &mut LiftContext<'_>,
1141        ty: InterfaceType,
1142        src: &Self::Lower,
1143    ) -> Result<Self> {
1144        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1145        Self::lift_from_index(cx, ty, index)
1146    }
1147
1148    fn linear_lift_from_memory(
1149        cx: &mut LiftContext<'_>,
1150        ty: InterfaceType,
1151        bytes: &[u8],
1152    ) -> Result<Self> {
1153        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1154        Self::lift_from_index(cx, ty, index)
1155    }
1156}
1157
/// A [`FutureReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureReader::new`] or
/// [`FutureReader::guard`].
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<FutureReader<T>>,
    // Passed to `FutureReader::close_with` when the guard is dropped while
    // still holding a reader.
    accessor: A,
}
1173
1174impl<T, A> GuardedFutureReader<T, A>
1175where
1176    A: AsAccessor,
1177{
1178    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1179    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1180        Self {
1181            reader: Some(reader),
1182            accessor,
1183        }
1184    }
1185
1186    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1187    /// back.
1188    pub fn into_future(self) -> FutureReader<T> {
1189        self.into()
1190    }
1191}
1192
1193impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1194where
1195    A: AsAccessor,
1196{
1197    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1198        guard.reader.take().unwrap()
1199    }
1200}
1201
1202impl<T, A> Drop for GuardedFutureReader<T, A>
1203where
1204    A: AsAccessor,
1205{
1206    fn drop(&mut self) {
1207        if let Some(reader) = &mut self.reader {
1208            reader.close_with(&self.accessor)
1209        }
1210    }
1211}
1212
/// Represents the readable end of a Component Model `stream`.
///
/// Note that `StreamReader` instances must be disposed of using `close`;
/// otherwise the in-store representation will leak and the writer end will hang
/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
/// disposal happens automatically.
pub struct StreamReader<T> {
    // Instance through which this reader's operations (`set_consumer`,
    // `host_drop_reader`, ...) are performed.
    instance: Instance,
    // Table id of the read-end `TransmitHandle`.  `close` replaces it with
    // `u32::MAX` so that a closed reader no longer refers to a live entry.
    id: TableId<TransmitHandle>,
    // Carries the item type `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
1224
1225impl<T> StreamReader<T> {
1226    /// Create a new stream with the specified producer.
1227    pub fn new<S: AsContextMut>(
1228        instance: Instance,
1229        store: S,
1230        producer: impl StreamProducer<S::Data, Item = T>,
1231    ) -> Self
1232    where
1233        T: func::Lower + func::Lift + Send + Sync + 'static,
1234    {
1235        Self::new_(
1236            instance.new_transmit(store, TransmitKind::Stream, producer),
1237            instance,
1238        )
1239    }
1240
1241    fn new_(id: TableId<TransmitHandle>, instance: Instance) -> Self {
1242        Self {
1243            instance,
1244            id,
1245            _phantom: PhantomData,
1246        }
1247    }
1248
1249    /// Set the consumer that accepts the items delivered to this stream.
1250    pub fn pipe<S: AsContextMut>(self, store: S, consumer: impl StreamConsumer<S::Data, Item = T>)
1251    where
1252        T: 'static,
1253    {
1254        self.instance
1255            .set_consumer(store, self.id, TransmitKind::Stream, consumer);
1256    }
1257
1258    /// Convert this `StreamReader` into a [`Val`].
1259    // See TODO comment for `StreamAny`; this is prone to handle leakage.
1260    pub fn into_val(self) -> Val {
1261        Val::Stream(StreamAny(self.id.rep()))
1262    }
1263
1264    /// Attempt to convert the specified [`Val`] to a `StreamReader`.
1265    pub fn from_val(
1266        mut store: impl AsContextMut<Data: Send>,
1267        instance: Instance,
1268        value: &Val,
1269    ) -> Result<Self> {
1270        let Val::Stream(StreamAny(rep)) = value else {
1271            bail!("expected `stream`; got `{}`", value.desc());
1272        };
1273        let store = store.as_context_mut();
1274        let id = TableId::<TransmitHandle>::new(*rep);
1275        instance.concurrent_state_mut(store.0).get_mut(id)?; // Just make sure it's present
1276        Ok(Self::new_(id, instance))
1277    }
1278
1279    /// Transfer ownership of the read end of a stream from a guest to the host.
1280    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1281        match ty {
1282            InterfaceType::Stream(src) => {
1283                let handle_table = cx
1284                    .instance_mut()
1285                    .table_for_transmit(TransmitIndex::Stream(src));
1286                let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1287                if is_done {
1288                    bail!("cannot lift stream after being notified that the writable end dropped");
1289                }
1290                let id = TableId::<TransmitHandle>::new(rep);
1291                cx.instance_mut()
1292                    .concurrent_state_mut()
1293                    .get_mut(id)?
1294                    .common
1295                    .handle = None;
1296                Ok(Self::new_(id, cx.instance_handle()))
1297            }
1298            _ => func::bad_type_info(),
1299        }
1300    }
1301
1302    /// Close this `StreamReader`, writing the default value.
1303    ///
1304    /// # Panics
1305    ///
1306    /// Panics if the store that the [`Accessor`] is derived from does not own
1307    /// this future. Usage of this future after calling `close` will also cause
1308    /// a panic.
1309    pub fn close(&mut self, mut store: impl AsContextMut) {
1310        // `self` should never be used again, but leave an invalid handle there just in case.
1311        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1312        self.instance
1313            .host_drop_reader(store.as_context_mut().0, id, TransmitKind::Stream)
1314            .unwrap()
1315    }
1316
1317    /// Convenience method around [`Self::close`].
1318    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1319        accessor.as_accessor().with(|access| self.close(access))
1320    }
1321
1322    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1323    /// drop and clean it up from the store.
1324    ///
1325    /// Note that the `accessor` provided must own this future and is
1326    /// additionally transferred to the `GuardedStreamReader` return value.
1327    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1328    where
1329        A: AsAccessor,
1330    {
1331        GuardedStreamReader::new(accessor, self)
1332    }
1333}
1334
1335impl<T> fmt::Debug for StreamReader<T> {
1336    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1337        f.debug_struct("StreamReader")
1338            .field("id", &self.id)
1339            .field("instance", &self.instance)
1340            .finish()
1341    }
1342}
1343
1344/// Transfer ownership of the read end of a stream from the host to a guest.
1345pub(crate) fn lower_stream_to_index<U>(
1346    rep: u32,
1347    cx: &mut LowerContext<'_, U>,
1348    ty: InterfaceType,
1349) -> Result<u32> {
1350    match ty {
1351        InterfaceType::Stream(dst) => {
1352            let concurrent_state = cx.instance_mut().concurrent_state_mut();
1353            let id = TableId::<TransmitHandle>::new(rep);
1354            let state = concurrent_state.get_mut(id)?.state;
1355            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1356
1357            let handle = cx
1358                .instance_mut()
1359                .table_for_transmit(TransmitIndex::Stream(dst))
1360                .stream_insert_read(dst, rep)?;
1361
1362            cx.instance_mut()
1363                .concurrent_state_mut()
1364                .get_mut(id)?
1365                .common
1366                .handle = Some(handle);
1367
1368            Ok(handle)
1369        }
1370        _ => func::bad_type_info(),
1371    }
1372}
1373
1374// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1375// safe and correct since we lift and lower stream handles as `u32`s.
1376unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1377    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1378
1379    type Lower = <u32 as func::ComponentType>::Lower;
1380
1381    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1382        match ty {
1383            InterfaceType::Stream(_) => Ok(()),
1384            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1385        }
1386    }
1387}
1388
1389// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1390unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1391    fn linear_lower_to_flat<U>(
1392        &self,
1393        cx: &mut LowerContext<'_, U>,
1394        ty: InterfaceType,
1395        dst: &mut MaybeUninit<Self::Lower>,
1396    ) -> Result<()> {
1397        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1398            cx,
1399            InterfaceType::U32,
1400            dst,
1401        )
1402    }
1403
1404    fn linear_lower_to_memory<U>(
1405        &self,
1406        cx: &mut LowerContext<'_, U>,
1407        ty: InterfaceType,
1408        offset: usize,
1409    ) -> Result<()> {
1410        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1411            cx,
1412            InterfaceType::U32,
1413            offset,
1414        )
1415    }
1416}
1417
1418// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1419unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1420    fn linear_lift_from_flat(
1421        cx: &mut LiftContext<'_>,
1422        ty: InterfaceType,
1423        src: &Self::Lower,
1424    ) -> Result<Self> {
1425        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1426        Self::lift_from_index(cx, ty, index)
1427    }
1428
1429    fn linear_lift_from_memory(
1430        cx: &mut LiftContext<'_>,
1431        ty: InterfaceType,
1432        bytes: &[u8],
1433    ) -> Result<Self> {
1434        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1435        Self::lift_from_index(cx, ty, index)
1436    }
1437}
1438
/// A [`StreamReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedStreamReader::new`] or
/// [`StreamReader::guard`].
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `StreamReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<StreamReader<T>>,
    // Passed to `StreamReader::close_with` when the guard is dropped while
    // still holding a reader.
    accessor: A,
}
1454
1455impl<T, A> GuardedStreamReader<T, A>
1456where
1457    A: AsAccessor,
1458{
1459    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1460    /// `reader`.
1461    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1462        Self {
1463            reader: Some(reader),
1464            accessor,
1465        }
1466    }
1467
1468    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1469    /// back.
1470    pub fn into_stream(self) -> StreamReader<T> {
1471        self.into()
1472    }
1473}
1474
1475impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1476where
1477    A: AsAccessor,
1478{
1479    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1480        guard.reader.take().unwrap()
1481    }
1482}
1483
1484impl<T, A> Drop for GuardedStreamReader<T, A>
1485where
1486    A: AsAccessor,
1487{
1488    fn drop(&mut self) {
1489        if let Some(reader) = &mut self.reader {
1490            reader.close_with(&self.accessor)
1491        }
1492    }
1493}
1494
/// Represents a Component Model `error-context`.
pub struct ErrorContext {
    // Raw representation of the error context: the value carried by
    // `Val::ErrorContext` and the value looked up from / inserted into a
    // guest's error-context table when lifting / lowering.
    rep: u32,
}
1499
1500impl ErrorContext {
1501    pub(crate) fn new(rep: u32) -> Self {
1502        Self { rep }
1503    }
1504
1505    /// Convert this `ErrorContext` into a [`Val`].
1506    pub fn into_val(self) -> Val {
1507        Val::ErrorContext(ErrorContextAny(self.rep))
1508    }
1509
1510    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1511    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1512        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1513            bail!("expected `error-context`; got `{}`", value.desc());
1514        };
1515        Ok(Self::new(*rep))
1516    }
1517
1518    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1519        match ty {
1520            InterfaceType::ErrorContext(src) => {
1521                let rep = cx
1522                    .instance_mut()
1523                    .table_for_error_context(src)
1524                    .error_context_rep(index)?;
1525
1526                Ok(Self { rep })
1527            }
1528            _ => func::bad_type_info(),
1529        }
1530    }
1531}
1532
1533pub(crate) fn lower_error_context_to_index<U>(
1534    rep: u32,
1535    cx: &mut LowerContext<'_, U>,
1536    ty: InterfaceType,
1537) -> Result<u32> {
1538    match ty {
1539        InterfaceType::ErrorContext(dst) => {
1540            let tbl = cx.instance_mut().table_for_error_context(dst);
1541            tbl.error_context_insert(rep)
1542        }
1543        _ => func::bad_type_info(),
1544    }
1545}
1546// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1547// safe and correct since we lift and lower future handles as `u32`s.
1548unsafe impl func::ComponentType for ErrorContext {
1549    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1550
1551    type Lower = <u32 as func::ComponentType>::Lower;
1552
1553    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1554        match ty {
1555            InterfaceType::ErrorContext(_) => Ok(()),
1556            other => bail!("expected `error`, found `{}`", func::desc(other)),
1557        }
1558    }
1559}
1560
1561// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1562unsafe impl func::Lower for ErrorContext {
1563    fn linear_lower_to_flat<T>(
1564        &self,
1565        cx: &mut LowerContext<'_, T>,
1566        ty: InterfaceType,
1567        dst: &mut MaybeUninit<Self::Lower>,
1568    ) -> Result<()> {
1569        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1570            cx,
1571            InterfaceType::U32,
1572            dst,
1573        )
1574    }
1575
1576    fn linear_lower_to_memory<T>(
1577        &self,
1578        cx: &mut LowerContext<'_, T>,
1579        ty: InterfaceType,
1580        offset: usize,
1581    ) -> Result<()> {
1582        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1583            cx,
1584            InterfaceType::U32,
1585            offset,
1586        )
1587    }
1588}
1589
1590// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1591unsafe impl func::Lift for ErrorContext {
1592    fn linear_lift_from_flat(
1593        cx: &mut LiftContext<'_>,
1594        ty: InterfaceType,
1595        src: &Self::Lower,
1596    ) -> Result<Self> {
1597        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1598        Self::lift_from_index(cx, ty, index)
1599    }
1600
1601    fn linear_lift_from_memory(
1602        cx: &mut LiftContext<'_>,
1603        ty: InterfaceType,
1604        bytes: &[u8],
1605    ) -> Result<Self> {
1606        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1607        Self::lift_from_index(cx, ty, index)
1608    }
1609}
1610
/// Represents the read or write end of a stream or future.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping for this end.  Its `handle` field is set to the
    /// guest handle when the end is lowered into a guest and cleared when the
    /// end is lifted back out.
    pub(super) common: WaitableCommon,
    /// See `TransmitState`
    state: TableId<TransmitState>,
}
1617
1618impl TransmitHandle {
1619    fn new(state: TableId<TransmitState>) -> Self {
1620        Self {
1621            common: WaitableCommon::default(),
1622            state,
1623        }
1624    }
1625}
1626
1627impl TableDebug for TransmitHandle {
1628    fn type_name() -> &'static str {
1629        "TransmitHandle"
1630    }
1631}
1632
/// Represents the state of a stream or future.
///
/// Both ends (`TransmitHandle`s) of a stream or future refer to the same
/// `TransmitState` through their `state` field.
struct TransmitState {
    /// The write end of the stream or future.
    write_handle: TableId<TransmitHandle>,
    /// The read end of the stream or future.
    read_handle: TableId<TransmitHandle>,
    /// See `WriteState`
    write: WriteState,
    /// See `ReadState`
    read: ReadState,
    /// Whether further values may be transmitted via this stream or future.
    done: bool,
}
1646
1647impl Default for TransmitState {
1648    fn default() -> Self {
1649        Self {
1650            write_handle: TableId::new(u32::MAX),
1651            read_handle: TableId::new(u32::MAX),
1652            read: ReadState::Open,
1653            write: WriteState::Open,
1654            done: false,
1655        }
1656    }
1657}
1658
1659impl TableDebug for TransmitState {
1660    fn type_name() -> &'static str {
1661        "TransmitState"
1662    }
1663}
1664
/// Boxed closure which, each time it is called, returns a new boxed future
/// resolving to a `Result<StreamResult>`.
///
/// Used as the `produce` callback of `WriteState::HostReady` and the `consume`
/// callback of `ReadState::HostReady`.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
1668
/// Represents the state of the write end of a stream or future.
enum WriteState {
    /// The write end is open, but no write is pending.
    Open,
    /// The write end is owned by a guest task and a write is pending.
    GuestReady {
        /// Stream or future type of the pending write; reported back together
        /// with `handle` when the write completes or the reader is dropped.
        ty: TransmitIndex,
        /// NOTE(review): not read in this part of the file; presumably the
        /// flat ABI description of the payload, if any — confirm.
        flat_abi: Option<FlatAbi>,
        /// NOTE(review): presumably the canonical ABI options of the guest's
        /// write call — confirm against the code that creates this state.
        options: Options,
        /// NOTE(review): presumably the guest memory address of the items
        /// being written — confirm.
        address: usize,
        /// Number of items in the pending write.
        count: usize,
        /// Guest handle reported back together with `ty`.
        handle: u32,
    },
    /// The write end is owned by the host, which is ready to produce items.
    HostReady {
        /// Callback returning a future which runs one round of production.
        produce: PollStream,
        /// Running count of items delivered to the guest reader; reported in
        /// the read result and then reset to zero by `pipe_to_guest`.
        guest_offset: usize,
        /// Flag forwarded to the producer's `poll_produce`; cleared once that
        /// poll returns `Poll::Ready`.
        cancel: bool,
        /// Waker captured while `poll_produce` is pending; cleared once it
        /// returns `Poll::Ready`.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
1692
1693impl fmt::Debug for WriteState {
1694    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1695        match self {
1696            Self::Open => f.debug_tuple("Open").finish(),
1697            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1698            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1699            Self::Dropped => f.debug_tuple("Dropped").finish(),
1700        }
1701    }
1702}
1703
/// Represents the state of the read end of a stream or future.
enum ReadState {
    /// The read end is open, but no read is pending.
    Open,
    /// The read end is owned by a guest task and a read is pending.
    GuestReady {
        /// Stream or future type of the pending read; used when lowering
        /// items and reported back together with `handle`.
        ty: TransmitIndex,
        /// NOTE(review): only carried along in this part of the file;
        /// presumably the flat ABI description of the payload — confirm.
        flat_abi: Option<FlatAbi>,
        /// Options passed to `lower` when items are written to the guest.
        options: Options,
        /// Base address of the guest's read buffer; items are lowered starting
        /// at `address + T::SIZE32 * guest_offset`.
        address: usize,
        /// Capacity of the guest's read buffer, in items.
        count: usize,
        /// Guest handle reported back together with `ty`.
        handle: u32,
    },
    /// The read end is owned by a host task, and it is ready to consume items.
    HostReady {
        /// Callback returning a future which runs one round of consumption.
        consume: PollStream,
        /// Item count reported in the write result and then reset to zero by
        /// `pipe_from_guest`.
        guest_offset: usize,
        /// Flag forwarded to the consumer's `poll_consume`; cleared once that
        /// poll returns `Poll::Ready`.
        cancel: bool,
        /// Waker captured while `poll_consume` is pending; cleared once it
        /// returns `Poll::Ready`.
        cancel_waker: Option<Waker>,
    },
    /// Both the read and write ends are owned by the host.
    HostToHost {
        /// Callback which feeds the given write buffer to the consumer.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        /// Byte buffer lent to the producer (wrapped in a `Cursor`) during
        /// `poll_produce`.
        buffer: Vec<u8>,
        /// Cursor position recorded after the producer ran; `write` hands
        /// `buffer` to `accept` until this position is reached.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
1740
1741impl fmt::Debug for ReadState {
1742    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1743        match self {
1744            Self::Open => f.debug_tuple("Open").finish(),
1745            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1746            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1747            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1748            Self::Dropped => f.debug_tuple("Dropped").finish(),
1749        }
1750    }
1751}
1752
1753fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
1754    let count = guest_offset.try_into().unwrap();
1755    match state {
1756        StreamResult::Dropped => ReturnCode::Dropped(count),
1757        StreamResult::Completed => ReturnCode::completed(kind, count),
1758        StreamResult::Cancelled => ReturnCode::Cancelled(count),
1759    }
1760}
1761
1762impl Instance {
    /// Creates a new stream or future whose write end is driven by the host
    /// `producer`, and returns the table id of its read end.
    ///
    /// The write end is left in `WriteState::HostReady` with a `produce`
    /// callback. Each call to that callback returns a future which:
    ///
    /// 1. takes the producer and its buffer out of the shared slot,
    /// 2. if the buffer is empty, polls `poll_produce` until it is ready
    ///    (otherwise the leftover buffered items count as a completed round),
    /// 3. validates the producer's result against what was actually produced,
    /// 4. puts the producer and buffer back, and
    /// 5. if anything was buffered (or written to the host-to-host buffer),
    ///    forwards it to the reader via `Self::write`.
    ///
    /// # Panics
    ///
    /// Panics if the transmit cannot be created or looked up. The returned
    /// callback panics if the shared slot is empty when it runs; note that the
    /// error paths below return before the producer is put back.
    fn new_transmit<S: AsContextMut, P: StreamProducer<S::Data>>(
        self,
        mut store: S,
        kind: TransmitKind,
        producer: P,
    ) -> TableId<TransmitHandle>
    where
        P::Item: func::Lower,
    {
        let mut store = store.as_context_mut();
        let token = StoreToken::new(store.as_context_mut());
        let state = self.concurrent_state_mut(store.0);
        // Only the read handle is kept; the write end is represented by the
        // `produce` callback installed at the bottom of this function.
        let (_, read) = state.new_transmit().unwrap();
        // The producer and its buffer live in a shared `Option` so that each
        // round can take them out and put them back when it is finished.
        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
        let id = state.get_mut(read).unwrap().state;
        let produce = Box::new(move || {
            let producer = producer.clone();
            async move {
                let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();

                // Only ask the producer for more items once everything it
                // buffered earlier has been delivered.
                let (result, cancelled) = if buffer.remaining().is_empty() {
                    future::poll_fn(|cx| {
                        tls::get(|store| {
                            let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();

                            let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                unreachable!();
                            };

                            // For a host-to-host transmit, lend the reader's
                            // byte buffer to the producer for this poll.
                            let mut host_buffer =
                                if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                    Some(Cursor::new(mem::take(buffer)))
                                } else {
                                    None
                                };

                            let poll = mine.as_mut().poll_produce(
                                cx,
                                token.as_context_mut(store),
                                Destination {
                                    instance: self,
                                    id,
                                    buffer: &mut buffer,
                                    host_buffer: host_buffer.as_mut(),
                                    _phantom: PhantomData,
                                },
                                cancel,
                            );

                            // Look the state up again now that the producer
                            // has run with access to the store.
                            let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();

                            // Hand the byte buffer back and record how far the
                            // producer's cursor advanced.
                            let host_offset = if let (
                                Some(host_buffer),
                                ReadState::HostToHost { buffer, limit, .. },
                            ) = (host_buffer, &mut transmit.read)
                            {
                                *limit = usize::try_from(host_buffer.position()).unwrap();
                                *buffer = host_buffer.into_inner();
                                *limit
                            } else {
                                0
                            };

                            {
                                let WriteState::HostReady {
                                    guest_offset,
                                    cancel,
                                    cancel_waker,
                                    ..
                                } = &mut transmit.write
                                else {
                                    unreachable!();
                                };

                                if poll.is_pending() {
                                    // A producer which has already produced
                                    // something must not report `Pending`.
                                    if !buffer.remaining().is_empty()
                                        || *guest_offset > 0
                                        || host_offset > 0
                                    {
                                        return Poll::Ready(Err(anyhow!(
                                            "StreamProducer::poll_produce returned Poll::Pending \
                                             after producing at least one item"
                                        )));
                                    }
                                    // Keep the waker in the write state while
                                    // the poll is pending.
                                    *cancel_waker = Some(cx.waker().clone());
                                } else {
                                    *cancel_waker = None;
                                    *cancel = false;
                                }
                            }

                            // Pair the result with the `cancel` flag that was
                            // passed to this poll.
                            poll.map(|v| v.map(|result| (result, cancel)))
                        })
                    })
                    .await?
                } else {
                    (StreamResult::Completed, false)
                };

                // Snapshot the counters needed to validate the result.
                let (guest_offset, host_offset, count) = tls::get(|store| {
                    let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
                    let (count, host_offset) = match &transmit.read {
                        &ReadState::GuestReady { count, .. } => (count, 0),
                        &ReadState::HostToHost { limit, .. } => (1, limit),
                        _ => unreachable!(),
                    };
                    let guest_offset = match &transmit.write {
                        &WriteState::HostReady { guest_offset, .. } => guest_offset,
                        _ => unreachable!(),
                    };
                    (guest_offset, host_offset, count)
                });

                match result {
                    StreamResult::Completed => {
                        // NOTE(review): this no-progress check only applies
                        // when `count > 1`, so it never fires for the
                        // host-to-host case (where `count` is 1) — confirm
                        // that is intended.
                        if count > 1
                            && buffer.remaining().is_empty()
                            && guest_offset == 0
                            && host_offset == 0
                        {
                            bail!(
                                "StreamProducer::poll_produce returned StreamResult::Completed \
                                 without producing any items"
                            );
                        }
                    }
                    StreamResult::Cancelled => {
                        // `Cancelled` is only valid if the flag passed to
                        // `poll_produce` was set.
                        if !cancelled {
                            bail!(
                                "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                // Decide whether there is anything to forward before the
                // buffer is moved back into the shared slot.
                let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                *producer.lock().unwrap() = Some((mine, buffer));

                if write_buffer {
                    self.write(token, id, producer, kind).await?;
                }

                Ok(result)
            }
            .boxed()
        });
        state.get_mut(id).unwrap().write = WriteState::HostReady {
            produce,
            guest_offset: 0,
            cancel: false,
            cancel_waker: None,
        };
        read
    }
1920
    /// Attaches the host `consumer` to the read end identified by the handle
    /// `id`.
    ///
    /// What happens next depends on the current state of the write end:
    ///
    /// * `Open`: the read end becomes `ReadState::HostReady`, waiting for a
    ///   write.
    /// * `GuestReady`: a consume round is started right away and its result is
    ///   reported to the guest via `Self::pipe_from_guest`.
    /// * `HostReady`: both ends are host-owned; the read end becomes
    ///   `ReadState::HostToHost` and a background task is queued which keeps
    ///   calling the producer until either end is dropped (or, for a future,
    ///   after one round), and then deletes the transmit.
    /// * `Dropped`: the read end is dropped as well via
    ///   `Self::host_drop_reader`.
    ///
    /// # Panics
    ///
    /// Panics if `id` or its transmit state cannot be looked up, or if
    /// dropping the reader fails in the `Dropped` case.
    fn set_consumer<S: AsContextMut, C: StreamConsumer<S::Data>>(
        self,
        mut store: S,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) {
        let mut store = store.as_context_mut();
        let token = StoreToken::new(store.as_context_mut());
        let state = self.concurrent_state_mut(store.0);
        // From here on `id` refers to the shared transmit state, not the handle.
        let id = state.get_mut(id).unwrap().state;
        let transmit = state.get_mut(id).unwrap();
        // The consumer lives in a shared `Option` so that each round can take
        // it out and put it back when it is finished.
        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
        // One round of consumption. `host_buffer` is `Some` when the items
        // come from a host buffer and `None` otherwise.
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                let mut mine = consumer.lock().unwrap().take().unwrap();

                // Remember how much the host buffer held so progress can be
                // checked after the consumer has run.
                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        let cancel =
                            match &self.concurrent_state_mut(store).get_mut(id).unwrap().read {
                                &ReadState::HostReady { cancel, .. } => cancel,
                                ReadState::Open => false,
                                _ => unreachable!(),
                            };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                instance: self,
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        // Keep the waker in the read state while the poll is
                        // pending; clear it and the flag once it is ready.
                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut self.concurrent_state_mut(store).get_mut(id).unwrap().read
                        {
                            if poll.is_pending() {
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        // Pair the result with the `cancel` flag that was
                        // passed to this poll.
                        poll.map(|v| v.map(|result| (result, cancel)))
                    })
                })
                .await?;

                // Snapshot the counters needed to validate the result.
                let (guest_offset, count) = tls::get(|store| {
                    let transmit = self.concurrent_state_mut(store).get_mut(id).unwrap();
                    (
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => 0,
                            _ => unreachable!(),
                        },
                        match &transmit.write {
                            &WriteState::GuestReady { count, .. } => count,
                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
                            _ => unreachable!(),
                        },
                    )
                });

                match result {
                    StreamResult::Completed => {
                        // NOTE(review): without a host buffer the last
                        // condition is `false` (`unwrap_or(false)`), so this
                        // check never fires in that case — confirm intended.
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // A future is marked done after one completed round.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                self.concurrent_state_mut(store).get_mut(id).unwrap().done = true;
                            });
                        }
                    }
                    StreamResult::Cancelled => {
                        // `Cancelled` is only valid if the flag passed to
                        // `poll_consume` was set.
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                // Put the consumer back; the error paths above return before
                // reaching this point.
                *consumer.lock().unwrap() = Some(mine);

                Ok(result)
            }
        };
        // `PollStream`-shaped wrapper which runs a round without a host buffer.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            WriteState::GuestReady { .. } => {
                // A guest write is already pending: start consuming now and
                // report the outcome to the guest when it finishes.
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
                self.pipe_from_guest(store.0, kind, id, future);
            }
            WriteState::HostReady { .. } => {
                // Take ownership of the real `produce` callback, leaving a
                // placeholder that must never be called in the write state.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| unreachable!()),
                        guest_offset: 0,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    unreachable!();
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                // Background task which drives the producer directly.
                let future = async move {
                    loop {
                        // Stop once the read end has been dropped.
                        if tls::get(|store| {
                            anyhow::Ok(matches!(
                                self.concurrent_state_mut(store).get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        // A future gets only a single round.
                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                // Delete the transmit once the loop ends, whether it succeeded
                // or failed; a deletion error takes precedence over `result`.
                .map(move |result| {
                    tls::get(|store| self.concurrent_state_mut(store).delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            WriteState::Dropped => {
                let reader = transmit.read_handle;
                self.host_drop_reader(store.0, reader, kind).unwrap();
            }
        }
    }
2117
2118    async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2119        self,
2120        token: StoreToken<D>,
2121        id: TableId<TransmitState>,
2122        pair: Arc<Mutex<Option<(P, B)>>>,
2123        kind: TransmitKind,
2124    ) -> Result<()> {
2125        let (read, guest_offset) = tls::get(|store| {
2126            let transmit = self.concurrent_state_mut(store).get_mut(id)?;
2127
2128            let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write
2129            {
2130                Some(guest_offset)
2131            } else {
2132                None
2133            };
2134
2135            anyhow::Ok((
2136                mem::replace(&mut transmit.read, ReadState::Open),
2137                guest_offset,
2138            ))
2139        })?;
2140
2141        match read {
2142            ReadState::GuestReady {
2143                ty,
2144                flat_abi,
2145                options,
2146                address,
2147                count,
2148                handle,
2149            } => {
2150                let guest_offset = guest_offset.unwrap();
2151
2152                if let TransmitKind::Future = kind {
2153                    tls::get(|store| {
2154                        self.concurrent_state_mut(store).get_mut(id)?.done = true;
2155                        anyhow::Ok(())
2156                    })?;
2157                }
2158
2159                let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2160                let accept = {
2161                    let pair = pair.clone();
2162                    move |mut store: StoreContextMut<D>| {
2163                        lower::<T, B, D>(
2164                            store.as_context_mut(),
2165                            self,
2166                            &options,
2167                            ty,
2168                            address + (T::SIZE32 * guest_offset),
2169                            count - guest_offset,
2170                            &mut pair.lock().unwrap().as_mut().unwrap().1,
2171                        )?;
2172                        anyhow::Ok(())
2173                    }
2174                };
2175
2176                if guest_offset < count {
2177                    if T::MAY_REQUIRE_REALLOC {
2178                        // For payloads which may require a realloc call, use a
2179                        // oneshot::channel and background task.  This is
2180                        // necessary because calling the guest while there are
2181                        // host embedder frames on the stack is unsound.
2182                        let (tx, rx) = oneshot::channel();
2183                        tls::get(move |store| {
2184                            self.concurrent_state_mut(store).push_high_priority(
2185                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2186                                    move |store, _| {
2187                                        _ = tx.send(accept(token.as_context_mut(store))?);
2188                                        Ok(())
2189                                    },
2190                                ))),
2191                            )
2192                        });
2193                        rx.await?
2194                    } else {
2195                        // Optimize flat payloads (i.e. those which do not
2196                        // require calling the guest's realloc function) by
2197                        // lowering directly instead of using a oneshot::channel
2198                        // and background task.
2199                        tls::get(|store| accept(token.as_context_mut(store)))?
2200                    };
2201                }
2202
2203                tls::get(|store| {
2204                    let count =
2205                        old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2206
2207                    let transmit = self.concurrent_state_mut(store).get_mut(id)?;
2208
2209                    let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2210                        unreachable!();
2211                    };
2212
2213                    *guest_offset += count;
2214
2215                    transmit.read = ReadState::GuestReady {
2216                        ty,
2217                        flat_abi,
2218                        options,
2219                        address,
2220                        count,
2221                        handle,
2222                    };
2223
2224                    anyhow::Ok(())
2225                })?;
2226
2227                Ok(())
2228            }
2229
2230            ReadState::HostToHost {
2231                accept,
2232                mut buffer,
2233                limit,
2234            } => {
2235                let mut state = StreamResult::Completed;
2236                let mut position = 0;
2237
2238                while !matches!(state, StreamResult::Dropped) && position < limit {
2239                    let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2240                    state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2241                    (buffer, position, _) = slice_buffer.into_parts();
2242                }
2243
2244                {
2245                    let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2246
2247                    while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty())
2248                    {
2249                        state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2250                    }
2251
2252                    *pair.lock().unwrap() = Some((mine, buffer));
2253                }
2254
2255                tls::get(|store| {
2256                    self.concurrent_state_mut(store).get_mut(id)?.read = match state {
2257                        StreamResult::Dropped => ReadState::Dropped,
2258                        StreamResult::Completed | StreamResult::Cancelled => {
2259                            ReadState::HostToHost {
2260                                accept,
2261                                buffer,
2262                                limit: 0,
2263                            }
2264                        }
2265                    };
2266
2267                    anyhow::Ok(())
2268                })?;
2269                Ok(())
2270            }
2271
2272            _ => unreachable!(),
2273        }
2274    }
2275
2276    fn pipe_from_guest(
2277        self,
2278        store: &mut dyn VMStore,
2279        kind: TransmitKind,
2280        id: TableId<TransmitState>,
2281        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2282    ) {
2283        let future = async move {
2284            let stream_state = future.await?;
2285            tls::get(|store| {
2286                let state = self.concurrent_state_mut(store);
2287                let transmit = state.get_mut(id)?;
2288                let ReadState::HostReady {
2289                    consume,
2290                    guest_offset,
2291                    ..
2292                } = mem::replace(&mut transmit.read, ReadState::Open)
2293                else {
2294                    unreachable!();
2295                };
2296                let code = return_code(kind, stream_state, guest_offset);
2297                transmit.read = match stream_state {
2298                    StreamResult::Dropped => ReadState::Dropped,
2299                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2300                        consume,
2301                        guest_offset: 0,
2302                        cancel: false,
2303                        cancel_waker: None,
2304                    },
2305                };
2306                let WriteState::GuestReady { ty, handle, .. } =
2307                    mem::replace(&mut transmit.write, WriteState::Open)
2308                else {
2309                    unreachable!();
2310                };
2311                state.send_write_result(ty, id, handle, code)?;
2312                Ok(())
2313            })
2314        };
2315
2316        self.concurrent_state_mut(store).push_future(future.boxed());
2317    }
2318
2319    fn pipe_to_guest(
2320        self,
2321        store: &mut dyn VMStore,
2322        kind: TransmitKind,
2323        id: TableId<TransmitState>,
2324        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2325    ) {
2326        let future = async move {
2327            let stream_state = future.await?;
2328            tls::get(|store| {
2329                let state = self.concurrent_state_mut(store);
2330                let transmit = state.get_mut(id)?;
2331                let WriteState::HostReady {
2332                    produce,
2333                    guest_offset,
2334                    ..
2335                } = mem::replace(&mut transmit.write, WriteState::Open)
2336                else {
2337                    unreachable!();
2338                };
2339                let code = return_code(kind, stream_state, guest_offset);
2340                transmit.write = match stream_state {
2341                    StreamResult::Dropped => WriteState::Dropped,
2342                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2343                        produce,
2344                        guest_offset: 0,
2345                        cancel: false,
2346                        cancel_waker: None,
2347                    },
2348                };
2349                let ReadState::GuestReady { ty, handle, .. } =
2350                    mem::replace(&mut transmit.read, ReadState::Open)
2351                else {
2352                    unreachable!();
2353                };
2354                state.send_read_result(ty, id, handle, code)?;
2355                Ok(())
2356            })
2357        };
2358
2359        self.concurrent_state_mut(store).push_future(future.boxed());
2360    }
2361
2362    /// Drop the read end of a stream or future read from the host.
2363    fn host_drop_reader(
2364        self,
2365        store: &mut dyn VMStore,
2366        id: TableId<TransmitHandle>,
2367        kind: TransmitKind,
2368    ) -> Result<()> {
2369        let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
2370        let state = self.concurrent_state_mut(store);
2371        let transmit = state
2372            .get_mut(transmit_id)
2373            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2374        log::trace!(
2375            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2376            transmit.read,
2377            transmit.write
2378        );
2379
2380        transmit.read = ReadState::Dropped;
2381
2382        // If the write end is already dropped, it should stay dropped,
2383        // otherwise, it should be opened.
2384        let new_state = if let WriteState::Dropped = &transmit.write {
2385            WriteState::Dropped
2386        } else {
2387            WriteState::Open
2388        };
2389
2390        let write_handle = transmit.write_handle;
2391
2392        match mem::replace(&mut transmit.write, new_state) {
2393            // If a guest is waiting to write, notify it that the read end has
2394            // been dropped.
2395            WriteState::GuestReady { ty, handle, .. } => {
2396                state.update_event(
2397                    write_handle.rep(),
2398                    match ty {
2399                        TransmitIndex::Future(ty) => Event::FutureWrite {
2400                            code: ReturnCode::Dropped(0),
2401                            pending: Some((ty, handle)),
2402                        },
2403                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2404                            code: ReturnCode::Dropped(0),
2405                            pending: Some((ty, handle)),
2406                        },
2407                    },
2408                )?;
2409            }
2410
2411            WriteState::HostReady { .. } => {}
2412
2413            WriteState::Open => {
2414                state.update_event(
2415                    write_handle.rep(),
2416                    match kind {
2417                        TransmitKind::Future => Event::FutureWrite {
2418                            code: ReturnCode::Dropped(0),
2419                            pending: None,
2420                        },
2421                        TransmitKind::Stream => Event::StreamWrite {
2422                            code: ReturnCode::Dropped(0),
2423                            pending: None,
2424                        },
2425                    },
2426                )?;
2427            }
2428
2429            WriteState::Dropped => {
2430                log::trace!("host_drop_reader delete {transmit_id:?}");
2431                state.delete_transmit(transmit_id)?;
2432            }
2433        }
2434        Ok(())
2435    }
2436
2437    /// Drop the write end of a stream or future read from the host.
2438    fn host_drop_writer<U>(
2439        self,
2440        store: StoreContextMut<U>,
2441        id: TableId<TransmitHandle>,
2442        on_drop_open: Option<fn() -> Result<()>>,
2443    ) -> Result<()> {
2444        let transmit_id = self.concurrent_state_mut(store.0).get_mut(id)?.state;
2445        let transmit = self
2446            .concurrent_state_mut(store.0)
2447            .get_mut(transmit_id)
2448            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2449        log::trace!(
2450            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2451            transmit.read,
2452            transmit.write
2453        );
2454
2455        // Existing queued transmits must be updated with information for the impending writer closure
2456        match &mut transmit.write {
2457            WriteState::GuestReady { .. } => {
2458                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2459            }
2460            WriteState::HostReady { .. } => {}
2461            v @ WriteState::Open => {
2462                if let (Some(on_drop_open), false) = (
2463                    on_drop_open,
2464                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2465                ) {
2466                    on_drop_open()?;
2467                } else {
2468                    *v = WriteState::Dropped;
2469                }
2470            }
2471            WriteState::Dropped => unreachable!("write state is already dropped"),
2472        }
2473
2474        let transmit = self.concurrent_state_mut(store.0).get_mut(transmit_id)?;
2475
2476        // If the existing read state is dropped, then there's nothing to read
2477        // and we can keep it that way.
2478        //
2479        // If the read state was any other state, then we must set the new state to open
2480        // to indicate that there *is* data to be read
2481        let new_state = if let ReadState::Dropped = &transmit.read {
2482            ReadState::Dropped
2483        } else {
2484            ReadState::Open
2485        };
2486
2487        let read_handle = transmit.read_handle;
2488
2489        // Swap in the new read state
2490        match mem::replace(&mut transmit.read, new_state) {
2491            // If the guest was ready to read, then we cannot drop the reader (or writer);
2492            // we must deliver the event, and update the state associated with the handle to
2493            // represent that a read must be performed
2494            ReadState::GuestReady { ty, handle, .. } => {
2495                // Ensure the final read of the guest is queued, with appropriate closure indicator
2496                self.concurrent_state_mut(store.0).update_event(
2497                    read_handle.rep(),
2498                    match ty {
2499                        TransmitIndex::Future(ty) => Event::FutureRead {
2500                            code: ReturnCode::Dropped(0),
2501                            pending: Some((ty, handle)),
2502                        },
2503                        TransmitIndex::Stream(ty) => Event::StreamRead {
2504                            code: ReturnCode::Dropped(0),
2505                            pending: Some((ty, handle)),
2506                        },
2507                    },
2508                )?;
2509            }
2510
2511            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2512
2513            // If the read state is open, then there are no registered readers of the stream/future
2514            ReadState::Open => {
2515                self.concurrent_state_mut(store.0).update_event(
2516                    read_handle.rep(),
2517                    match on_drop_open {
2518                        Some(_) => Event::FutureRead {
2519                            code: ReturnCode::Dropped(0),
2520                            pending: None,
2521                        },
2522                        None => Event::StreamRead {
2523                            code: ReturnCode::Dropped(0),
2524                            pending: None,
2525                        },
2526                    },
2527                )?;
2528            }
2529
2530            // If the read state was already dropped, then we can remove the transmit state completely
2531            // (both writer and reader have been dropped)
2532            ReadState::Dropped => {
2533                log::trace!("host_drop_writer delete {transmit_id:?}");
2534                self.concurrent_state_mut(store.0)
2535                    .delete_transmit(transmit_id)?;
2536            }
2537        }
2538        Ok(())
2539    }
2540
2541    /// Drop the writable end of the specified stream or future from the guest.
2542    pub(super) fn guest_drop_writable<T>(
2543        self,
2544        store: StoreContextMut<T>,
2545        ty: TransmitIndex,
2546        writer: u32,
2547    ) -> Result<()> {
2548        let table = self.id().get_mut(store.0).table_for_transmit(ty);
2549        let transmit_rep = match ty {
2550            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2551            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2552        };
2553
2554        let id = TableId::<TransmitHandle>::new(transmit_rep);
2555        log::trace!("guest_drop_writable: drop writer {id:?}");
2556        match ty {
2557            TransmitIndex::Stream(_) => self.host_drop_writer(store, id, None),
2558            TransmitIndex::Future(_) => self.host_drop_writer(
2559                store,
2560                id,
2561                Some(|| {
2562                    Err(anyhow!(
2563                        "cannot drop future write end without first writing a value"
2564                    ))
2565                }),
2566            ),
2567        }
2568    }
2569
    /// Copy `count` items from the writer's buffer at `write_address` to the
    /// reader's buffer at `read_address` for the specified stream or future.
    ///
    /// * `flat_abi` - consulted only for streams; when present the items are
    ///   copied bytewise, otherwise each item is lifted from the writer's
    ///   memory and lowered into the reader's memory.
    /// * `write_ty` / `write_options` / `write_address` - type, options and
    ///   buffer address of the writing side (the source of the data).
    /// * `read_ty` / `read_options` / `read_address` - type, options and
    ///   buffer address of the reading side (the destination of the data).
    /// * `count` - number of items to transfer.
    /// * `rep` - only used to build a `TableId` for trace logging.
    ///
    /// # Errors
    ///
    /// Fails if a pointer is misaligned or out of bounds, or if lifting or
    /// lowering a value fails.
    ///
    /// # Panics
    ///
    /// Panics if `write_ty` and `read_ty` are not both futures or both
    /// streams, or if `count != 1` for a future.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_ty: TransmitIndex,
        write_options: &Options,
        write_address: usize,
        read_ty: TransmitIndex,
        read_options: &Options,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types().clone();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                assert_eq!(count, 1);

                // A future without a payload type yields `None` here and
                // nothing is copied.
                let val = types[types[write_ty].ty]
                    .payload
                    .map(|ty| {
                        let abi = types.canonical_abi(&ty);
                        // FIXME: needs to read an i64 for memory64
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
                            .ok_or_else(|| {
                                anyhow::anyhow!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                if let Some(val) = val {
                    let lower =
                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
                    // NOTE(review): assumes the read side has a payload type
                    // whenever the write side does -- confirm.
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into().unwrap()),
                    )?;
                    val.store(lower, ty, ptr)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if let Some(flat_abi) = flat_abi {
                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
                    // Zero-byte transfers skip the alignment and bounds checks
                    // entirely.
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            let src = write_options
                                .memory(store_opaque)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = read_options
                                .memory_mut(store_opaque)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    anyhow::anyhow!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY: Both `src` and `dst` were obtained from
                            // slices of exactly `length_in_bytes` bytes, so
                            // each range is in bounds.  `copy_to` permits the
                            // source and destination to overlap.
                            unsafe { src.copy_to(dst, length_in_bytes) };
                        }
                    }
                } else {
                    // Slow path: lift every item out of the writer's memory
                    // first, then lower them one by one into the reader's.
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = types[types[write_ty].ty].payload.unwrap();
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32).unwrap();
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;

                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    let lower =
                        &mut LowerContext::new(store.as_context_mut(), read_options, &types, self);
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32).unwrap();
                    // The slice itself is discarded; this lookup only
                    // validates that the whole destination range is in bounds
                    // before any value is stored.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                }
            }
            // A stream can only be paired with a stream and a future with a
            // future.
            _ => unreachable!(),
        }

        Ok(())
    }
2708
2709    fn check_bounds(
2710        self,
2711        store: &StoreOpaque,
2712        options: &Options,
2713        ty: TransmitIndex,
2714        address: usize,
2715        count: usize,
2716    ) -> Result<()> {
2717        let types = self.id().get(store).component().types().clone();
2718        let size = usize::try_from(
2719            match ty {
2720                TransmitIndex::Future(ty) => types[types[ty].ty]
2721                    .payload
2722                    .map(|ty| types.canonical_abi(&ty).size32),
2723                TransmitIndex::Stream(ty) => types[types[ty].ty]
2724                    .payload
2725                    .map(|ty| types.canonical_abi(&ty).size32),
2726            }
2727            .unwrap_or(0),
2728        )
2729        .unwrap();
2730
2731        if count > 0 && size > 0 {
2732            options
2733                .memory(store)
2734                .get(address..)
2735                .and_then(|b| b.get(..(size * count)))
2736                .map(drop)
2737                .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
2738        } else {
2739            Ok(())
2740        }
2741    }
2742
    /// Write to the specified stream or future from the guest.
    ///
    /// * `ty` - the stream or future type of the handle.
    /// * `options` - index of the options used to access the guest's memory
    ///   and to decide whether a blocked write returns immediately
    ///   (`async_()`) or waits.
    /// * `flat_abi` - forwarded to `copy` and stored with a pending write.
    /// * `handle` - the guest's writable handle; it must currently be in the
    ///   `Write` state.
    /// * `address` / `count` - location and item count of the guest buffer.
    ///
    /// Returns the `ReturnCode` describing the outcome.  When the write would
    /// block and the options are not async, the result of `wait_for_write` is
    /// returned instead of `Blocked`.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, if the handle is not in the
    /// `Write` state, if the handle was already notified that the readable
    /// end dropped, if the transmit is marked `done`, or if any of the
    /// underlying table, copy, or event operations fail.
    ///
    /// # Panics
    ///
    /// Panics if the read state is `HostToHost`, or if the `flat_abi` of a
    /// pending guest read differs from the one passed here.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        let options = Options::new_index(store.0, self, options);
        self.check_bounds(store.0, &options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // The handle stays `Busy` until a non-`Blocked` result is produced at
        // the end of this function.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = self.concurrent_state_mut(store.0);
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The read state is swapped out below: a dropped reader stays dropped,
        // anything else is reset to `Open` while the previous state is
        // handled.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Parks this write as pending so that a later read can pick it up.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.write, WriteState::Open));
            transmit.write = WriteState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            // A guest read is already pending: transfer items directly.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
            } => {
                assert_eq!(flat_abi, read_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // Note that zero-length reads and writes are handled specially
                // by the spec to allow each end to signal readiness to the
                // other.  Quoting the spec:
                //
                // ```
                // The meaning of a read or write when the length is 0 is that
                // the caller is querying the "readiness" of the other
                // side. When a 0-length read/write rendezvous with a
                // non-0-length read/write, only the 0-length read/write
                // completes; the non-0-length read/write is kept pending (and
                // ready for a subsequent rendezvous).
                //
                // In the corner case where a 0-length read and write
                // rendezvous, only the writer is notified of readiness. To
                // avoid livelock, the Canonical ABI requires that a writer must
                // (eventually) follow a completed 0-length write with a
                // non-0-length write that is allowed to block (allowing the
                // reader end to run and rendezvous with its own non-0-length
                // read).
                // ```

                // The write finishes now unless it is a non-empty write that
                // met a zero-length read.
                let write_complete = count == 0 || read_count > 0;
                // The pending read is only completed by a non-empty write.
                let read_complete = count > 0;
                // The reader asked for more items than this write supplies.
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Only as many items as both sides can handle are transferred.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    ty,
                    &options,
                    address,
                    read_ty,
                    &read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                // Byte size of one item; used to advance the reader's buffer
                // address when part of its buffer remains unfilled.
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a `StreamRead` completion event was taken for the
                    // read handle, its item count is added to this transfer's
                    // count.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                if read_buffer_remaining {
                    // Re-arm the read for the unfilled remainder of its buffer.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host consumer is registered: park the write and hand it to
            // `consume`.
            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
            }

            ReadState::HostToHost { .. } => unreachable!(),

            // No reader is waiting yet: park the write.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            // The reader is gone: nothing can be written.
            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        // Without the async option a blocked write waits for its result.
        if result == ReturnCode::Blocked && !options.async_() {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        // Once a result is available the handle leaves the `Busy` state;
        // `done` is set only when a stream write reported `Dropped`.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
2958
2959    /// Handle a host- or guest-initiated write by delivering the item(s) to the
2960    /// `StreamConsumer` for the specified stream or future.
2961    fn consume(
2962        self,
2963        store: &mut dyn VMStore,
2964        kind: TransmitKind,
2965        transmit_id: TableId<TransmitState>,
2966        consume: PollStream,
2967        guest_offset: usize,
2968        cancel: bool,
2969    ) -> Result<ReturnCode> {
2970        let mut future = consume();
2971        self.concurrent_state_mut(store).get_mut(transmit_id)?.read = ReadState::HostReady {
2972            consume,
2973            guest_offset,
2974            cancel,
2975            cancel_waker: None,
2976        };
2977        let poll = self.set_tls(store, || {
2978            future
2979                .as_mut()
2980                .poll(&mut Context::from_waker(&Waker::noop()))
2981        });
2982
2983        Ok(match poll {
2984            Poll::Ready(state) => {
2985                let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
2986                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2987                    unreachable!();
2988                };
2989                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2990                transmit.write = WriteState::Open;
2991                code
2992            }
2993            Poll::Pending => {
2994                self.pipe_from_guest(store, kind, transmit_id, future);
2995                ReturnCode::Blocked
2996            }
2997        })
2998    }
2999
    /// Read from the specified stream or future from the guest.
    ///
    /// * `ty` - the stream or future type of the handle.
    /// * `options` - index of the options used to access the guest's memory
    ///   and to decide whether a blocked read returns immediately
    ///   (`async_()`) or waits.
    /// * `flat_abi` - forwarded to `copy` and stored with a pending read.
    /// * `handle` - the guest's readable handle; it must currently be in the
    ///   `Read` state.
    /// * `address` / `count` - location and item capacity of the guest buffer.
    ///
    /// Returns the `ReturnCode` describing the outcome.  When the read would
    /// block and the options are not async, the result of `wait_for_read` is
    /// returned instead of `Blocked`.
    ///
    /// # Errors
    ///
    /// Fails if the buffer is out of bounds, if the handle is not in the
    /// `Read` state, if the handle was already notified that the writable end
    /// dropped, if the transmit is marked `done`, or if any of the underlying
    /// table, copy, or event operations fail.
    ///
    /// # Panics
    ///
    /// Panics if the `flat_abi` of a pending guest write differs from the one
    /// passed here.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let address = usize::try_from(address).unwrap();
        let count = usize::try_from(count).unwrap();
        let options = Options::new_index(store.0, self, options);
        self.check_bounds(store.0, &options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Read { done } = *state else {
            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
        };

        if done {
            bail!("cannot read from stream after being notified that the writable end dropped");
        }

        // The handle stays `Busy` until a non-`Blocked` result is produced at
        // the end of this function.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = self.concurrent_state_mut(store.0);
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // The write state is swapped out below: a dropped writer stays
        // dropped, anything else is reset to `Open` while the previous state
        // is handled.
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Parks this read as pending so that a later write can fill it.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            assert!(matches!(&transmit.read, ReadState::Open));
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.write, new_state) {
            // A guest write is already pending: transfer items directly.
            WriteState::GuestReady {
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
            } => {
                assert_eq!(flat_abi, write_flat_abi);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // See the comment in `guest_write` for the
                // `ReadState::GuestReady` case concerning zero-length reads and
                // writes.

                // The pending write is completed unless it is non-empty and
                // met a zero-length read.
                let write_complete = write_count == 0 || count > 0;
                // This read only completes against a non-empty write.
                let read_complete = write_count > 0;
                // The writer offered more items than this read can take.
                let write_buffer_remaining = count < write_count;

                // Only as many items as both sides can handle are transferred.
                let count = count.min(write_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_ty,
                    &write_options,
                    write_address,
                    ty,
                    &options,
                    address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                // Byte size of one item; used to advance the writer's buffer
                // address when part of its buffer remains unread.
                let item_size = payload(ty, types)
                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
                    .unwrap_or(0);
                let concurrent_state = instance.concurrent_state_mut();

                if write_complete {
                    let count = u32::try_from(count).unwrap();
                    // If a `StreamWrite` completion event was taken for the
                    // write handle, its item count is added to this
                    // transfer's count.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_write_result(
                        write_ty,
                        transmit_id,
                        write_handle,
                        code,
                    )?;
                }

                if write_buffer_remaining {
                    // Re-arm the write for the unread remainder of its buffer.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count * item_size),
                        count: write_count - count,
                        handle: write_handle,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host producer is registered: park the read and hand it to
            // `produce`.
            WriteState::HostReady {
                produce,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                assert!(cancel_waker.is_none());
                assert!(!cancel);
                assert_eq!(0, guest_offset);

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;

                self.produce(store.0, ty.kind(), transmit_id, produce, 0, false)?
            }

            // No writer is waiting yet: park the read.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            // The writer is gone: nothing more can be read.
            WriteState::Dropped => ReturnCode::Dropped(0),
        };

        // Without the async option a blocked read waits for its result.
        if result == ReturnCode::Blocked && !options.async_() {
            result = self.wait_for_read(store.0, transmit_handle)?;
        }

        // Once a result is available the handle leaves the `Busy` state;
        // `done` is set only when a stream read reported `Dropped`.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3194
3195    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
3196    /// for the specified stream or future for items.
3197    fn produce(
3198        self,
3199        store: &mut dyn VMStore,
3200        kind: TransmitKind,
3201        transmit_id: TableId<TransmitState>,
3202        produce: PollStream,
3203        guest_offset: usize,
3204        cancel: bool,
3205    ) -> Result<ReturnCode> {
3206        let mut future = produce();
3207        self.concurrent_state_mut(store).get_mut(transmit_id)?.write = WriteState::HostReady {
3208            produce,
3209            guest_offset,
3210            cancel,
3211            cancel_waker: None,
3212        };
3213        let poll = self.set_tls(store, || {
3214            future
3215                .as_mut()
3216                .poll(&mut Context::from_waker(&Waker::noop()))
3217        });
3218
3219        Ok(match poll {
3220            Poll::Ready(state) => {
3221                let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3222                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3223                    unreachable!();
3224                };
3225                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3226                transmit.read = ReadState::Open;
3227                code
3228            }
3229            Poll::Pending => {
3230                self.pipe_to_guest(store, kind, transmit_id, future);
3231                ReturnCode::Blocked
3232            }
3233        })
3234    }
3235
3236    fn wait_for_write(
3237        self,
3238        store: &mut dyn VMStore,
3239        handle: TableId<TransmitHandle>,
3240    ) -> Result<ReturnCode> {
3241        let waitable = Waitable::Transmit(handle);
3242        self.wait_for_event(store, waitable)?;
3243        let event = waitable.take_event(self.concurrent_state_mut(store))?;
3244        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3245            event
3246        {
3247            waitable.on_delivery(self.id().get_mut(store), event);
3248            Ok(code)
3249        } else {
3250            unreachable!()
3251        }
3252    }
3253
3254    /// Cancel a pending stream or future write.
3255    fn cancel_write(
3256        self,
3257        store: &mut dyn VMStore,
3258        transmit_id: TableId<TransmitState>,
3259        async_: bool,
3260    ) -> Result<ReturnCode> {
3261        let state = self.concurrent_state_mut(store);
3262        let transmit = state.get_mut(transmit_id)?;
3263        log::trace!(
3264            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3265            transmit.read,
3266            transmit.write
3267        );
3268
3269        let code = if let Some(event) =
3270            Waitable::Transmit(transmit.write_handle).take_event(state)?
3271        {
3272            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3273                unreachable!();
3274            };
3275            match (code, event) {
3276                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3277                    ReturnCode::Cancelled(count)
3278                }
3279                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3280                _ => unreachable!(),
3281            }
3282        } else if let ReadState::HostReady {
3283            cancel,
3284            cancel_waker,
3285            ..
3286        } = &mut state.get_mut(transmit_id)?.read
3287        {
3288            *cancel = true;
3289            if let Some(waker) = cancel_waker.take() {
3290                waker.wake();
3291            }
3292
3293            if async_ {
3294                ReturnCode::Blocked
3295            } else {
3296                let handle = self
3297                    .concurrent_state_mut(store)
3298                    .get_mut(transmit_id)?
3299                    .write_handle;
3300                self.wait_for_write(store, handle)?
3301            }
3302        } else {
3303            ReturnCode::Cancelled(0)
3304        };
3305
3306        let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3307
3308        match &transmit.write {
3309            WriteState::GuestReady { .. } => {
3310                transmit.write = WriteState::Open;
3311            }
3312            WriteState::HostReady { .. } => todo!("support host write cancellation"),
3313            WriteState::Open | WriteState::Dropped => {}
3314        }
3315
3316        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3317
3318        Ok(code)
3319    }
3320
3321    fn wait_for_read(
3322        self,
3323        store: &mut dyn VMStore,
3324        handle: TableId<TransmitHandle>,
3325    ) -> Result<ReturnCode> {
3326        let waitable = Waitable::Transmit(handle);
3327        self.wait_for_event(store, waitable)?;
3328        let event = waitable.take_event(self.concurrent_state_mut(store))?;
3329        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3330            event
3331        {
3332            waitable.on_delivery(self.id().get_mut(store), event);
3333            Ok(code)
3334        } else {
3335            unreachable!()
3336        }
3337    }
3338
3339    /// Cancel a pending stream or future read.
3340    fn cancel_read(
3341        self,
3342        store: &mut dyn VMStore,
3343        transmit_id: TableId<TransmitState>,
3344        async_: bool,
3345    ) -> Result<ReturnCode> {
3346        let state = self.concurrent_state_mut(store);
3347        let transmit = state.get_mut(transmit_id)?;
3348        log::trace!(
3349            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3350            transmit.read,
3351            transmit.write
3352        );
3353
3354        let code = if let Some(event) =
3355            Waitable::Transmit(transmit.read_handle).take_event(state)?
3356        {
3357            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3358                unreachable!();
3359            };
3360            match (code, event) {
3361                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3362                    ReturnCode::Cancelled(count)
3363                }
3364                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3365                _ => unreachable!(),
3366            }
3367        } else if let WriteState::HostReady {
3368            cancel,
3369            cancel_waker,
3370            ..
3371        } = &mut state.get_mut(transmit_id)?.write
3372        {
3373            *cancel = true;
3374            if let Some(waker) = cancel_waker.take() {
3375                waker.wake();
3376            }
3377
3378            if async_ {
3379                ReturnCode::Blocked
3380            } else {
3381                let handle = self
3382                    .concurrent_state_mut(store)
3383                    .get_mut(transmit_id)?
3384                    .read_handle;
3385                self.wait_for_read(store, handle)?
3386            }
3387        } else {
3388            ReturnCode::Cancelled(0)
3389        };
3390
3391        let transmit = self.concurrent_state_mut(store).get_mut(transmit_id)?;
3392
3393        match &transmit.read {
3394            ReadState::GuestReady { .. } => {
3395                transmit.read = ReadState::Open;
3396            }
3397            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3398                todo!("support host read cancellation")
3399            }
3400            ReadState::Open | ReadState::Dropped => {}
3401        }
3402
3403        log::trace!("cancelled read {transmit_id:?}: {code:?}");
3404
3405        Ok(code)
3406    }
3407
3408    /// Cancel a pending write for the specified stream or future from the guest.
3409    fn guest_cancel_write(
3410        self,
3411        store: &mut dyn VMStore,
3412        ty: TransmitIndex,
3413        async_: bool,
3414        writer: u32,
3415    ) -> Result<ReturnCode> {
3416        let (rep, state) =
3417            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3418        let id = TableId::<TransmitHandle>::new(rep);
3419        log::trace!("guest cancel write {id:?} (handle {writer})");
3420        match state {
3421            TransmitLocalState::Write { .. } => {
3422                bail!("stream or future write cancelled when no write is pending")
3423            }
3424            TransmitLocalState::Read { .. } => {
3425                bail!("passed read end to `{{stream|future}}.cancel-write`")
3426            }
3427            TransmitLocalState::Busy => {}
3428        }
3429        let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3430        let code = self.cancel_write(store, transmit_id, async_)?;
3431        if !matches!(code, ReturnCode::Blocked) {
3432            let state =
3433                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3434                    .1;
3435            if let TransmitLocalState::Busy = state {
3436                *state = TransmitLocalState::Write { done: false };
3437            }
3438        }
3439        Ok(code)
3440    }
3441
3442    /// Cancel a pending read for the specified stream or future from the guest.
3443    fn guest_cancel_read(
3444        self,
3445        store: &mut dyn VMStore,
3446        ty: TransmitIndex,
3447        async_: bool,
3448        reader: u32,
3449    ) -> Result<ReturnCode> {
3450        let (rep, state) =
3451            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3452        let id = TableId::<TransmitHandle>::new(rep);
3453        log::trace!("guest cancel read {id:?} (handle {reader})");
3454        match state {
3455            TransmitLocalState::Read { .. } => {
3456                bail!("stream or future read cancelled when no read is pending")
3457            }
3458            TransmitLocalState::Write { .. } => {
3459                bail!("passed write end to `{{stream|future}}.cancel-read`")
3460            }
3461            TransmitLocalState::Busy => {}
3462        }
3463        let transmit_id = self.concurrent_state_mut(store).get_mut(id)?.state;
3464        let code = self.cancel_read(store, transmit_id, async_)?;
3465        if !matches!(code, ReturnCode::Blocked) {
3466            let state =
3467                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3468                    .1;
3469            if let TransmitLocalState::Busy = state {
3470                *state = TransmitLocalState::Read { done: false };
3471            }
3472        }
3473        Ok(code)
3474    }
3475
3476    /// Drop the readable end of the specified stream or future from the guest.
3477    fn guest_drop_readable(
3478        self,
3479        store: &mut dyn VMStore,
3480        ty: TransmitIndex,
3481        reader: u32,
3482    ) -> Result<()> {
3483        let table = self.id().get_mut(store).table_for_transmit(ty);
3484        let (rep, _is_done) = match ty {
3485            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3486            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3487        };
3488        let kind = match ty {
3489            TransmitIndex::Stream(_) => TransmitKind::Stream,
3490            TransmitIndex::Future(_) => TransmitKind::Future,
3491        };
3492        let id = TableId::<TransmitHandle>::new(rep);
3493        log::trace!("guest_drop_readable: drop reader {id:?}");
3494        self.host_drop_reader(store, id, kind)
3495    }
3496
3497    /// Create a new error context for the given component.
3498    pub(crate) fn error_context_new(
3499        self,
3500        store: &mut StoreOpaque,
3501        caller: RuntimeComponentInstanceIndex,
3502        ty: TypeComponentLocalErrorContextTableIndex,
3503        options: OptionsIndex,
3504        debug_msg_address: u32,
3505        debug_msg_len: u32,
3506    ) -> Result<u32> {
3507        self.id().get(store).check_may_leave(caller)?;
3508        let options = Options::new_index(store, self, options);
3509        let lift_ctx = &mut LiftContext::new(store, &options, self);
3510        //  Read string from guest memory
3511        let address = usize::try_from(debug_msg_address)?;
3512        let len = usize::try_from(debug_msg_len)?;
3513        lift_ctx
3514            .memory()
3515            .get(address..)
3516            .and_then(|b| b.get(..len))
3517            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3518        let message = WasmStr::new(address, len, lift_ctx)?;
3519
3520        // Create a new ErrorContext that is tracked along with other concurrent state
3521        let err_ctx = ErrorContextState {
3522            debug_msg: message
3523                .to_str_from_memory(options.memory(store))?
3524                .to_string(),
3525        };
3526        let state = self.concurrent_state_mut(store);
3527        let table_id = state.push(err_ctx)?;
3528        let global_ref_count_idx =
3529            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3530
3531        // Add to the global error context ref counts
3532        let _ = state
3533            .global_error_context_ref_counts
3534            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3535
3536        // Error context are tracked both locally (to a single component instance) and globally
3537        // the counts for both must stay in sync.
3538        //
3539        // Here we reflect the newly created global concurrent error context state into the
3540        // component instance's locally tracked count, along with the appropriate key into the global
3541        // ref tracking data structures to enable later lookup
3542        let local_idx = self
3543            .id()
3544            .get_mut(store)
3545            .table_for_error_context(ty)
3546            .error_context_insert(table_id.rep())?;
3547
3548        Ok(local_idx)
3549    }
3550
3551    /// Retrieve the debug message from the specified error context.
3552    pub(super) fn error_context_debug_message<T>(
3553        self,
3554        store: StoreContextMut<T>,
3555        ty: TypeComponentLocalErrorContextTableIndex,
3556        options: OptionsIndex,
3557        err_ctx_handle: u32,
3558        debug_msg_address: u32,
3559    ) -> Result<()> {
3560        // Retrieve the error context and internal debug message
3561        let handle_table_id_rep = self
3562            .id()
3563            .get_mut(store.0)
3564            .table_for_error_context(ty)
3565            .error_context_rep(err_ctx_handle)?;
3566
3567        let state = self.concurrent_state_mut(store.0);
3568        // Get the state associated with the error context
3569        let ErrorContextState { debug_msg } =
3570            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3571        let debug_msg = debug_msg.clone();
3572
3573        let options = Options::new_index(store.0, self, options);
3574        let types = self.id().get(store.0).component().types().clone();
3575        let lower_cx = &mut LowerContext::new(store, &options, &types, self);
3576        let debug_msg_address = usize::try_from(debug_msg_address)?;
3577        // Lower the string into the component's memory
3578        let offset = lower_cx
3579            .as_slice_mut()
3580            .get(debug_msg_address..)
3581            .and_then(|b| b.get(..debug_msg.bytes().len()))
3582            .map(|_| debug_msg_address)
3583            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3584        debug_msg
3585            .as_str()
3586            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3587
3588        Ok(())
3589    }
3590
3591    /// Implements the `future.cancel-read` intrinsic.
3592    pub(crate) fn future_cancel_read(
3593        self,
3594        store: &mut dyn VMStore,
3595        caller: RuntimeComponentInstanceIndex,
3596        ty: TypeFutureTableIndex,
3597        async_: bool,
3598        reader: u32,
3599    ) -> Result<u32> {
3600        self.id().get(store).check_may_leave(caller)?;
3601        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3602            .map(|v| v.encode())
3603    }
3604
3605    /// Implements the `future.cancel-write` intrinsic.
3606    pub(crate) fn future_cancel_write(
3607        self,
3608        store: &mut dyn VMStore,
3609        caller: RuntimeComponentInstanceIndex,
3610        ty: TypeFutureTableIndex,
3611        async_: bool,
3612        writer: u32,
3613    ) -> Result<u32> {
3614        self.id().get(store).check_may_leave(caller)?;
3615        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3616            .map(|v| v.encode())
3617    }
3618
3619    /// Implements the `stream.cancel-read` intrinsic.
3620    pub(crate) fn stream_cancel_read(
3621        self,
3622        store: &mut dyn VMStore,
3623        caller: RuntimeComponentInstanceIndex,
3624        ty: TypeStreamTableIndex,
3625        async_: bool,
3626        reader: u32,
3627    ) -> Result<u32> {
3628        self.id().get(store).check_may_leave(caller)?;
3629        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3630            .map(|v| v.encode())
3631    }
3632
3633    /// Implements the `stream.cancel-write` intrinsic.
3634    pub(crate) fn stream_cancel_write(
3635        self,
3636        store: &mut dyn VMStore,
3637        caller: RuntimeComponentInstanceIndex,
3638        ty: TypeStreamTableIndex,
3639        async_: bool,
3640        writer: u32,
3641    ) -> Result<u32> {
3642        self.id().get(store).check_may_leave(caller)?;
3643        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3644            .map(|v| v.encode())
3645    }
3646
3647    /// Implements the `future.drop-readable` intrinsic.
3648    pub(crate) fn future_drop_readable(
3649        self,
3650        store: &mut dyn VMStore,
3651        caller: RuntimeComponentInstanceIndex,
3652        ty: TypeFutureTableIndex,
3653        reader: u32,
3654    ) -> Result<()> {
3655        self.id().get(store).check_may_leave(caller)?;
3656        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3657    }
3658
3659    /// Implements the `stream.drop-readable` intrinsic.
3660    pub(crate) fn stream_drop_readable(
3661        self,
3662        store: &mut dyn VMStore,
3663        caller: RuntimeComponentInstanceIndex,
3664        ty: TypeStreamTableIndex,
3665        reader: u32,
3666    ) -> Result<()> {
3667        self.id().get(store).check_may_leave(caller)?;
3668        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3669    }
3670}
3671
3672impl ComponentInstance {
3673    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
3674        let (tables, types) = self.guest_tables();
3675        let runtime_instance = match ty {
3676            TransmitIndex::Stream(ty) => types[ty].instance,
3677            TransmitIndex::Future(ty) => types[ty].instance,
3678        };
3679        &mut tables[runtime_instance]
3680    }
3681
3682    fn table_for_error_context(
3683        self: Pin<&mut Self>,
3684        ty: TypeComponentLocalErrorContextTableIndex,
3685    ) -> &mut HandleTable {
3686        let (tables, types) = self.guest_tables();
3687        let runtime_instance = types[ty].instance;
3688        &mut tables[runtime_instance]
3689    }
3690
3691    fn get_mut_by_index(
3692        self: Pin<&mut Self>,
3693        ty: TransmitIndex,
3694        index: u32,
3695    ) -> Result<(u32, &mut TransmitLocalState)> {
3696        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
3697    }
3698
3699    /// Allocate a new future or stream and grant ownership of both the read and
3700    /// write ends to the (sub-)component instance to which the specified
3701    /// `TransmitIndex` belongs.
3702    fn guest_new(mut self: Pin<&mut Self>, ty: TransmitIndex) -> Result<ResourcePair> {
3703        let (write, read) = self.as_mut().concurrent_state_mut().new_transmit()?;
3704
3705        let table = self.as_mut().table_for_transmit(ty);
3706        let (read_handle, write_handle) = match ty {
3707            TransmitIndex::Future(ty) => (
3708                table.future_insert_read(ty, read.rep())?,
3709                table.future_insert_write(ty, write.rep())?,
3710            ),
3711            TransmitIndex::Stream(ty) => (
3712                table.stream_insert_read(ty, read.rep())?,
3713                table.stream_insert_write(ty, write.rep())?,
3714            ),
3715        };
3716
3717        let state = self.as_mut().concurrent_state_mut();
3718        state.get_mut(read)?.common.handle = Some(read_handle);
3719        state.get_mut(write)?.common.handle = Some(write_handle);
3720
3721        Ok(ResourcePair {
3722            write: write_handle,
3723            read: read_handle,
3724        })
3725    }
3726
3727    /// Drop the specified error context.
3728    pub(crate) fn error_context_drop(
3729        mut self: Pin<&mut Self>,
3730        caller: RuntimeComponentInstanceIndex,
3731        ty: TypeComponentLocalErrorContextTableIndex,
3732        error_context: u32,
3733    ) -> Result<()> {
3734        self.check_may_leave(caller)?;
3735
3736        let local_handle_table = self.as_mut().table_for_error_context(ty);
3737
3738        let rep = local_handle_table.error_context_drop(error_context)?;
3739
3740        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
3741
3742        let state = self.concurrent_state_mut();
3743        let GlobalErrorContextRefCount(global_ref_count) = state
3744            .global_error_context_ref_counts
3745            .get_mut(&global_ref_count_idx)
3746            .expect("retrieve concurrent state for error context during drop");
3747
3748        // Reduce the component-global ref count, removing tracking if necessary
3749        assert!(*global_ref_count >= 1);
3750        *global_ref_count -= 1;
3751        if *global_ref_count == 0 {
3752            state
3753                .global_error_context_ref_counts
3754                .remove(&global_ref_count_idx);
3755
3756            state
3757                .delete(TableId::<ErrorContextState>::new(rep))
3758                .context("deleting component-global error context data")?;
3759        }
3760
3761        Ok(())
3762    }
3763
3764    /// Transfer ownership of the specified stream or future read end from one
3765    /// guest to another.
3766    fn guest_transfer(
3767        mut self: Pin<&mut Self>,
3768        src_idx: u32,
3769        src: TransmitIndex,
3770        dst: TransmitIndex,
3771    ) -> Result<u32> {
3772        let src_table = self.as_mut().table_for_transmit(src);
3773        let (rep, is_done) = match src {
3774            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
3775            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
3776        };
3777        if is_done {
3778            bail!("cannot lift after being notified that the writable end dropped");
3779        }
3780        let dst_table = self.as_mut().table_for_transmit(dst);
3781        let handle = match dst {
3782            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
3783            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
3784        }?;
3785        self.concurrent_state_mut()
3786            .get_mut(TableId::<TransmitHandle>::new(rep))?
3787            .common
3788            .handle = Some(handle);
3789        Ok(handle)
3790    }
3791
3792    /// Implements the `future.new` intrinsic.
3793    pub(crate) fn future_new(
3794        self: Pin<&mut Self>,
3795        caller: RuntimeComponentInstanceIndex,
3796        ty: TypeFutureTableIndex,
3797    ) -> Result<ResourcePair> {
3798        self.check_may_leave(caller)?;
3799        self.guest_new(TransmitIndex::Future(ty))
3800    }
3801
3802    /// Implements the `stream.new` intrinsic.
3803    pub(crate) fn stream_new(
3804        self: Pin<&mut Self>,
3805        caller: RuntimeComponentInstanceIndex,
3806        ty: TypeStreamTableIndex,
3807    ) -> Result<ResourcePair> {
3808        self.check_may_leave(caller)?;
3809        self.guest_new(TransmitIndex::Stream(ty))
3810    }
3811
3812    /// Transfer ownership of the specified future read end from one guest to
3813    /// another.
3814    pub(crate) fn future_transfer(
3815        self: Pin<&mut Self>,
3816        src_idx: u32,
3817        src: TypeFutureTableIndex,
3818        dst: TypeFutureTableIndex,
3819    ) -> Result<u32> {
3820        self.guest_transfer(
3821            src_idx,
3822            TransmitIndex::Future(src),
3823            TransmitIndex::Future(dst),
3824        )
3825    }
3826
3827    /// Transfer ownership of the specified stream read end from one guest to
3828    /// another.
3829    pub(crate) fn stream_transfer(
3830        self: Pin<&mut Self>,
3831        src_idx: u32,
3832        src: TypeStreamTableIndex,
3833        dst: TypeStreamTableIndex,
3834    ) -> Result<u32> {
3835        self.guest_transfer(
3836            src_idx,
3837            TransmitIndex::Stream(src),
3838            TransmitIndex::Stream(dst),
3839        )
3840    }
3841
3842    /// Copy the specified error context from one component to another.
3843    pub(crate) fn error_context_transfer(
3844        mut self: Pin<&mut Self>,
3845        src_idx: u32,
3846        src: TypeComponentLocalErrorContextTableIndex,
3847        dst: TypeComponentLocalErrorContextTableIndex,
3848    ) -> Result<u32> {
3849        let rep = self
3850            .as_mut()
3851            .table_for_error_context(src)
3852            .error_context_rep(src_idx)?;
3853        let dst_idx = self
3854            .as_mut()
3855            .table_for_error_context(dst)
3856            .error_context_insert(rep)?;
3857
3858        // Update the global (cross-subcomponent) count for error contexts
3859        // as the new component has essentially created a new reference that will
3860        // be dropped/handled independently
3861        let global_ref_count = self
3862            .concurrent_state_mut()
3863            .global_error_context_ref_counts
3864            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
3865            .context("global ref count present for existing (sub)component error context")?;
3866        global_ref_count.0 += 1;
3867
3868        Ok(dst_idx)
3869    }
3870}
3871
3872impl ConcurrentState {
3873    fn send_write_result(
3874        &mut self,
3875        ty: TransmitIndex,
3876        id: TableId<TransmitState>,
3877        handle: u32,
3878        code: ReturnCode,
3879    ) -> Result<()> {
3880        let write_handle = self.get_mut(id)?.write_handle.rep();
3881        self.set_event(
3882            write_handle,
3883            match ty {
3884                TransmitIndex::Future(ty) => Event::FutureWrite {
3885                    code,
3886                    pending: Some((ty, handle)),
3887                },
3888                TransmitIndex::Stream(ty) => Event::StreamWrite {
3889                    code,
3890                    pending: Some((ty, handle)),
3891                },
3892            },
3893        )
3894    }
3895
3896    fn send_read_result(
3897        &mut self,
3898        ty: TransmitIndex,
3899        id: TableId<TransmitState>,
3900        handle: u32,
3901        code: ReturnCode,
3902    ) -> Result<()> {
3903        let read_handle = self.get_mut(id)?.read_handle.rep();
3904        self.set_event(
3905            read_handle,
3906            match ty {
3907                TransmitIndex::Future(ty) => Event::FutureRead {
3908                    code,
3909                    pending: Some((ty, handle)),
3910                },
3911                TransmitIndex::Stream(ty) => Event::StreamRead {
3912                    code,
3913                    pending: Some((ty, handle)),
3914                },
3915            },
3916        )
3917    }
3918
3919    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
3920        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
3921    }
3922
3923    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3924        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
3925    }
3926
3927    /// Set or update the event for the specified waitable.
3928    ///
3929    /// If there is already an event set for this waitable, we assert that it is
3930    /// of the same variant as the new one and reuse the `ReturnCode` count and
3931    /// the `pending` field if applicable.
3932    // TODO: This is a bit awkward due to how
3933    // `Event::{Stream,Future}{Write,Read}` and
3934    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
3935    // Consider updating those representations in a way that allows this
3936    // function to be simplified.
3937    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
3938        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
3939
3940        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
3941            let (ReturnCode::Completed(count)
3942            | ReturnCode::Dropped(count)
3943            | ReturnCode::Cancelled(count)) = old
3944            else {
3945                unreachable!()
3946            };
3947
3948            match new {
3949                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
3950                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
3951                _ => unreachable!(),
3952            }
3953        }
3954
3955        let event = match (waitable.take_event(self)?, event) {
3956            (None, _) => event,
3957            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
3958            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
3959            (
3960                Some(Event::StreamWrite {
3961                    code: old_code,
3962                    pending: old_pending,
3963                }),
3964                Event::StreamWrite { code, pending },
3965            ) => Event::StreamWrite {
3966                code: update_code(old_code, code),
3967                pending: old_pending.or(pending),
3968            },
3969            (
3970                Some(Event::StreamRead {
3971                    code: old_code,
3972                    pending: old_pending,
3973                }),
3974                Event::StreamRead { code, pending },
3975            ) => Event::StreamRead {
3976                code: update_code(old_code, code),
3977                pending: old_pending.or(pending),
3978            },
3979            _ => unreachable!(),
3980        };
3981
3982        waitable.set_event(self, Some(event))
3983    }
3984
3985    /// Allocate a new future or stream, including the `TransmitState` and the
3986    /// `TransmitHandle`s corresponding to the read and write ends.
3987    fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
3988        let state_id = self.push(TransmitState::default())?;
3989
3990        let write = self.push(TransmitHandle::new(state_id))?;
3991        let read = self.push(TransmitHandle::new(state_id))?;
3992
3993        let state = self.get_mut(state_id)?;
3994        state.write_handle = write;
3995        state.read_handle = read;
3996
3997        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
3998
3999        Ok((write, read))
4000    }
4001
4002    /// Delete the specified future or stream, including the read and write ends.
4003    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4004        let state = self.delete(state_id)?;
4005        self.delete(state.write_handle)?;
4006        self.delete(state.read_handle)?;
4007
4008        log::trace!(
4009            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4010            state.write_handle,
4011            state.read_handle,
4012        );
4013
4014        Ok(())
4015    }
4016}
4017
/// A pair of raw `u32` values, one for a write side and one for a read side.
///
/// NOTE(review): presumably these are the handle indices for the two ends
/// of a newly created future or stream -- confirm against the call sites.
pub(crate) struct ResourcePair {
    /// Raw value for the write side.
    pub(crate) write: u32,
    /// Raw value for the read side.
    pub(crate) read: u32,
}