Skip to main content

wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9    AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10    Val, WasmList,
11};
12use crate::prelude::*;
13use crate::store::{StoreOpaque, StoreToken};
14use crate::try_mutex::{TryMutex, TryMutexGuard};
15use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
16use crate::vm::{AlwaysMut, VMStore};
17use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
18use crate::{
19    Error, Result, Trap, bail, bail_bug, ensure,
20    error::{Context as _, format_err},
21};
22use alloc::sync::Arc;
23use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
24use core::any::{Any, TypeId};
25use core::fmt;
26use core::future;
27use core::iter;
28use core::marker::PhantomData;
29use core::mem::{self, ManuallyDrop, MaybeUninit};
30use core::ops::{Deref, DerefMut};
31use core::pin::Pin;
32use core::task::{Context, Poll, Waker, ready};
33use futures::channel::oneshot;
34use futures::{FutureExt as _, stream};
35use wasmtime_environ::component::{
36    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
37    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
38    TypeFutureTableIndex, TypeStreamTableIndex,
39};
40
41pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
42
43mod buffers;
44
45/// Enum for distinguishing between a stream or future in functions that handle
46/// both.
47#[derive(Copy, Clone, Debug)]
48pub enum TransmitKind {
49    Stream,
50    Future,
51}
52
53/// Represents `{stream,future}.{read,write}` results.
54#[derive(Copy, Clone, Debug, PartialEq)]
55pub enum ReturnCode {
56    Blocked,
57    Completed(ItemCount),
58    Dropped(ItemCount),
59    Cancelled(ItemCount),
60}
61
62impl ReturnCode {
63    /// Pack `self` into a single 32-bit integer that may be returned to the
64    /// guest.
65    ///
66    /// This corresponds to `pack_copy_result` in the Component Model spec.
67    pub fn encode(&self) -> u32 {
68        const BLOCKED: u32 = 0xffff_ffff;
69        const COMPLETED: u32 = 0x0;
70        const DROPPED: u32 = 0x1;
71        const CANCELLED: u32 = 0x2;
72        match self {
73            ReturnCode::Blocked => BLOCKED,
74            ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED,
75            ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED,
76            ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED,
77        }
78    }
79
80    /// Returns `Self::Completed` with the specified count (or zero if
81    /// `matches!(kind, TransmitKind::Future)`)
82    fn completed(kind: TransmitKind, count: ItemCount) -> Self {
83        Self::Completed(if let TransmitKind::Future = kind {
84            ItemCount::ZERO
85        } else {
86            count
87        })
88    }
89}
90
91/// Representation of how many items are being operated on in a stream read or
92/// write.
93///
94/// The component model requires that stream operations are limited to `1<<28`
95/// items in one go. This type is a newtype wrapper around `u32` with the
96/// invariant that the internal value is limited by this amount.
97#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
98#[repr(transparent)]
99pub struct ItemCount {
100    raw: u32,
101}
102
103impl ItemCount {
104    const MAX: u32 = 1 << 28;
105    const ZERO: ItemCount = ItemCount { raw: 0 };
106
107    /// Creates a new `ItemCount` with the specified count, or a trap if it's
108    /// too large.
109    fn new(count: u32) -> Result<Self, Trap> {
110        if count < Self::MAX {
111            Ok(Self { raw: count })
112        } else {
113            Err(Trap::StreamOpTooBig)
114        }
115    }
116
117    /// Same as `Self::new` but takes a `usize`.
118    fn new_usize(count: usize) -> Result<Self, Trap> {
119        let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
120        Self::new(count)
121    }
122
123    fn as_u32(&self) -> u32 {
124        self.raw
125    }
126
127    fn as_usize(&self) -> usize {
128        usize::try_from(self.raw).unwrap()
129    }
130
131    /// Increments `self` by `amt`, returning a trap if the amount would exceed
132    /// the maximum item count.
133    fn inc(&mut self, amt: usize) -> Result<(), Trap> {
134        let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
135        let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
136        if new_raw < Self::MAX {
137            self.raw = new_raw;
138            Ok(())
139        } else {
140            Err(Trap::StreamOpTooBig)
141        }
142    }
143
144    /// Helper to add two `ItemCount`s together, fallibly.
145    ///
146    /// It's considered a bug if this overflows, so this is only suitable in
147    /// situations where overflow and/or exceeding the total item count is known
148    /// that it may be possible.
149    fn add(&self, other: ItemCount) -> Result<ItemCount> {
150        match self.raw.checked_add(other.raw) {
151            Some(raw) => Ok(ItemCount::new(raw)?),
152            None => bail_bug!("overflow in `ItemCount::add`"),
153        }
154    }
155
156    /// Same as `add`, but for subtraction.
157    ///
158    /// Like with `add` this is only suitable for situations where the result is
159    /// known to not underflow.
160    fn sub(&self, other: ItemCount) -> Result<ItemCount> {
161        match self.raw.checked_sub(other.raw) {
162            Some(raw) => Ok(ItemCount { raw }),
163            None => bail_bug!("underflow in `ItemCount::sub`"),
164        }
165    }
166}
167
168impl fmt::Display for ItemCount {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        self.raw.fmt(f)
171    }
172}
173
174impl fmt::Debug for ItemCount {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        self.raw.fmt(f)
177    }
178}
179
180impl PartialEq<u32> for ItemCount {
181    fn eq(&self, other: &u32) -> bool {
182        self.raw == *other
183    }
184}
185
186impl PartialOrd<u32> for ItemCount {
187    fn partial_cmp(&self, other: &u32) -> Option<core::cmp::Ordering> {
188        self.raw.partial_cmp(other)
189    }
190}
191
192/// Represents a stream or future type index.
193///
194/// This is useful as a parameter type for functions which operate on either a
195/// future or a stream.
196#[derive(Copy, Clone, Debug)]
197pub enum TransmitIndex {
198    Stream(TypeStreamTableIndex),
199    Future(TypeFutureTableIndex),
200}
201
202impl TransmitIndex {
203    pub fn kind(&self) -> TransmitKind {
204        match self {
205            TransmitIndex::Stream(_) => TransmitKind::Stream,
206            TransmitIndex::Future(_) => TransmitKind::Future,
207        }
208    }
209
210    /// Retrieve the payload type of the specified stream or future, or `None`
211    /// if it has no payload type.
212    fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
213        match self {
214            TransmitIndex::Stream(i) => {
215                let ty = types[*i].ty;
216                types[ty].payload.as_ref()
217            }
218            TransmitIndex::Future(i) => {
219                let ty = types[*i].ty;
220                types[ty].payload.as_ref()
221            }
222        }
223    }
224}
225
226/// Retrieve the host rep and state for the specified guest-visible waitable
227/// handle.
228fn get_mut_by_index_from(
229    handle_table: &mut HandleTable,
230    ty: TransmitIndex,
231    index: u32,
232) -> Result<(u32, &mut TransmitLocalState)> {
233    match ty {
234        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
235        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
236    }
237}
238
239fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
240    mut store: StoreContextMut<U>,
241    instance: Instance,
242    caller_thread: QualifiedThreadId,
243    options: OptionsIndex,
244    ty: TransmitIndex,
245    address: usize,
246    count: usize,
247    buffer: &mut B,
248) -> Result<()> {
249    let count = buffer.remaining().len().min(count);
250
251    // If lowering may call realloc in the guest, then the guest may need
252    // to access its thread context, so we need to set the current thread before lowering
253    // and restore the old one afterward.
254    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
255        let old_thread = store.0.set_thread(caller_thread)?;
256        (
257            &mut LowerContext::new(store.as_context_mut(), options, instance),
258            Some(old_thread),
259        )
260    } else {
261        (
262            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
263            None,
264        )
265    };
266
267    if address % usize::try_from(T::ALIGN32)? != 0 {
268        bail!("read pointer not aligned");
269    }
270    lower
271        .as_slice_mut()
272        .get_mut(address..)
273        .and_then(|b| b.get_mut(..T::SIZE32 * count))
274        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
275
276    if let Some(ty) = ty.payload(lower.types) {
277        T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
278    }
279
280    if let Some(old_thread) = old_thread {
281        store.0.set_thread(old_thread)?;
282    }
283
284    buffer.skip(count);
285
286    Ok(())
287}
288
289fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
290    lift: &mut LiftContext<'_>,
291    ty: Option<InterfaceType>,
292    buffer: &mut B,
293    address: usize,
294    count: usize,
295) -> Result<()> {
296    let count = count.min(buffer.remaining_capacity());
297    if T::IS_RUST_UNIT_TYPE {
298        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
299        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
300        // is a valid way to populate the zero-sized buffer.
301        buffer.extend(
302            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
303        )
304    } else {
305        let ty = match ty {
306            Some(ty) => ty,
307            None => bail_bug!("type required for non-unit lift"),
308        };
309        if address % usize::try_from(T::ALIGN32)? != 0 {
310            bail!("write pointer not aligned");
311        }
312        lift.memory()
313            .get(address..)
314            .and_then(|b| b.get(..T::SIZE32 * count))
315            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
316
317        let list = &WasmList::new(address, count, lift, ty)?;
318        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
319    }
320    Ok(())
321}
322
323/// Represents the state associated with an error context
324#[derive(Debug, PartialEq, Eq, PartialOrd)]
325pub(super) struct ErrorContextState {
326    /// Debug message associated with the error context
327    pub(crate) debug_msg: String,
328}
329
330/// Represents the size and alignment for a "flat" Component Model type,
331/// i.e. one containing no pointers or handles.
332#[derive(Debug, Clone, Copy, PartialEq, Eq)]
333pub(super) struct FlatAbi {
334    pub(super) size: u32,
335    pub(super) align: u32,
336}
337
338struct HostBuffer<'a> {
339    dst: &'a mut Vec<u8>,
340    marked_written: &'a mut usize,
341}
342
343impl HostBuffer<'_> {
344    fn reborrow(&mut self) -> HostBuffer<'_> {
345        HostBuffer {
346            dst: &mut *self.dst,
347            marked_written: &mut *self.marked_written,
348        }
349    }
350}
351
352/// Represents the buffer for a host- or guest-initiated stream read.
353pub struct Destination<'a, T, B> {
354    id: TableId<TransmitState>,
355    buffer: &'a mut B,
356    host_buffer: Option<HostBuffer<'a>>,
357    _phantom: PhantomData<fn() -> T>,
358}
359
360impl<'a, T, B> Destination<'a, T, B> {
361    /// Reborrow `self` so it can be used again later.
362    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
363        Destination {
364            id: self.id,
365            buffer: &mut *self.buffer,
366            host_buffer: self.host_buffer.as_mut().map(|b| b.reborrow()),
367            _phantom: PhantomData,
368        }
369    }
370
371    /// Take the buffer out of `self`, leaving a default-initialized one in its
372    /// place.
373    ///
374    /// This can be useful for reusing the previously-stored buffer's capacity
375    /// instead of allocating a fresh one.
376    pub fn take_buffer(&mut self) -> B
377    where
378        B: Default,
379    {
380        mem::take(self.buffer)
381    }
382
383    /// Store the specified buffer in `self`.
384    ///
385    /// Any items contained in the buffer will be delivered to the reader after
386    /// the `StreamProducer::poll_produce` call to which this `Destination` was
387    /// passed returns (unless overwritten by another call to `set_buffer`).
388    ///
389    /// If items are stored via this buffer _and_ written via a
390    /// `DirectDestination` view of `self`, then the items in the buffer will be
391    /// delivered after the ones written using `DirectDestination`.
392    pub fn set_buffer(&mut self, buffer: B) {
393        *self.buffer = buffer;
394    }
395
396    /// Return the remaining number of items the current read has capacity to
397    /// accept, if known.
398    ///
399    /// This will return `Some(_)` if the reader is a guest; it will return
400    /// `None` if the reader is the host.
401    ///
402    /// Note that this can return `Some(0)`. This means that the guest is
403    /// attempting to perform a zero-length read which typically means that it's
404    /// trying to wait for this stream to be ready-to-read but is not actually
405    /// ready to receive the items yet. The host in this case is allowed to
406    /// either block waiting for readiness or immediately complete the
407    /// operation. The guest is expected to handle both cases. Some more
408    /// discussion about this case can be found in the discussion of ["Stream
409    /// Readiness" in the component-model repo][docs].
410    ///
411    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
412    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
413        // Note that this unwrap should only trigger for bugs in Wasmtime, and
414        // this is modeled here to centralize the `.unwrap()` for this method in
415        // one location.
416        self.remaining_(store.as_context_mut().0).unwrap()
417    }
418
419    fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
420        let transmit = store.concurrent_state_mut()?.get_mut(self.id)?;
421
422        if let &ReadState::GuestReady { count, .. } = &transmit.read {
423            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
424                bail_bug!("expected WriteState::HostReady")
425            };
426
427            Ok(Some(count.as_usize() - guest_offset.as_usize()))
428        } else {
429            Ok(None)
430        }
431    }
432}
433
434impl<'a, B> Destination<'a, u8, B> {
435    /// Return a `DirectDestination` view of `self`.
436    ///
437    /// If the reader is a guest, this will provide direct access to the guest's
438    /// read buffer.  If the reader is a host, this will provide access to a
439    /// buffer which will be delivered to the host before any items stored using
440    /// `Destination::set_buffer`.
441    ///
442    /// `capacity` will only be used if the reader is a host, in which case it
443    /// will update the length of the buffer, possibly zero-initializing the new
444    /// elements if the new length is larger than the old length.
445    pub fn as_direct<D>(
446        mut self,
447        store: StoreContextMut<'a, D>,
448        capacity: usize,
449    ) -> DirectDestination<'a, D> {
450        if let Some(buffer) = &mut self.host_buffer {
451            *buffer.marked_written = 0;
452            buffer.dst.resize(capacity, 0);
453        }
454
455        DirectDestination {
456            id: self.id,
457            host_buffer: self.host_buffer,
458            store,
459        }
460    }
461}
462
463/// Represents a read from a `stream<u8>`, providing direct access to the
464/// writer's buffer.
465pub struct DirectDestination<'a, D: 'static> {
466    id: TableId<TransmitState>,
467    host_buffer: Option<HostBuffer<'a>>,
468    store: StoreContextMut<'a, D>,
469}
470
471#[cfg(feature = "std")]
472impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
473    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
474        let rem = self.remaining();
475        let n = rem.len().min(buf.len());
476        rem[..n].copy_from_slice(&buf[..n]);
477        self.mark_written(n);
478        Ok(n)
479    }
480
481    fn flush(&mut self) -> std::io::Result<()> {
482        Ok(())
483    }
484}
485
486impl<D: 'static> DirectDestination<'_, D> {
487    /// Provide direct access to the writer's buffer.
488    pub fn remaining(&mut self) -> &mut [u8] {
489        // Note that this unwrap should only trigger for bugs in Wasmtime, and
490        // this is modeled here to centralize the `.unwrap()` for this method in
491        // one location.
492        self.remaining_().unwrap()
493    }
494
495    fn remaining_(&mut self) -> Result<&mut [u8]> {
496        if let Some(buffer) = self.host_buffer.as_mut() {
497            return Ok(buffer.dst);
498        }
499        let transmit = self
500            .store
501            .as_context_mut()
502            .0
503            .concurrent_state_mut()?
504            .get_mut(self.id)?;
505
506        let &ReadState::GuestReady {
507            address,
508            count,
509            options,
510            instance,
511            ..
512        } = &transmit.read
513        else {
514            bail_bug!("expected ReadState::GuestReady")
515        };
516
517        let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
518            bail_bug!("expected WriteState::HostReady")
519        };
520
521        let memory = instance
522            .options_memory_mut(self.store.0, options)
523            .get_mut((address + guest_offset.as_usize())..)
524            .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
525        match memory {
526            Some(memory) => Ok(memory),
527            None => bail_bug!("guest buffer unexpectedly out of bounds"),
528        }
529    }
530
531    /// Mark the specified number of bytes as written to the writer's buffer.
532    ///
533    /// # Panics
534    ///
535    /// This will panic if the count is larger than the size of the
536    /// buffer returned by `Self::remaining`.
537    pub fn mark_written(&mut self, count: usize) {
538        // Note that this unwrap should only trigger for bugs in Wasmtime, and
539        // this is modeled here to centralize the `.unwrap()` for this method in
540        // one location.
541        self.mark_written_(count).unwrap()
542    }
543
544    fn mark_written_(&mut self, count: usize) -> Result<()> {
545        if let Some(buffer) = self.host_buffer.as_mut() {
546            // Note that this `.unwrap` is a documented panic condition of
547            // `mark_written`.
548            *buffer.marked_written = buffer.marked_written.checked_add(count).unwrap();
549        } else {
550            let transmit = self
551                .store
552                .as_context_mut()
553                .0
554                .concurrent_state_mut()?
555                .get_mut(self.id)?;
556
557            let ReadState::GuestReady {
558                count: read_count, ..
559            } = &transmit.read
560            else {
561                bail_bug!("expected ReadState::GuestReady")
562            };
563
564            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
565                bail_bug!("expected WriteState::HostReady");
566            };
567
568            if guest_offset.as_usize() + count > read_count.as_usize() {
569                // Note that this `panic` is a documented panic condition of
570                // `mark_written`.
571                panic!(
572                    "write count ({count}) must be less than or equal to read count ({read_count})"
573                )
574            } else {
575                guest_offset.inc(count)?;
576            }
577        }
578        Ok(())
579    }
580}
581
582/// Represents the state of a `Stream{Producer,Consumer}`.
583#[derive(Copy, Clone, Debug)]
584pub enum StreamResult {
585    /// The operation completed normally, and the producer or consumer may be
586    /// able to produce or consume more items, respectively.
587    Completed,
588    /// The operation was interrupted (i.e. it wrapped up early after receiving
589    /// a `finish` parameter value of true in a call to `poll_produce` or
590    /// `poll_consume`), and the producer or consumer may be able to produce or
591    /// consume more items, respectively.
592    Cancelled,
593    /// The operation completed normally, but the producer or consumer will
594    /// _not_ able to produce or consume more items, respectively.
595    Dropped,
596}
597
598/// Represents the host-owned write end of a stream.
599pub trait StreamProducer<D>: Send + 'static {
600    /// The payload type of this stream.
601    type Item;
602
603    /// The `WriteBuffer` type to use when delivering items.
604    type Buffer: WriteBuffer<Self::Item> + Default;
605
606    /// Handle a host- or guest-initiated read by delivering zero or more items
607    /// to the specified destination.
608    ///
609    /// This will be called whenever the reader starts a read.
610    ///
611    /// # Arguments
612    ///
613    /// * `self` - a `Pin`'d version of self to perform Rust-level
614    ///   future-related operations on.
615    /// * `cx` - a Rust-related [`Context`] which is passed to other
616    ///   future-related operations or used to acquire a waker.
617    /// * `store` - the Wasmtime store that this operation is happening within.
618    ///   Used, for example, to consult the state `D` associated with the store.
619    /// * `destination` - the location that items are to be written to.
620    /// * `finish` - a flag indicating whether the host should strive to
621    ///   immediately complete/cancel any pending operation. See below for more
622    ///   details.
623    ///
624    /// # Behavior
625    ///
626    /// If the implementation is able to produce one or more items immediately,
627    /// it should write them to `destination` and return either
628    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
629    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
630    /// any more items.
631    ///
632    /// If the implementation is unable to produce any items immediately, but
633    /// expects to do so later, and `finish` is _false_, it should store the
634    /// waker from `cx` for later and return `Poll::Pending` without writing
635    /// anything to `destination`.  Later, it should alert the waker when either
636    /// the items arrive, the stream has ended, or an error occurs.
637    ///
638    /// If more items are written to `destination` than the reader has immediate
639    /// capacity to accept, they will be retained in memory by the caller and
640    /// used to satisfy future reads, in which case `poll_produce` will only be
641    /// called again once all those items have been delivered.
642    ///
643    /// # Zero-length reads
644    ///
645    /// This function may be called with a zero-length capacity buffer
646    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
647    /// the guest wants to wait to see if an item is ready without actually
648    /// reading the item. For example think of a UNIX `poll` function run on a
649    /// TCP stream, seeing if it's readable without actually reading it.
650    ///
651    /// In this situation the host is allowed to either return immediately or
652    /// wait for readiness. Note that waiting for readiness is not always
653    /// possible. For example it's impossible to test if a Rust-native `Future`
654    /// is ready without actually reading the item. Stream-specific
655    /// optimizations, such as testing if a TCP stream is readable, may be
656    /// possible however.
657    ///
658    /// For a zero-length read, the host is allowed to:
659    ///
660    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
661    ///   anything if it expects to be able to produce items immediately (i.e.
662    ///   without first returning `Poll::Pending`) the next time `poll_produce`
663    ///   is called with non-zero capacity. This is the best-case scenario of
664    ///   fulfilling the guest's desire -- items aren't read/buffered but the
665    ///   host is saying it's ready when the guest is.
666    ///
667    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
668    ///   testing for readiness. The guest doesn't know this yet, but the guest
669    ///   will realize that zero-length reads won't work on this stream when a
670    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
671    ///   here.
672    ///
673    /// - Return `Poll::Pending` if the host has performed necessary async work
674    ///   to wait for this stream to be readable without actually reading
675    ///   anything. This is also a best-case scenario where the host is letting
676    ///   the guest know that nothing is ready yet. Later the zero-length read
677    ///   will complete and then the guest will attempt a nonzero-length read to
678    ///   actually read some bytes.
679    ///
680    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
681    ///   `Destination::set_buffer` with one more more items. Note, however,
682    ///   that this creates the hazard that the items will never be received by
683    ///   the guest if it decides not to do another non-zero-length read before
684    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
685    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
686    ///   recommended to do this and it's better to return
687    ///   `StreamResult::Completed` without buffering anything instead.
688    ///
689    /// For more discussion on zero-length reads see the [documentation in the
690    /// component-model repo itself][docs].
691    ///
692    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
693    ///
694    /// # Return
695    ///
696    /// This function can return a number of possible cases from this function:
697    ///
698    /// * `Poll::Pending` - this operation cannot complete at this time. The
699    ///   Rust-level `Future::poll` contract applies here where a waker should
700    ///   be stored from the `cx` argument and be arranged to receive a
701    ///   notification when this implementation can make progress. For example
702    ///   if you call `Future::poll` on a sub-future, that's enough. If items
703    ///   were written to `destination` then a trap in the guest will be raised.
704    ///
705    ///   Note that implementations should strive to avoid this return value
706    ///   when `finish` is `true`. In such a situation the guest is attempting
707    ///   to, for example, cancel a previous operation. By returning
708    ///   `Poll::Pending` the guest will be blocked during the cancellation
709    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
710    ///   favored to indicate that no items were read. If a short read happened,
711    ///   however, it's ok to return `StreamResult::Completed` indicating some
712    ///   items were read.
713    ///
714    /// * `Poll::Ok(StreamResult::Completed)` - items, if applicable, were
715    ///   written to the `destination`.
716    ///
717    /// * `Poll::Ok(StreamResult::Cancelled)` - used when `finish` is `true` and
718    ///   the implementation was able to successfully cancel any async work that
719    ///   a previous read kicked off, if any. The host should not buffer values
720    ///   received after returning `Cancelled` because the guest will not be
721    ///   aware of these values and the guest could close the stream after
722    ///   cancelling a read. Hosts should only return `Cancelled` when there are
723    ///   no more async operations in flight for a previous read.
724    ///
725    ///   If items were written to `destination` then a trap in the guest will
726    ///   be raised. If `finish` is `false` then this return value will raise a
727    ///   trap in the guest.
728    ///
729    /// * `Poll::Ok(StreamResult::Dropped)` - end-of-stream marker, indicating
730    ///   that this producer should not be polled again. Note that items may
731    ///   still be written to `destination`.
732    ///
733    /// # Errors
734    ///
735    /// The implementation may alternatively choose to return `Err(_)` to
736    /// indicate an unrecoverable error. This will cause the guest (if any) to
737    /// trap and render the component instance (if any) unusable. The
738    /// implementation should report errors that _are_ recoverable by other
739    /// means (e.g. by writing to a `future`) and return
740    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
741    fn poll_produce<'a>(
742        self: Pin<&mut Self>,
743        cx: &mut Context<'_>,
744        store: StoreContextMut<'a, D>,
745        destination: Destination<'a, Self::Item, Self::Buffer>,
746        finish: bool,
747    ) -> Poll<Result<StreamResult>>;
748
749    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
750    /// be downcast to the specified type.
751    ///
752    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
753    /// to the specified type is guaranteed to succeed.
754    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
755        Err(me)
756    }
757}
758
759impl<T, D> StreamProducer<D> for iter::Empty<T>
760where
761    T: Send + Sync + 'static,
762{
763    type Item = T;
764    type Buffer = Option<Self::Item>;
765
766    fn poll_produce<'a>(
767        self: Pin<&mut Self>,
768        _: &mut Context<'_>,
769        _: StoreContextMut<'a, D>,
770        _: Destination<'a, Self::Item, Self::Buffer>,
771        _: bool,
772    ) -> Poll<Result<StreamResult>> {
773        Poll::Ready(Ok(StreamResult::Dropped))
774    }
775}
776
777impl<T, D> StreamProducer<D> for stream::Empty<T>
778where
779    T: Send + Sync + 'static,
780{
781    type Item = T;
782    type Buffer = Option<Self::Item>;
783
784    fn poll_produce<'a>(
785        self: Pin<&mut Self>,
786        _: &mut Context<'_>,
787        _: StoreContextMut<'a, D>,
788        _: Destination<'a, Self::Item, Self::Buffer>,
789        _: bool,
790    ) -> Poll<Result<StreamResult>> {
791        Poll::Ready(Ok(StreamResult::Dropped))
792    }
793}
794
795impl<T, D> StreamProducer<D> for Vec<T>
796where
797    T: Unpin + Send + Sync + 'static,
798{
799    type Item = T;
800    type Buffer = VecBuffer<T>;
801
802    fn poll_produce<'a>(
803        self: Pin<&mut Self>,
804        _: &mut Context<'_>,
805        _: StoreContextMut<'a, D>,
806        mut dst: Destination<'a, Self::Item, Self::Buffer>,
807        _: bool,
808    ) -> Poll<Result<StreamResult>> {
809        dst.set_buffer(mem::take(self.get_mut()).into());
810        Poll::Ready(Ok(StreamResult::Dropped))
811    }
812}
813
814impl<T, D> StreamProducer<D> for Box<[T]>
815where
816    T: Unpin + Send + Sync + 'static,
817{
818    type Item = T;
819    type Buffer = VecBuffer<T>;
820
821    fn poll_produce<'a>(
822        self: Pin<&mut Self>,
823        _: &mut Context<'_>,
824        _: StoreContextMut<'a, D>,
825        mut dst: Destination<'a, Self::Item, Self::Buffer>,
826        _: bool,
827    ) -> Poll<Result<StreamResult>> {
828        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
829        Poll::Ready(Ok(StreamResult::Dropped))
830    }
831}
832
833#[cfg(feature = "component-model-bytes")]
834impl<D> StreamProducer<D> for bytes::Bytes {
835    type Item = u8;
836    type Buffer = Self;
837
838    fn poll_produce<'a>(
839        self: Pin<&mut Self>,
840        _: &mut Context<'_>,
841        _store: StoreContextMut<'a, D>,
842        mut dst: Destination<'a, Self::Item, Self::Buffer>,
843        _: bool,
844    ) -> Poll<Result<StreamResult>> {
845        dst.set_buffer(mem::take(self.get_mut()));
846        Poll::Ready(Ok(StreamResult::Dropped))
847    }
848}
849
850#[cfg(feature = "component-model-bytes")]
851impl<D> StreamProducer<D> for bytes::BytesMut {
852    type Item = u8;
853    type Buffer = Self;
854
855    fn poll_produce<'a>(
856        self: Pin<&mut Self>,
857        _: &mut Context<'_>,
858        _store: StoreContextMut<'a, D>,
859        mut dst: Destination<'a, Self::Item, Self::Buffer>,
860        _: bool,
861    ) -> Poll<Result<StreamResult>> {
862        dst.set_buffer(mem::take(self.get_mut()));
863        Poll::Ready(Ok(StreamResult::Dropped))
864    }
865}
866
867/// Represents the buffer for a host- or guest-initiated stream write.
868pub struct Source<'a, T> {
869    id: TableId<TransmitState>,
870    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
871}
872
873impl<'a, T> Source<'a, T> {
874    /// Reborrow `self` so it can be used again later.
875    pub fn reborrow(&mut self) -> Source<'_, T> {
876        Source {
877            id: self.id,
878            host_buffer: self.host_buffer.as_deref_mut(),
879        }
880    }
881
882    /// Accept zero or more items from the writer.
883    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
884    where
885        T: func::Lift + 'static,
886        B: ReadBuffer<T>,
887    {
888        if let Some(input) = &mut self.host_buffer {
889            let count = input.remaining().len().min(buffer.remaining_capacity());
890            buffer.move_from(*input, count);
891        } else {
892            let store = store.as_context_mut();
893            let transmit = store.0.concurrent_state_mut()?.get_mut(self.id)?;
894
895            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
896                bail_bug!("expected ReadState::HostReady");
897            };
898
899            let &WriteState::GuestReady {
900                ty,
901                address,
902                count,
903                options,
904                instance,
905                ..
906            } = &transmit.write
907            else {
908                bail_bug!("expected WriteState::GuestReady");
909            };
910
911            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
912            let ty = ty.payload(cx.types);
913            let old_remaining = buffer.remaining_capacity();
914            lift::<T, B>(
915                cx,
916                ty.copied(),
917                buffer,
918                address + (T::SIZE32 * guest_offset.as_usize()),
919                count.as_usize() - guest_offset.as_usize(),
920            )?;
921
922            let transmit = store.0.concurrent_state_mut()?.get_mut(self.id)?;
923
924            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
925                bail_bug!("expected ReadState::HostReady");
926            };
927
928            guest_offset.inc(old_remaining - buffer.remaining_capacity())?;
929        }
930
931        Ok(())
932    }
933
934    /// Return the number of items remaining to be read from the current write
935    /// operation.
936    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
937    where
938        T: 'static,
939    {
940        // Note that this unwrap should only trigger for bugs in Wasmtime, and
941        // this is modeled here to centralize the `.unwrap()` for this method in
942        // one location.
943        self.remaining_(store.as_context_mut().0).unwrap()
944    }
945
946    fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
947    where
948        T: 'static,
949    {
950        let transmit = store.concurrent_state_mut()?.get_mut(self.id)?;
951
952        if let &WriteState::GuestReady { count, .. } = &transmit.write {
953            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
954                bail_bug!("expected ReadState::HostReady")
955            };
956
957            Ok(count.as_usize() - guest_offset.as_usize())
958        } else if let Some(host_buffer) = &self.host_buffer {
959            Ok(host_buffer.remaining().len())
960        } else {
961            bail_bug!("expected either WriteState::GuestReady or host buffer")
962        }
963    }
964}
965
966impl<'a> Source<'a, u8> {
967    /// Return a `DirectSource` view of `self`.
968    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
969        DirectSource {
970            id: self.id,
971            host_buffer: self.host_buffer,
972            store,
973        }
974    }
975}
976
977/// Represents a write to a `stream<u8>`, providing direct access to the
978/// writer's buffer.
979pub struct DirectSource<'a, D: 'static> {
980    id: TableId<TransmitState>,
981    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
982    store: StoreContextMut<'a, D>,
983}
984
985#[cfg(feature = "std")]
986impl<D: 'static> std::io::Read for DirectSource<'_, D> {
987    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
988        let rem = self.remaining();
989        let n = rem.len().min(buf.len());
990        buf[..n].copy_from_slice(&rem[..n]);
991        self.mark_read(n);
992        Ok(n)
993    }
994}
995
996impl<D: 'static> DirectSource<'_, D> {
997    /// Provide direct access to the writer's buffer.
998    pub fn remaining(&mut self) -> &[u8] {
999        // Note that this unwrap should only trigger for bugs in Wasmtime, and
1000        // this is modeled here to centralize the `.unwrap()` for this method in
1001        // one location.
1002        self.remaining_().unwrap()
1003    }
1004
1005    fn remaining_(&mut self) -> Result<&[u8]> {
1006        if let Some(buffer) = self.host_buffer.as_deref_mut() {
1007            return Ok(buffer.remaining());
1008        }
1009        let transmit = self
1010            .store
1011            .as_context_mut()
1012            .0
1013            .concurrent_state_mut()?
1014            .get_mut(self.id)?;
1015
1016        let &WriteState::GuestReady {
1017            address,
1018            count,
1019            options,
1020            instance,
1021            ..
1022        } = &transmit.write
1023        else {
1024            bail_bug!("expected WriteState::GuestReady")
1025        };
1026
1027        let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
1028            bail_bug!("expected ReadState::HostReady")
1029        };
1030
1031        let memory = instance
1032            .options_memory(self.store.0, options)
1033            .get((address + guest_offset.as_usize())..)
1034            .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
1035        match memory {
1036            Some(memory) => Ok(memory),
1037            None => bail_bug!("guest buffer unexpectedly out of bounds"),
1038        }
1039    }
1040
1041    /// Mark the specified number of bytes as read from the writer's buffer.
1042    ///
1043    /// # Panics
1044    ///
1045    /// This will panic if the count is larger than the size of the buffer
1046    /// returned by `Self::remaining`.
1047    pub fn mark_read(&mut self, count: usize) {
1048        // Note that this unwrap should only trigger for bugs in Wasmtime, and
1049        // this is modeled here to centralize the `.unwrap()` for this method in
1050        // one location.
1051        self.mark_read_(count).unwrap()
1052    }
1053
1054    fn mark_read_(&mut self, count: usize) -> Result<()> {
1055        if let Some(buffer) = self.host_buffer.as_deref_mut() {
1056            buffer.skip(count);
1057            return Ok(());
1058        }
1059
1060        let transmit = self
1061            .store
1062            .as_context_mut()
1063            .0
1064            .concurrent_state_mut()?
1065            .get_mut(self.id)?;
1066
1067        let WriteState::GuestReady {
1068            count: write_count, ..
1069        } = &transmit.write
1070        else {
1071            bail_bug!("expected WriteState::GuestReady");
1072        };
1073
1074        let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
1075            bail_bug!("expected ReadState::HostReady");
1076        };
1077
1078        if guest_offset.as_usize() + count > write_count.as_usize() {
1079            // Note that this is a documented panic condition of `mark_read`.
1080            panic!("read count ({count}) must be less than or equal to write count ({write_count})")
1081        } else {
1082            guest_offset.inc(count)?;
1083        }
1084        Ok(())
1085    }
1086}
1087
1088/// Represents the host-owned read end of a stream.
1089pub trait StreamConsumer<D>: Send + 'static {
1090    /// The payload type of this stream.
1091    type Item;
1092
1093    /// Handle a host- or guest-initiated write by accepting zero or more items
1094    /// from the specified source.
1095    ///
1096    /// This will be called whenever the writer starts a write.
1097    ///
1098    /// If the implementation is able to consume one or more items immediately,
1099    /// it should take them from `source` and return either
1100    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
1101    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
1102    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
1103    /// indicate that the caller should delay sending a `COMPLETED` event to the
1104    /// writer until a later call to this function returns `Poll::Ready(_)`.
1105    /// For more about that, see the `Backpressure` section below.
1106    ///
1107    /// If the implementation cannot consume any items immediately and `finish`
1108    /// is _false_, it should store the waker from `cx` for later and return
1109    /// `Poll::Pending` without writing anything to `destination`.  Later, it
1110    /// should alert the waker when either (1) the items arrive, (2) the stream
1111    /// has ended, or (3) an error occurs.
1112    ///
1113    /// If the implementation cannot consume any items immediately and `finish`
1114    /// is _true_, it should, if possible, return
1115    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
1116    /// anything from `source`.  However, that might not be possible if an
1117    /// earlier call to `poll_consume` kicked off an asynchronous operation
1118    /// which needs to be completed (and possibly interrupted) gracefully, in
1119    /// which case the implementation may return `Poll::Pending` and later alert
1120    /// the waker as described above.  In other words, when `finish` is true,
1121    /// the implementation should prioritize returning a result to the reader
1122    /// (even if no items can be consumed) rather than wait indefinitely for at
1123    /// capacity to free up.
1124    ///
1125    /// In all of the above cases, the implementation may alternatively choose
1126    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
1127    /// the guest (if any) to trap and render the component instance (if any)
1128    /// unusable.  The implementation should report errors that _are_
1129    /// recoverable by other means (e.g. by writing to a `future`) and return
1130    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
1131    ///
1132    /// Note that the implementation should only return
1133    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
1134    /// items from `source` if called with `finish` set to true.  If it does so
1135    /// when `finish` is false, the caller will trap.  Additionally, it should
1136    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
1137    /// least one item from `source` if there is an item available; otherwise,
1138    /// the caller will trap.  If `poll_consume` is called with no items in
1139    /// `source`, it should only return `Poll::Ready(_)` once it is able to
1140    /// accept at least one item during the next call to `poll_consume`.
1141    ///
1142    /// Note that any items which the implementation of this trait takes from
1143    /// `source` become the responsibility of that implementation.  For that
1144    /// reason, an implementation which forwards items to an upstream sink
1145    /// should reserve capacity in that sink before taking items out of
1146    /// `source`, if possible.  Alternatively, it might buffer items which can't
1147    /// be forwarded immediately and send them once capacity is freed up.
1148    ///
1149    /// ## Backpressure
1150    ///
1151    /// As mentioned above, an implementation might choose to return
1152    /// `Poll::Pending` after taking items from `source`, which tells the caller
1153    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
1154    /// a form of backpressure when the items are forwarded to an upstream sink
1155    /// asynchronously.  Note, however, that it's not possible to "put back"
1156    /// items into `source` once they've been taken out, so if the upstream sink
1157    /// is unable to accept all the items, that cannot be communicated to the
1158    /// writer at this level of abstraction.  Just as with application-specific,
1159    /// recoverable errors, information about which items could be forwarded and
1160    /// which could not must be communicated out-of-band, e.g. by writing to an
1161    /// application-specific `future`.
1162    ///
1163    /// Similarly, if the writer cancels the write after items have been taken
1164    /// from `source` but before the items have all been forwarded to an
1165    /// upstream sink, `poll_consume` will be called with `finish` set to true,
1166    /// and the implementation may either:
1167    ///
1168    /// - Interrupt the forwarding process gracefully.  This may be preferable
1169    /// if there is an out-of-band channel for communicating to the writer how
1170    /// many items were forwarded before being interrupted.
1171    ///
1172    /// - Allow the forwarding to complete without interrupting it.  This is
1173    /// usually preferable if there's no out-of-band channel for reporting back
1174    /// to the writer how many items were forwarded.
1175    fn poll_consume(
1176        self: Pin<&mut Self>,
1177        cx: &mut Context<'_>,
1178        store: StoreContextMut<D>,
1179        source: Source<'_, Self::Item>,
1180        finish: bool,
1181    ) -> Poll<Result<StreamResult>>;
1182}
1183
1184/// Represents a host-owned write end of a future.
1185pub trait FutureProducer<D>: Send + 'static {
1186    /// The payload type of this future.
1187    type Item;
1188
1189    /// Handle a host- or guest-initiated read by producing a value.
1190    ///
1191    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1192    /// simplified interface for futures.
1193    ///
1194    /// If `finish` is true, the implementation may return
1195    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
1196    /// could produce a value.  Otherwise, it must either return
1197    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
1198    fn poll_produce(
1199        self: Pin<&mut Self>,
1200        cx: &mut Context<'_>,
1201        store: StoreContextMut<D>,
1202        finish: bool,
1203    ) -> Poll<Result<Option<Self::Item>>>;
1204}
1205
1206impl<T, E, D, Fut> FutureProducer<D> for Fut
1207where
1208    E: Into<Error>,
1209    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1210{
1211    type Item = T;
1212
1213    fn poll_produce<'a>(
1214        self: Pin<&mut Self>,
1215        cx: &mut Context<'_>,
1216        _: StoreContextMut<'a, D>,
1217        finish: bool,
1218    ) -> Poll<Result<Option<T>>> {
1219        match self.poll(cx) {
1220            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1221            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1222            Poll::Pending if finish => Poll::Ready(Ok(None)),
1223            Poll::Pending => Poll::Pending,
1224        }
1225    }
1226}
1227
1228/// Represents a host-owned read end of a future.
1229pub trait FutureConsumer<D>: Send + 'static {
1230    /// The payload type of this future.
1231    type Item;
1232
1233    /// Handle a host- or guest-initiated write by consuming a value.
1234    ///
1235    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1236    /// simplified interface for futures.
1237    ///
1238    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
1239    /// without taking the item from `source`, which indicates the operation was
1240    /// canceled before it could consume the value.  Otherwise, it must either
1241    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
1242    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
1243    /// the item).
1244    fn poll_consume(
1245        self: Pin<&mut Self>,
1246        cx: &mut Context<'_>,
1247        store: StoreContextMut<D>,
1248        source: Source<'_, Self::Item>,
1249        finish: bool,
1250    ) -> Poll<Result<()>>;
1251}
1252
1253/// Represents the readable end of a Component Model `future`.
1254///
1255/// Note that `FutureReader` instances must be disposed of using either `pipe`
1256/// or `close`; otherwise the in-store representation will leak and the writer
1257/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
1258/// ensure that disposal happens automatically.
1259pub struct FutureReader<T> {
1260    id: TableId<TransmitHandle>,
1261    _phantom: PhantomData<T>,
1262}
1263
1264impl<T> FutureReader<T> {
1265    /// Create a new future with the specified producer.
1266    ///
1267    /// # Errors
1268    ///
1269    /// Returns an error if the resource table for this store is full or if
1270    /// [`Config::concurrency_support`] is not enabled.
1271    ///
1272    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1273    pub fn new<S: AsContextMut>(
1274        mut store: S,
1275        producer: impl FutureProducer<S::Data, Item = T>,
1276    ) -> Result<Self>
1277    where
1278        T: func::Lower + func::Lift + Send + Sync + 'static,
1279    {
1280        ensure!(
1281            store.as_context().0.concurrency_support(),
1282            "concurrency support is not enabled"
1283        );
1284
1285        struct Producer<P>(P);
1286
1287        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1288            for Producer<P>
1289        {
1290            type Item = P::Item;
1291            type Buffer = Option<P::Item>;
1292
1293            fn poll_produce<'a>(
1294                self: Pin<&mut Self>,
1295                cx: &mut Context<'_>,
1296                store: StoreContextMut<D>,
1297                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1298                finish: bool,
1299            ) -> Poll<Result<StreamResult>> {
1300                // SAFETY: This is a standard pin-projection, and we never move
1301                // out of `self`.
1302                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1303
1304                Poll::Ready(Ok(
1305                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1306                        destination.set_buffer(Some(value));
1307
1308                        // Here we return `StreamResult::Completed` even though
1309                        // we've produced the last item we'll ever produce.
1310                        // That's because the ABI expects
1311                        // `ReturnCode::Completed(1)` rather than
1312                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1313                        // called again since the future will have resolved.
1314                        StreamResult::Completed
1315                    } else {
1316                        StreamResult::Cancelled
1317                    },
1318                ))
1319            }
1320        }
1321
1322        Ok(Self::new_(
1323            store
1324                .as_context_mut()
1325                .new_transmit(TransmitKind::Future, Producer(producer))?,
1326        ))
1327    }
1328
1329    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1330        Self {
1331            id,
1332            _phantom: PhantomData,
1333        }
1334    }
1335
1336    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1337        self.id
1338    }
1339
1340    /// Set the consumer that accepts the result of this future.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns an error if this future has already been closed.
1345    ///
1346    /// # Panics
1347    ///
1348    /// Panics if this future does not belong to `store`.
1349    pub fn pipe<S: AsContextMut>(
1350        self,
1351        mut store: S,
1352        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1353    ) -> Result<()>
1354    where
1355        T: func::Lift + 'static,
1356    {
1357        struct Consumer<C>(C);
1358
1359        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1360            for Consumer<C>
1361        {
1362            type Item = T;
1363
1364            fn poll_consume(
1365                self: Pin<&mut Self>,
1366                cx: &mut Context<'_>,
1367                mut store: StoreContextMut<D>,
1368                mut source: Source<Self::Item>,
1369                finish: bool,
1370            ) -> Poll<Result<StreamResult>> {
1371                // SAFETY: This is a standard pin-projection, and we never move
1372                // out of `self`.
1373                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1374
1375                ready!(consumer.poll_consume(
1376                    cx,
1377                    store.as_context_mut(),
1378                    source.reborrow(),
1379                    finish
1380                ))?;
1381
1382                Poll::Ready(Ok(if source.remaining(store) == 0 {
1383                    // Here we return `StreamResult::Completed` even though
1384                    // we've consumed the last item we'll ever consume.  That's
1385                    // because the ABI expects `ReturnCode::Completed(1)` rather
1386                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1387                    // called again since the future will have resolved.
1388                    StreamResult::Completed
1389                } else {
1390                    StreamResult::Cancelled
1391                }))
1392            }
1393        }
1394
1395        store
1396            .as_context_mut()
1397            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
1398    }
1399
1400    /// Transfer ownership of the read end of a future from a guest to the host.
1401    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1402        let id = lift_index_to_future(cx, ty, index)?;
1403        Ok(Self::new_(id))
1404    }
1405
1406    /// Close this `FutureReader`.
1407    ///
1408    /// This will close this half of the future which will signal to a pending
1409    /// write, if any, that the reader side is dropped. If the writer half has
1410    /// not yet written a value then when it attempts to write a value it will
1411    /// see that this end is closed.
1412    ///
1413    /// # Errors
1414    ///
1415    /// Returns an error if this future has already been closed.
1416    ///
1417    /// # Panics
1418    ///
1419    /// Panics if the store that the [`Accessor`] is derived from does not own
1420    /// this future.
1421    ///
1422    /// [`Accessor`]: crate::component::Accessor
1423    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1424        future_close(store.as_context_mut().0, &mut self.id)
1425    }
1426
1427    /// Convenience method around [`Self::close`].
1428    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1429        accessor.as_accessor().with(|access| self.close(access))
1430    }
1431
1432    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1433    /// drop and clean it up from the store.
1434    ///
1435    /// Note that the `accessor` provided must own this future and is
1436    /// additionally transferred to the `GuardedFutureReader` return value.
1437    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1438    where
1439        A: AsAccessor,
1440    {
1441        GuardedFutureReader::new(accessor, self)
1442    }
1443
1444    /// Attempts to convert this [`FutureReader<T>`] to a [`FutureAny`].
1445    ///
1446    /// # Errors
1447    ///
1448    /// This function will return an error if `self` does not belong to
1449    /// `store`.
1450    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1451    where
1452        T: ComponentType + 'static,
1453    {
1454        FutureAny::try_from_future_reader(store, self)
1455    }
1456
1457    /// Attempts to convert a [`FutureAny`] into a [`FutureReader<T>`].
1458    ///
1459    /// # Errors
1460    ///
1461    /// This function will fail if `T` doesn't match the type of the value that
1462    /// `future` is sending.
1463    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1464    where
1465        T: ComponentType + 'static,
1466    {
1467        future.try_into_future_reader()
1468    }
1469}
1470
1471impl<T> fmt::Debug for FutureReader<T> {
1472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1473        f.debug_struct("FutureReader")
1474            .field("id", &self.id)
1475            .finish()
1476    }
1477}
1478
1479pub(super) fn future_close(
1480    store: &mut StoreOpaque,
1481    id: &mut TableId<TransmitHandle>,
1482) -> Result<()> {
1483    let id = mem::replace(id, TableId::new(u32::MAX));
1484    store.host_drop_reader(id, TransmitKind::Future)
1485}
1486
1487/// Transfer ownership of the read end of a future from the host to a guest.
1488pub(super) fn lift_index_to_future(
1489    cx: &mut LiftContext<'_>,
1490    ty: InterfaceType,
1491    index: u32,
1492) -> Result<TableId<TransmitHandle>> {
1493    match ty {
1494        InterfaceType::Future(src) => {
1495            let (state, instance) = cx.concurrent_state_and_instance_mut();
1496            lift_index_to_transmit(instance, state, TransmitIndex::Future(src), index)
1497        }
1498        _ => func::bad_type_info(),
1499    }
1500}
1501
1502/// Transfer ownership of the read end of a future from the host to a guest.
1503pub(super) fn lower_future_to_index<U>(
1504    id: TableId<TransmitHandle>,
1505    cx: &mut LowerContext<'_, U>,
1506    ty: InterfaceType,
1507) -> Result<u32> {
1508    match ty {
1509        InterfaceType::Future(dst) => {
1510            cx.instance_handle()
1511                .lower_transmit_to_index(cx.store.0, TransmitIndex::Future(dst), id)
1512        }
1513        _ => func::bad_type_info(),
1514    }
1515}
1516
1517// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1518// safe and correct since we lift and lower future handles as `u32`s.
1519unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1520    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1521
1522    type Lower = <u32 as func::ComponentType>::Lower;
1523
1524    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1525        match ty {
1526            InterfaceType::Future(ty) => {
1527                let ty = types.types[*ty].ty;
1528                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1529            }
1530            other => bail!("expected `future`, found `{}`", func::desc(other)),
1531        }
1532    }
1533}
1534
1535// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1536unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1537    fn linear_lower_to_flat<U>(
1538        &self,
1539        cx: &mut LowerContext<'_, U>,
1540        ty: InterfaceType,
1541        dst: &mut MaybeUninit<Self::Lower>,
1542    ) -> Result<()> {
1543        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1544    }
1545
1546    fn linear_lower_to_memory<U>(
1547        &self,
1548        cx: &mut LowerContext<'_, U>,
1549        ty: InterfaceType,
1550        offset: usize,
1551    ) -> Result<()> {
1552        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1553            cx,
1554            InterfaceType::U32,
1555            offset,
1556        )
1557    }
1558}
1559
1560// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1561unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1562    fn linear_lift_from_flat(
1563        cx: &mut LiftContext<'_>,
1564        ty: InterfaceType,
1565        src: &Self::Lower,
1566    ) -> Result<Self> {
1567        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1568        Self::lift_from_index(cx, ty, index)
1569    }
1570
1571    fn linear_lift_from_memory(
1572        cx: &mut LiftContext<'_>,
1573        ty: InterfaceType,
1574        bytes: &[u8],
1575    ) -> Result<Self> {
1576        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1577        Self::lift_from_index(cx, ty, index)
1578    }
1579}
1580
1581/// A [`FutureReader`] paired with an [`Accessor`].
1582///
1583/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
1584/// when dropped. This can be created through [`GuardedFutureReader::new`] or
1585/// [`FutureReader::guard`].
1586///
1587/// [`Accessor`]: crate::component::Accessor
1588pub struct GuardedFutureReader<T, A>
1589where
1590    A: AsAccessor,
1591{
1592    // This field is `None` to implement the conversion from this guard back to
1593    // `FutureReader`. When `None` is seen in the destructor it will cause the
1594    // destructor to do nothing.
1595    reader: Option<FutureReader<T>>,
1596    accessor: A,
1597}
1598
1599impl<T, A> GuardedFutureReader<T, A>
1600where
1601    A: AsAccessor,
1602{
1603    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1604    ///
1605    /// # Panics
1606    ///
1607    /// Panics if [`Config::concurrency_support`] is not enabled.
1608    ///
1609    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1610    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1611        assert!(
1612            accessor
1613                .as_accessor()
1614                .with(|a| a.as_context().0.concurrency_support())
1615        );
1616        Self {
1617            reader: Some(reader),
1618            accessor,
1619        }
1620    }
1621
1622    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1623    /// back.
1624    pub fn into_future(self) -> FutureReader<T> {
1625        self.into()
1626    }
1627}
1628
1629impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1630where
1631    A: AsAccessor,
1632{
1633    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1634        guard.reader.take().unwrap()
1635    }
1636}
1637
1638impl<T, A> Drop for GuardedFutureReader<T, A>
1639where
1640    A: AsAccessor,
1641{
1642    fn drop(&mut self) {
1643        if let Some(reader) = &mut self.reader {
1644            // Currently this can only fail if the future is closed twice, which
1645            // this guard prevents, so this error shouldn't happen.
1646            let result = reader.close_with(&self.accessor);
1647            debug_assert!(result.is_ok());
1648        }
1649    }
1650}
1651
1652/// Represents the readable end of a Component Model `stream`.
1653///
1654/// Note that `StreamReader` instances must be disposed of using `close`;
1655/// otherwise the in-store representation will leak and the writer end will hang
1656/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1657/// disposal happens automatically.
1658pub struct StreamReader<T> {
1659    id: TableId<TransmitHandle>,
1660    _phantom: PhantomData<T>,
1661}
1662
1663impl<T> StreamReader<T> {
1664    /// Create a new stream with the specified producer.
1665    ///
1666    /// # Errors
1667    ///
1668    /// Returns an error if the resource table for this store is full or if
1669    /// [`Config::concurrency_support`] is not enabled.
1670    ///
1671    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1672    pub fn new<S: AsContextMut>(
1673        mut store: S,
1674        producer: impl StreamProducer<S::Data, Item = T>,
1675    ) -> Result<Self>
1676    where
1677        T: func::Lower + func::Lift + Send + Sync + 'static,
1678    {
1679        ensure!(
1680            store.as_context().0.concurrency_support(),
1681            "concurrency support is not enabled",
1682        );
1683        Ok(Self::new_(
1684            store
1685                .as_context_mut()
1686                .new_transmit(TransmitKind::Stream, producer)?,
1687        ))
1688    }
1689
1690    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1691        Self {
1692            id,
1693            _phantom: PhantomData,
1694        }
1695    }
1696
1697    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1698        self.id
1699    }
1700
1701    /// Attempt to consume this object by converting it into the specified type.
1702    ///
1703    /// This can be useful for "short-circuiting" host-to-host streams,
1704    /// bypassing the guest entirely.  For example, if a guest task returns a
1705    /// host-created stream and then exits, this function may be used to
1706    /// retrieve the write end, after which the guest instance and store may be
1707    /// disposed of if no longer needed.
1708    ///
1709    /// This will return `Ok(_)` if and only if the following conditions are
1710    /// met:
1711    ///
1712    /// - The stream was created by the host (i.e. not by the guest).
1713    ///
1714    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1715    /// producer provided to `StreamReader::new` when the stream was created,
1716    /// along with `TypeId::of::<V>()`.
1717    ///
1718    /// # Panics
1719    ///
1720    /// Panics if this stream has already been closed, or if this stream doesn't
1721    /// belong to the specified `store`.
1722    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1723        let store = store.as_context_mut();
1724        let state = store.0.concurrent_state_mut_already_forced_current_thread();
1725        let id = state.get_mut(self.id).unwrap().state;
1726        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1727            match try_into(TypeId::of::<V>()) {
1728                Some(result) => {
1729                    self.close(store).unwrap();
1730                    Ok(*result.downcast::<V>().unwrap())
1731                }
1732                None => Err(self),
1733            }
1734        } else {
1735            Err(self)
1736        }
1737    }
1738
1739    /// Set the consumer that accepts the items delivered to this stream.
1740    ///
1741    /// # Errors
1742    ///
1743    /// Returns an error if this stream has already been closed.
1744    ///
1745    /// # Panics
1746    ///
1747    /// Panics if this stream does not belong to `store`.
1748    pub fn pipe<S: AsContextMut>(
1749        self,
1750        mut store: S,
1751        consumer: impl StreamConsumer<S::Data, Item = T>,
1752    ) -> Result<()>
1753    where
1754        T: 'static,
1755    {
1756        store
1757            .as_context_mut()
1758            .set_consumer(self.id, TransmitKind::Stream, consumer)
1759    }
1760
1761    /// Transfer ownership of the read end of a stream from a guest to the host.
1762    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1763        let id = lift_index_to_stream(cx, ty, index)?;
1764        Ok(Self::new_(id))
1765    }
1766
1767    /// Close this `StreamReader`.
1768    ///
1769    /// This will signal that this portion of the stream is closed causing all
1770    /// future writes to return immediately with "DROPPED".
1771    ///
1772    /// # Errors
1773    ///
1774    /// Returns an error if this stream has already been closed.
1775    ///
1776    /// # Panics
1777    ///
1778    /// Panics if the store that the [`Accessor`] is derived from does not own
1779    /// this stream.
1780    ///
1781    /// [`Accessor`]: crate::component::Accessor
1782    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1783        stream_close(store.as_context_mut().0, &mut self.id)
1784    }
1785
1786    /// Convenience method around [`Self::close`].
1787    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1788        accessor.as_accessor().with(|access| self.close(access))
1789    }
1790
1791    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1792    /// drop and clean it up from the store.
1793    ///
1794    /// Note that the `accessor` provided must own this future and is
1795    /// additionally transferred to the `GuardedStreamReader` return value.
1796    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1797    where
1798        A: AsAccessor,
1799    {
1800        GuardedStreamReader::new(accessor, self)
1801    }
1802
1803    /// Attempts to convert this [`StreamReader<T>`] to a [`StreamAny`].
1804    ///
1805    /// # Errors
1806    ///
1807    /// This function will return an error if `self` does not belong to
1808    /// `store`.
1809    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1810    where
1811        T: ComponentType + 'static,
1812    {
1813        StreamAny::try_from_stream_reader(store, self)
1814    }
1815
1816    /// Attempts to convert a [`StreamAny`] into a [`StreamReader<T>`].
1817    ///
1818    /// # Errors
1819    ///
1820    /// This function will fail if `T` doesn't match the type of the value that
1821    /// `stream` is sending.
1822    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1823    where
1824        T: ComponentType + 'static,
1825    {
1826        stream.try_into_stream_reader()
1827    }
1828}
1829
1830impl<T> fmt::Debug for StreamReader<T> {
1831    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1832        f.debug_struct("StreamReader")
1833            .field("id", &self.id)
1834            .finish()
1835    }
1836}
1837
1838pub(super) fn stream_close(
1839    store: &mut StoreOpaque,
1840    id: &mut TableId<TransmitHandle>,
1841) -> Result<()> {
1842    let id = mem::replace(id, TableId::new(u32::MAX));
1843    store.host_drop_reader(id, TransmitKind::Stream)
1844}
1845
1846/// Transfer ownership of the read end of a stream from a guest to the host.
1847pub(super) fn lift_index_to_stream(
1848    cx: &mut LiftContext<'_>,
1849    ty: InterfaceType,
1850    index: u32,
1851) -> Result<TableId<TransmitHandle>> {
1852    match ty {
1853        InterfaceType::Stream(src) => {
1854            let (state, instance) = cx.concurrent_state_and_instance_mut();
1855            lift_index_to_transmit(instance, state, TransmitIndex::Stream(src), index)
1856        }
1857        _ => func::bad_type_info(),
1858    }
1859}
1860
1861/// Transfer ownership of the read end of a stream from the host to a guest.
1862pub(super) fn lower_stream_to_index<U>(
1863    id: TableId<TransmitHandle>,
1864    cx: &mut LowerContext<'_, U>,
1865    ty: InterfaceType,
1866) -> Result<u32> {
1867    match ty {
1868        InterfaceType::Stream(dst) => {
1869            cx.instance_handle()
1870                .lower_transmit_to_index(cx.store.0, TransmitIndex::Stream(dst), id)
1871        }
1872        _ => func::bad_type_info(),
1873    }
1874}
1875
1876// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1877// safe and correct since we lift and lower stream handles as `u32`s.
1878unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1879    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1880
1881    type Lower = <u32 as func::ComponentType>::Lower;
1882
1883    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1884        match ty {
1885            InterfaceType::Stream(ty) => {
1886                let ty = types.types[*ty].ty;
1887                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1888            }
1889            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1890        }
1891    }
1892}
1893
1894// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1895unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1896    fn linear_lower_to_flat<U>(
1897        &self,
1898        cx: &mut LowerContext<'_, U>,
1899        ty: InterfaceType,
1900        dst: &mut MaybeUninit<Self::Lower>,
1901    ) -> Result<()> {
1902        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1903    }
1904
1905    fn linear_lower_to_memory<U>(
1906        &self,
1907        cx: &mut LowerContext<'_, U>,
1908        ty: InterfaceType,
1909        offset: usize,
1910    ) -> Result<()> {
1911        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1912            cx,
1913            InterfaceType::U32,
1914            offset,
1915        )
1916    }
1917}
1918
1919// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1920unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1921    fn linear_lift_from_flat(
1922        cx: &mut LiftContext<'_>,
1923        ty: InterfaceType,
1924        src: &Self::Lower,
1925    ) -> Result<Self> {
1926        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1927        Self::lift_from_index(cx, ty, index)
1928    }
1929
1930    fn linear_lift_from_memory(
1931        cx: &mut LiftContext<'_>,
1932        ty: InterfaceType,
1933        bytes: &[u8],
1934    ) -> Result<Self> {
1935        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1936        Self::lift_from_index(cx, ty, index)
1937    }
1938}
1939
1940/// A [`StreamReader`] paired with an [`Accessor`].
1941///
1942/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1943/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1944/// [`StreamReader::guard`].
1945///
1946/// [`Accessor`]: crate::component::Accessor
1947pub struct GuardedStreamReader<T, A>
1948where
1949    A: AsAccessor,
1950{
1951    // This field is `None` to implement the conversion from this guard back to
1952    // `StreamReader`. When `None` is seen in the destructor it will cause the
1953    // destructor to do nothing.
1954    reader: Option<StreamReader<T>>,
1955    accessor: A,
1956}
1957
1958impl<T, A> GuardedStreamReader<T, A>
1959where
1960    A: AsAccessor,
1961{
1962    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1963    /// `reader`.
1964    ///
1965    /// # Panics
1966    ///
1967    /// Panics if [`Config::concurrency_support`] is not enabled.
1968    ///
1969    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1970    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1971        assert!(
1972            accessor
1973                .as_accessor()
1974                .with(|a| a.as_context().0.concurrency_support())
1975        );
1976        Self {
1977            reader: Some(reader),
1978            accessor,
1979        }
1980    }
1981
1982    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1983    /// back.
1984    pub fn into_stream(self) -> StreamReader<T> {
1985        self.into()
1986    }
1987}
1988
1989impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1990where
1991    A: AsAccessor,
1992{
1993    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1994        guard.reader.take().unwrap()
1995    }
1996}
1997
1998impl<T, A> Drop for GuardedStreamReader<T, A>
1999where
2000    A: AsAccessor,
2001{
2002    fn drop(&mut self) {
2003        if let Some(reader) = &mut self.reader {
2004            // Currently this can only fail if the future is closed twice, which
2005            // this guard prevents, so this error shouldn't happen.
2006            let result = reader.close_with(&self.accessor);
2007            debug_assert!(result.is_ok());
2008        }
2009    }
2010}
2011
2012/// Represents a Component Model `error-context`.
2013pub struct ErrorContext {
2014    rep: u32,
2015}
2016
2017impl ErrorContext {
2018    pub(crate) fn new(rep: u32) -> Self {
2019        Self { rep }
2020    }
2021
2022    /// Convert this `ErrorContext` into a [`Val`].
2023    pub fn into_val(self) -> Val {
2024        Val::ErrorContext(ErrorContextAny(self.rep))
2025    }
2026
2027    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
2028    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
2029        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
2030            bail!("expected `error-context`; got `{}`", value.desc());
2031        };
2032        Ok(Self::new(*rep))
2033    }
2034
2035    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
2036        match ty {
2037            InterfaceType::ErrorContext(src) => {
2038                let rep = cx
2039                    .instance_mut()
2040                    .table_for_error_context(src)
2041                    .error_context_rep(index)?;
2042
2043                Ok(Self { rep })
2044            }
2045            _ => func::bad_type_info(),
2046        }
2047    }
2048}
2049
2050pub(crate) fn lower_error_context_to_index<U>(
2051    rep: u32,
2052    cx: &mut LowerContext<'_, U>,
2053    ty: InterfaceType,
2054) -> Result<u32> {
2055    match ty {
2056        InterfaceType::ErrorContext(dst) => {
2057            let tbl = cx.instance_mut().table_for_error_context(dst);
2058            tbl.error_context_insert(rep)
2059        }
2060        _ => func::bad_type_info(),
2061    }
2062}
2063// SAFETY: This relies on the `ComponentType` implementation for `u32` being
2064// safe and correct since we lift and lower future handles as `u32`s.
2065unsafe impl func::ComponentType for ErrorContext {
2066    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2067
2068    type Lower = <u32 as func::ComponentType>::Lower;
2069
2070    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2071        match ty {
2072            InterfaceType::ErrorContext(_) => Ok(()),
2073            other => bail!("expected `error`, found `{}`", func::desc(other)),
2074        }
2075    }
2076}
2077
2078// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2079unsafe impl func::Lower for ErrorContext {
2080    fn linear_lower_to_flat<T>(
2081        &self,
2082        cx: &mut LowerContext<'_, T>,
2083        ty: InterfaceType,
2084        dst: &mut MaybeUninit<Self::Lower>,
2085    ) -> Result<()> {
2086        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2087            cx,
2088            InterfaceType::U32,
2089            dst,
2090        )
2091    }
2092
2093    fn linear_lower_to_memory<T>(
2094        &self,
2095        cx: &mut LowerContext<'_, T>,
2096        ty: InterfaceType,
2097        offset: usize,
2098    ) -> Result<()> {
2099        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2100            cx,
2101            InterfaceType::U32,
2102            offset,
2103        )
2104    }
2105}
2106
2107// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2108unsafe impl func::Lift for ErrorContext {
2109    fn linear_lift_from_flat(
2110        cx: &mut LiftContext<'_>,
2111        ty: InterfaceType,
2112        src: &Self::Lower,
2113    ) -> Result<Self> {
2114        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2115        Self::lift_from_index(cx, ty, index)
2116    }
2117
2118    fn linear_lift_from_memory(
2119        cx: &mut LiftContext<'_>,
2120        ty: InterfaceType,
2121        bytes: &[u8],
2122    ) -> Result<Self> {
2123        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2124        Self::lift_from_index(cx, ty, index)
2125    }
2126}
2127
2128/// Represents the read or write end of a stream or future.
2129pub(super) struct TransmitHandle {
2130    pub(super) common: WaitableCommon,
2131    /// See `TransmitState`
2132    state: TableId<TransmitState>,
2133}
2134
2135impl TransmitHandle {
2136    fn new(state: TableId<TransmitState>) -> Self {
2137        Self {
2138            common: WaitableCommon::default(),
2139            state,
2140        }
2141    }
2142}
2143
2144impl TableDebug for TransmitHandle {
2145    fn type_name() -> &'static str {
2146        "TransmitHandle"
2147    }
2148}
2149
2150/// Represents the state of a stream or future.
2151struct TransmitState {
2152    /// The write end of the stream or future.
2153    write_handle: TableId<TransmitHandle>,
2154    /// The read end of the stream or future.
2155    read_handle: TableId<TransmitHandle>,
2156    /// See `WriteState`
2157    write: WriteState,
2158    /// See `ReadState`
2159    read: ReadState,
2160    /// Whether further values may be transmitted via this stream or future.
2161    done: bool,
2162    /// The original creator of this stream, used for type-checking with
2163    /// `{Future,Stream}Any`.
2164    pub(super) origin: TransmitOrigin,
2165}
2166
2167#[derive(Copy, Clone)]
2168pub(super) enum TransmitOrigin {
2169    Host,
2170    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2171    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2172}
2173
2174impl TransmitState {
2175    fn new(origin: TransmitOrigin) -> Self {
2176        Self {
2177            write_handle: TableId::new(u32::MAX),
2178            read_handle: TableId::new(u32::MAX),
2179            read: ReadState::Open,
2180            write: WriteState::Open,
2181            done: false,
2182            origin,
2183        }
2184    }
2185}
2186
2187impl TableDebug for TransmitState {
2188    fn type_name() -> &'static str {
2189        "TransmitState"
2190    }
2191}
2192
2193impl TransmitOrigin {
2194    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2195        match index {
2196            TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2197            TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2198        }
2199    }
2200}
2201
2202type PollStream = Box<
2203    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2204>;
2205
2206type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2207
2208/// Represents the state of the write end of a stream or future.
2209enum WriteState {
2210    /// The write end is open, but no write is pending.
2211    Open,
2212    /// The write end is owned by a guest task and a write is pending.
2213    GuestReady {
2214        instance: Instance,
2215        caller: RuntimeComponentInstanceIndex,
2216        ty: TransmitIndex,
2217        flat_abi: Option<FlatAbi>,
2218        options: OptionsIndex,
2219        address: usize,
2220        count: ItemCount,
2221        handle: u32,
2222    },
2223    /// The write end is owned by the host, which is ready to produce items.
2224    HostReady {
2225        produce: PollStream,
2226        try_into: TryInto,
2227        guest_offset: ItemCount,
2228        cancel: bool,
2229        cancel_waker: Option<Waker>,
2230    },
2231    /// The write end has been dropped.
2232    Dropped,
2233}
2234
2235impl fmt::Debug for WriteState {
2236    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2237        match self {
2238            Self::Open => f.debug_tuple("Open").finish(),
2239            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2240            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2241            Self::Dropped => f.debug_tuple("Dropped").finish(),
2242        }
2243    }
2244}
2245
2246/// Represents the state of the read end of a stream or future.
2247enum ReadState {
2248    /// The read end is open, but no read is pending.
2249    Open,
2250    /// The read end is owned by a guest task and a read is pending.
2251    GuestReady {
2252        ty: TransmitIndex,
2253        caller_instance: RuntimeComponentInstanceIndex,
2254        caller_thread: QualifiedThreadId,
2255        flat_abi: Option<FlatAbi>,
2256        instance: Instance,
2257        options: OptionsIndex,
2258        address: usize,
2259        count: ItemCount,
2260        handle: u32,
2261    },
2262    /// The read end is owned by a host task, and it is ready to consume items.
2263    HostReady {
2264        consume: PollStream,
2265        guest_offset: ItemCount,
2266        cancel: bool,
2267        cancel_waker: Option<Waker>,
2268    },
2269    /// Both the read and write ends are owned by the host.
2270    HostToHost {
2271        accept: Box<
2272            dyn for<'a> Fn(
2273                    &'a mut UntypedWriteBuffer<'a>,
2274                )
2275                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2276                + Send
2277                + Sync,
2278        >,
2279        buffer: Vec<u8>,
2280        limit: usize,
2281    },
2282    /// The read end has been dropped.
2283    Dropped,
2284}
2285
2286impl fmt::Debug for ReadState {
2287    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2288        match self {
2289            Self::Open => f.debug_tuple("Open").finish(),
2290            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2291            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2292            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2293            Self::Dropped => f.debug_tuple("Dropped").finish(),
2294        }
2295    }
2296}
2297
2298fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
2299    Ok(match state {
2300        StreamResult::Dropped => ReturnCode::Dropped(count),
2301        StreamResult::Completed => ReturnCode::completed(kind, count),
2302        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2303    })
2304}
2305
2306impl StoreOpaque {
2307    fn pipe_from_guest(
2308        &mut self,
2309        kind: TransmitKind,
2310        id: TableId<TransmitState>,
2311        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2312    ) {
2313        let future = async move {
2314            let stream_state = future.await?;
2315            tls::get(|store| {
2316                let state = store.concurrent_state_mut()?;
2317                let transmit = state.get_mut(id)?;
2318                let ReadState::HostReady {
2319                    consume,
2320                    guest_offset,
2321                    ..
2322                } = mem::replace(&mut transmit.read, ReadState::Open)
2323                else {
2324                    bail_bug!("expected ReadState::HostReady")
2325                };
2326                let code = return_code(kind, stream_state, guest_offset)?;
2327                transmit.read = match stream_state {
2328                    StreamResult::Dropped => ReadState::Dropped,
2329                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2330                        consume,
2331                        guest_offset: ItemCount::ZERO,
2332                        cancel: false,
2333                        cancel_waker: None,
2334                    },
2335                };
2336                let WriteState::GuestReady { ty, handle, .. } =
2337                    mem::replace(&mut transmit.write, WriteState::Open)
2338                else {
2339                    bail_bug!("expected WriteState::GuestReady")
2340                };
2341                state.send_write_result(ty, id, handle, code)?;
2342                Ok(())
2343            })
2344        };
2345
2346        self.concurrent_state_mut_already_forced_current_thread()
2347            .push_future(future.boxed());
2348    }
2349
2350    fn pipe_to_guest(
2351        &mut self,
2352        kind: TransmitKind,
2353        id: TableId<TransmitState>,
2354        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2355    ) {
2356        let future = async move {
2357            let stream_state = future.await?;
2358            tls::get(|store| {
2359                let state = store.concurrent_state_mut()?;
2360                let transmit = state.get_mut(id)?;
2361                let WriteState::HostReady {
2362                    produce,
2363                    try_into,
2364                    guest_offset,
2365                    ..
2366                } = mem::replace(&mut transmit.write, WriteState::Open)
2367                else {
2368                    bail_bug!("expected WriteState::HostReady")
2369                };
2370                let code = return_code(kind, stream_state, guest_offset)?;
2371                transmit.write = match stream_state {
2372                    StreamResult::Dropped => WriteState::Dropped,
2373                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2374                        produce,
2375                        try_into,
2376                        guest_offset: ItemCount::ZERO,
2377                        cancel: false,
2378                        cancel_waker: None,
2379                    },
2380                };
2381                let ReadState::GuestReady { ty, handle, .. } =
2382                    mem::replace(&mut transmit.read, ReadState::Open)
2383                else {
2384                    bail_bug!("expected ReadState::GuestReady")
2385                };
2386                state.send_read_result(ty, id, handle, code)?;
2387                Ok(())
2388            })
2389        };
2390
2391        self.concurrent_state_mut_already_forced_current_thread()
2392            .push_future(future.boxed());
2393    }
2394
2395    /// Drop the read end of a stream or future read from the host.
2396    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2397        let state = self.concurrent_state_mut()?;
2398        Waitable::Transmit(id).join(state, None)?;
2399        let transmit_id = state.get_mut(id)?.state;
2400        let transmit = state
2401            .get_mut(transmit_id)
2402            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2403        log::trace!(
2404            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2405            transmit.read,
2406            transmit.write
2407        );
2408
2409        transmit.read = ReadState::Dropped;
2410
2411        // If the write end is already dropped, it should stay dropped,
2412        // otherwise, it should be opened.
2413        let new_state = if let WriteState::Dropped = &transmit.write {
2414            WriteState::Dropped
2415        } else {
2416            WriteState::Open
2417        };
2418
2419        let write_handle = transmit.write_handle;
2420
2421        match mem::replace(&mut transmit.write, new_state) {
2422            // If a guest is waiting to write, notify it that the read end has
2423            // been dropped.
2424            WriteState::GuestReady { ty, handle, .. } => {
2425                state.update_event(
2426                    write_handle.rep(),
2427                    match ty {
2428                        TransmitIndex::Future(ty) => Event::FutureWrite {
2429                            code: ReturnCode::Dropped(ItemCount::ZERO),
2430                            pending: Some((ty, handle)),
2431                        },
2432                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2433                            code: ReturnCode::Dropped(ItemCount::ZERO),
2434                            pending: Some((ty, handle)),
2435                        },
2436                    },
2437                )?;
2438            }
2439
2440            WriteState::Open => {
2441                state.update_event(
2442                    write_handle.rep(),
2443                    match kind {
2444                        TransmitKind::Future => Event::FutureWrite {
2445                            code: ReturnCode::Dropped(ItemCount::ZERO),
2446                            pending: None,
2447                        },
2448                        TransmitKind::Stream => Event::StreamWrite {
2449                            code: ReturnCode::Dropped(ItemCount::ZERO),
2450                            pending: None,
2451                        },
2452                    },
2453                )?;
2454            }
2455
2456            // If the writer has already been dropped, then this cleans out the
2457            // state that the reader is using. If the write is host-owned then
2458            // by cleaning this out we run the host's `Drop` implementation
2459            // which notifies it of this drop.
2460            WriteState::Dropped | WriteState::HostReady { .. } => {
2461                log::trace!("host_drop_reader delete {transmit_id:?}");
2462                state.delete_transmit(transmit_id)?;
2463            }
2464        }
2465        Ok(())
2466    }
2467
2468    /// Drop the write end of a stream or future read from the host.
2469    fn host_drop_writer(
2470        &mut self,
2471        id: TableId<TransmitHandle>,
2472        on_drop_open: Option<fn() -> Result<()>>,
2473    ) -> Result<()> {
2474        let state = self.concurrent_state_mut()?;
2475        Waitable::Transmit(id).join(state, None)?;
2476        let transmit_id = state.get_mut(id)?.state;
2477        let transmit = state
2478            .get_mut(transmit_id)
2479            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2480        log::trace!(
2481            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2482            transmit.read,
2483            transmit.write
2484        );
2485
2486        // Existing queued transmits must be updated with information for the impending writer closure
2487        match &mut transmit.write {
2488            WriteState::GuestReady { .. } => {
2489                bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2490            }
2491            WriteState::HostReady { .. } => {}
2492            v @ WriteState::Open => {
2493                if let (Some(on_drop_open), false) = (on_drop_open, transmit.done) {
2494                    on_drop_open()?;
2495                } else {
2496                    *v = WriteState::Dropped;
2497                }
2498            }
2499            WriteState::Dropped => bail_bug!("write state is already dropped"),
2500        }
2501
2502        let transmit = self.concurrent_state_mut()?.get_mut(transmit_id)?;
2503
2504        // If the existing read state is dropped, then there's nothing to read
2505        // and we can keep it that way.
2506        //
2507        // If the read state was any other state, then we must set the new state to open
2508        // to indicate that there *is* data to be read
2509        let new_state = if let ReadState::Dropped = &transmit.read {
2510            ReadState::Dropped
2511        } else {
2512            ReadState::Open
2513        };
2514
2515        let read_handle = transmit.read_handle;
2516
2517        // Swap in the new read state
2518        match mem::replace(&mut transmit.read, new_state) {
2519            // If the guest was ready to read, then we cannot drop the reader (or writer);
2520            // we must deliver the event, and update the state associated with the handle to
2521            // represent that a read must be performed
2522            ReadState::GuestReady { ty, handle, .. } => {
2523                // Ensure the final read of the guest is queued, with appropriate closure indicator
2524                self.concurrent_state_mut()?.update_event(
2525                    read_handle.rep(),
2526                    match ty {
2527                        TransmitIndex::Future(ty) => Event::FutureRead {
2528                            code: ReturnCode::Dropped(ItemCount::ZERO),
2529                            pending: Some((ty, handle)),
2530                        },
2531                        TransmitIndex::Stream(ty) => Event::StreamRead {
2532                            code: ReturnCode::Dropped(ItemCount::ZERO),
2533                            pending: Some((ty, handle)),
2534                        },
2535                    },
2536                )?;
2537            }
2538
2539            // If the read state is open, then there are no registered readers of the stream/future
2540            ReadState::Open => {
2541                self.concurrent_state_mut()?.update_event(
2542                    read_handle.rep(),
2543                    match on_drop_open {
2544                        Some(_) => Event::FutureRead {
2545                            code: ReturnCode::Dropped(ItemCount::ZERO),
2546                            pending: None,
2547                        },
2548                        None => Event::StreamRead {
2549                            code: ReturnCode::Dropped(ItemCount::ZERO),
2550                            pending: None,
2551                        },
2552                    },
2553                )?;
2554            }
2555
2556            // If the read state was already dropped, then we can remove the
2557            // transmit state completely (both writer and reader have been
2558            // dropped). If the read state is host-owned then it's additionally
2559            // deleted here as a notification that the read end has gone away.
2560            // Running the host's `Drop` implementation is what notifies it of
2561            // this event.
2562            ReadState::Dropped | ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
2563                log::trace!("host_drop_writer delete {transmit_id:?}");
2564                self.concurrent_state_mut()?.delete_transmit(transmit_id)?;
2565            }
2566        }
2567        Ok(())
2568    }
2569
2570    pub(super) fn transmit_origin(
2571        &mut self,
2572        id: TableId<TransmitHandle>,
2573    ) -> Result<TransmitOrigin> {
2574        let state = self.concurrent_state_mut()?;
2575        let state_id = state.get_mut(id)?.state;
2576        Ok(state.get_mut(state_id)?.origin)
2577    }
2578}
2579
2580impl<T> StoreContextMut<'_, T> {
2581    fn new_transmit<P: StreamProducer<T>>(
2582        mut self,
2583        kind: TransmitKind,
2584        producer: P,
2585    ) -> Result<TableId<TransmitHandle>>
2586    where
2587        P::Item: func::Lower,
2588    {
2589        let token = StoreToken::new(self.as_context_mut());
2590        let state = self.0.concurrent_state_mut()?;
2591        let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
2592        let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
2593        let id = state.get_mut(read)?.state;
2594        let mut dropped = false;
2595        let produce = Box::new({
2596            let producer = producer.clone();
2597            move || {
2598                let producer = producer.clone();
2599                async move {
2600                    let mut state = producer.take()?;
2601                    let (mine, buffer) = &mut *state;
2602
2603                    let (result, cancelled) = if buffer.remaining().is_empty() {
2604                        future::poll_fn(|cx| {
2605                            tls::get(|store| {
2606                                let transmit = store.concurrent_state_mut()?.get_mut(id)?;
2607
2608                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2609                                    bail_bug!("expected WriteState::HostReady")
2610                                };
2611
2612                                let mut host_written = 0;
2613                                let mut host_buffer =
2614                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2615                                        Some(mem::take(buffer))
2616                                    } else {
2617                                        None
2618                                    };
2619
2620                                let poll = mine.as_mut().poll_produce(
2621                                    cx,
2622                                    token.as_context_mut(store),
2623                                    Destination {
2624                                        id,
2625                                        buffer,
2626                                        host_buffer: host_buffer.as_mut().map(|b| {
2627                                            HostBuffer {
2628                                                dst: b,
2629                                                marked_written: &mut host_written,
2630                                            }
2631                                        }),
2632                                        _phantom: PhantomData,
2633                                    },
2634                                    cancel,
2635                                );
2636
2637                                let transmit = store.concurrent_state_mut()?.get_mut(id)?;
2638
2639                                let host_offset = if let (
2640                                    Some(host_buffer),
2641                                    ReadState::HostToHost { buffer, limit, .. },
2642                                ) = (host_buffer, &mut transmit.read)
2643                                {
2644                                    *limit = host_written;
2645                                    *buffer = host_buffer;
2646                                    *limit
2647                                } else {
2648                                    0
2649                                };
2650
2651                                {
2652                                    let WriteState::HostReady {
2653                                        guest_offset,
2654                                        cancel,
2655                                        cancel_waker,
2656                                        ..
2657                                    } = &mut transmit.write
2658                                    else {
2659                                        bail_bug!("expected WriteState::HostReady")
2660                                    };
2661
2662                                    if poll.is_pending() {
2663                                        if !buffer.remaining().is_empty()
2664                                            || *guest_offset > 0
2665                                            || host_offset > 0
2666                                        {
2667                                            bail!(
2668                                                "StreamProducer::poll_produce returned Poll::Pending \
2669                                                 after producing at least one item"
2670                                            )
2671                                        }
2672                                        *cancel_waker = Some(cx.waker().clone());
2673                                    } else {
2674                                        *cancel_waker = None;
2675                                        *cancel = false;
2676                                    }
2677                                }
2678
2679                                Ok(poll.map(|v| v.map(|result| (result, cancel))))
2680                            })?
2681                        })
2682                            .await?
2683                    } else {
2684                        (StreamResult::Completed, false)
2685                    };
2686
2687                    let (guest_offset, host_offset, count) = tls::get(|store| {
2688                        let transmit = store.concurrent_state_mut()?.get_mut(id)?;
2689                        let (count, host_offset) = match &transmit.read {
2690                            &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
2691                            &ReadState::HostToHost { limit, .. } => (1, limit),
2692                            _ => bail_bug!("invalid read state"),
2693                        };
2694                        let guest_offset = match &transmit.write {
2695                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
2696                            _ => bail_bug!("invalid write state"),
2697                        };
2698                        Ok((guest_offset, host_offset, count))
2699                    })?;
2700
2701                    match result {
2702                        StreamResult::Completed => {
2703                            if count > 1
2704                                && buffer.remaining().is_empty()
2705                                && guest_offset == 0
2706                                && host_offset == 0
2707                            {
2708                                bail!(
2709                                    "StreamProducer::poll_produce returned StreamResult::Completed \
2710                                     without producing any items"
2711                                );
2712                            }
2713                        }
2714                        StreamResult::Cancelled => {
2715                            if !cancelled {
2716                                bail!(
2717                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
2718                                     without being given a `finish` parameter value of true"
2719                                );
2720                            }
2721                        }
2722                        StreamResult::Dropped => {
2723                            dropped = true;
2724                        }
2725                    }
2726
2727                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2728
2729                    drop(state);
2730
2731                    if write_buffer {
2732                        write(token, id, producer.clone(), kind).await?;
2733                    }
2734
2735                    Ok(if dropped {
2736                        if producer.with(|p| p.1.remaining().is_empty())?  {
2737                            StreamResult::Dropped
2738                        } else {
2739                            StreamResult::Completed
2740                        }
2741                    } else {
2742                        result
2743                    })
2744                }
2745                .boxed()
2746            }
2747        });
2748        let try_into = Box::new(move |ty| {
2749            let (mine, buffer) = producer.try_lock().ok()?.take()?;
2750            match P::try_into(mine, ty) {
2751                Ok(value) => Some(value),
2752                Err(mine) => {
2753                    *producer.try_lock().ok()? = Some((mine, buffer));
2754                    None
2755                }
2756            }
2757        });
2758        state.get_mut(id)?.write = WriteState::HostReady {
2759            produce,
2760            try_into,
2761            guest_offset: ItemCount::ZERO,
2762            cancel: false,
2763            cancel_waker: None,
2764        };
2765        Ok(read)
2766    }
2767
2768    fn set_consumer<C: StreamConsumer<T>>(
2769        mut self,
2770        id: TableId<TransmitHandle>,
2771        kind: TransmitKind,
2772        consumer: C,
2773    ) -> Result<()> {
2774        let token = StoreToken::new(self.as_context_mut());
2775        let state = self.0.concurrent_state_mut()?;
2776        let id = state.get_mut(id)?.state;
2777        let transmit = state.get_mut(id)?;
2778        let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
2779        let consume_with_buffer = {
2780            let consumer = consumer.clone();
2781            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2782                let mut mine = consumer.take()?;
2783
2784                let host_buffer_remaining_before =
2785                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2786
2787                let (result, cancelled) = future::poll_fn(|cx| {
2788                    tls::get(|store| {
2789                        let cancel = match &store.concurrent_state_mut()?.get_mut(id)?.read {
2790                            &ReadState::HostReady { cancel, .. } => cancel,
2791                            ReadState::Open => false,
2792                            _ => bail_bug!("unexpected read state"),
2793                        };
2794
2795                        let poll = mine.as_mut().poll_consume(
2796                            cx,
2797                            token.as_context_mut(store),
2798                            Source {
2799                                id,
2800                                host_buffer: host_buffer.as_deref_mut(),
2801                            },
2802                            cancel,
2803                        );
2804
2805                        if let ReadState::HostReady {
2806                            cancel_waker,
2807                            cancel,
2808                            ..
2809                        } = &mut store.concurrent_state_mut()?.get_mut(id)?.read
2810                        {
2811                            if poll.is_pending() {
2812                                *cancel_waker = Some(cx.waker().clone());
2813                            } else {
2814                                *cancel_waker = None;
2815                                *cancel = false;
2816                            }
2817                        }
2818
2819                        Ok(poll.map(|v| v.map(|result| (result, cancel))))
2820                    })?
2821                })
2822                .await?;
2823
2824                let (guest_offset, count) = tls::get(|store| {
2825                    let transmit = store.concurrent_state_mut()?.get_mut(id)?;
2826                    Ok((
2827                        match &transmit.read {
2828                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2829                            ReadState::Open => ItemCount::ZERO,
2830                            _ => bail_bug!("invalid read state"),
2831                        },
2832                        match &transmit.write {
2833                            WriteState::GuestReady { count, .. } => count.as_usize(),
2834                            WriteState::HostReady { .. } => match host_buffer_remaining_before {
2835                                Some(n) => n,
2836                                None => bail_bug!("host_buffer_remaining_before should be set"),
2837                            },
2838                            _ => bail_bug!("invalid write state"),
2839                        },
2840                    ))
2841                })?;
2842
2843                match result {
2844                    StreamResult::Completed => {
2845                        if count > 0
2846                            && guest_offset == 0
2847                            && host_buffer_remaining_before
2848                                .zip(host_buffer.map(|v| v.remaining().len()))
2849                                .map(|(before, after)| before == after)
2850                                .unwrap_or(false)
2851                        {
2852                            bail!(
2853                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2854                                 without consuming any items"
2855                            );
2856                        }
2857
2858                        if let TransmitKind::Future = kind {
2859                            tls::get(|store| {
2860                                store.concurrent_state_mut()?.get_mut(id)?.done = true;
2861                                crate::error::Ok(())
2862                            })?;
2863                        }
2864                    }
2865                    StreamResult::Cancelled => {
2866                        if !cancelled {
2867                            bail!(
2868                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2869                                 without being given a `finish` parameter value of true"
2870                            );
2871                        }
2872                    }
2873                    StreamResult::Dropped => {}
2874                }
2875
2876                Ok(result)
2877            }
2878        };
2879        let consume = {
2880            let consume = consume_with_buffer.clone();
2881            Box::new(move || {
2882                let consume = consume.clone();
2883                async move { consume(None).await }.boxed()
2884            })
2885        };
2886
2887        match &transmit.write {
2888            WriteState::Open => {
2889                transmit.read = ReadState::HostReady {
2890                    consume,
2891                    guest_offset: ItemCount::ZERO,
2892                    cancel: false,
2893                    cancel_waker: None,
2894                };
2895            }
2896            &WriteState::GuestReady { .. } => {
2897                let future = consume();
2898                transmit.read = ReadState::HostReady {
2899                    consume,
2900                    guest_offset: ItemCount::ZERO,
2901                    cancel: false,
2902                    cancel_waker: None,
2903                };
2904                self.0.pipe_from_guest(kind, id, future);
2905            }
2906            WriteState::HostReady { .. } => {
2907                let WriteState::HostReady { produce, .. } = mem::replace(
2908                    &mut transmit.write,
2909                    WriteState::HostReady {
2910                        produce: Box::new(|| {
2911                            Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
2912                        }),
2913                        try_into: Box::new(|_| None),
2914                        guest_offset: ItemCount::ZERO,
2915                        cancel: false,
2916                        cancel_waker: None,
2917                    },
2918                ) else {
2919                    bail_bug!("expected WriteState::HostReady")
2920                };
2921
2922                transmit.read = ReadState::HostToHost {
2923                    accept: Box::new(move |input| {
2924                        let consume = consume_with_buffer.clone();
2925                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2926                    }),
2927                    buffer: Vec::new(),
2928                    limit: 0,
2929                };
2930
2931                let future = async move {
2932                    loop {
2933                        if tls::get(|store| {
2934                            crate::error::Ok(matches!(
2935                                store.concurrent_state_mut()?.get_mut(id)?.read,
2936                                ReadState::Dropped
2937                            ))
2938                        })? {
2939                            break Ok(());
2940                        }
2941
2942                        match produce().await? {
2943                            StreamResult::Completed | StreamResult::Cancelled => {}
2944                            StreamResult::Dropped => break Ok(()),
2945                        }
2946
2947                        if let TransmitKind::Future = kind {
2948                            break Ok(());
2949                        }
2950                    }
2951                }
2952                .map(move |result| {
2953                    tls::get(|store| store.concurrent_state_mut()?.delete_transmit(id))?;
2954                    result
2955                });
2956
2957                state.push_future(Box::pin(future));
2958            }
2959            WriteState::Dropped => {
2960                let reader = transmit.read_handle;
2961                self.0.host_drop_reader(reader, kind)?;
2962            }
2963        }
2964        Ok(())
2965    }
2966}
2967
2968async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2969    token: StoreToken<D>,
2970    id: TableId<TransmitState>,
2971    pair: Arc<LockedState<(P, B)>>,
2972    kind: TransmitKind,
2973) -> Result<()> {
2974    let (read, guest_offset) = tls::get(|store| {
2975        let transmit = store.concurrent_state_mut()?.get_mut(id)?;
2976
2977        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2978            Some(guest_offset)
2979        } else {
2980            None
2981        };
2982
2983        crate::error::Ok((
2984            mem::replace(&mut transmit.read, ReadState::Open),
2985            guest_offset,
2986        ))
2987    })?;
2988
2989    match read {
2990        ReadState::GuestReady {
2991            ty,
2992            flat_abi,
2993            options,
2994            address,
2995            count,
2996            handle,
2997            instance,
2998            caller_instance,
2999            caller_thread,
3000        } => {
3001            let guest_offset = match guest_offset {
3002                Some(i) => i,
3003                None => bail_bug!("guest_offset should be present if ready"),
3004            };
3005
3006            if let TransmitKind::Future = kind {
3007                tls::get(|store| {
3008                    store.concurrent_state_mut()?.get_mut(id)?.done = true;
3009                    crate::error::Ok(())
3010                })?;
3011            }
3012
3013            let old_remaining = pair.with(|p| p.1.remaining().len())?;
3014            let accept = {
3015                let pair = pair.clone();
3016                move |mut store: StoreContextMut<D>| {
3017                    let mut state = pair.take()?;
3018                    lower::<T, B, D>(
3019                        store.as_context_mut(),
3020                        instance,
3021                        caller_thread,
3022                        options,
3023                        ty,
3024                        address + (T::SIZE32 * guest_offset.as_usize()),
3025                        count.as_usize() - guest_offset.as_usize(),
3026                        &mut state.1,
3027                    )?;
3028                    crate::error::Ok(())
3029                }
3030            };
3031
3032            if guest_offset < count {
3033                if T::MAY_REQUIRE_REALLOC {
3034                    // For payloads which may require a realloc call, use a
3035                    // oneshot::channel and background task.  This is
3036                    // necessary because calling the guest while there are
3037                    // host embedder frames on the stack is unsound.
3038                    let (tx, rx) = oneshot::channel();
3039                    tls::get(move |store| {
3040                        store
3041                            .concurrent_state_mut()?
3042                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(
3043                                Box::new(move |store| {
3044                                    _ = tx.send(accept(token.as_context_mut(store))?);
3045                                    Ok(())
3046                                }),
3047                            )));
3048                        crate::error::Ok(())
3049                    })?;
3050                    match rx.await {
3051                        Ok(r) => r,
3052                        Err(oneshot::Canceled) => bail_bug!("work cancelled"),
3053                    }
3054                } else {
3055                    // Optimize flat payloads (i.e. those which do not
3056                    // require calling the guest's realloc function) by
3057                    // lowering directly instead of using a oneshot::channel
3058                    // and background task.
3059                    tls::get(|store| accept(token.as_context_mut(store)))?
3060                }
3061            }
3062
3063            tls::get(|store| {
3064                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;
3065
3066                let transmit = store.concurrent_state_mut()?.get_mut(id)?;
3067
3068                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3069                    bail_bug!("expected WriteState::HostReady")
3070                };
3071
3072                guest_offset.inc(count)?;
3073
3074                transmit.read = ReadState::GuestReady {
3075                    ty,
3076                    flat_abi,
3077                    options,
3078                    address,
3079                    count: ItemCount::new_usize(count)?,
3080                    handle,
3081                    instance,
3082                    caller_instance,
3083                    caller_thread,
3084                };
3085
3086                crate::error::Ok(())
3087            })?;
3088
3089            Ok(())
3090        }
3091
3092        ReadState::HostToHost {
3093            accept,
3094            mut buffer,
3095            limit,
3096        } => {
3097            let mut state = StreamResult::Completed;
3098            let mut position = 0;
3099
3100            while !matches!(state, StreamResult::Dropped) && position < limit {
3101                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
3102                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
3103                (buffer, position, _) = slice_buffer.into_parts();
3104            }
3105
3106            {
3107                let mut pair = pair.take()?;
3108                let (_, buffer) = &mut *pair;
3109
3110                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
3111                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
3112                }
3113            }
3114
3115            tls::get(|store| {
3116                store.concurrent_state_mut()?.get_mut(id)?.read = match state {
3117                    StreamResult::Dropped => ReadState::Dropped,
3118                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
3119                        accept,
3120                        buffer,
3121                        limit: 0,
3122                    },
3123                };
3124
3125                crate::error::Ok(())
3126            })?;
3127            Ok(())
3128        }
3129
3130        _ => bail_bug!("unexpected read state"),
3131    }
3132}
3133
3134impl Instance {
3135    /// Handle a host- or guest-initiated write by delivering the item(s) to the
3136    /// `StreamConsumer` for the specified stream or future.
3137    fn consume(
3138        self,
3139        store: &mut dyn VMStore,
3140        kind: TransmitKind,
3141        transmit_id: TableId<TransmitState>,
3142        consume: PollStream,
3143        guest_offset: ItemCount,
3144        cancel: bool,
3145    ) -> Result<ReturnCode> {
3146        let mut future = consume();
3147        store.concurrent_state_mut()?.get_mut(transmit_id)?.read = ReadState::HostReady {
3148            consume,
3149            guest_offset,
3150            cancel,
3151            cancel_waker: None,
3152        };
3153        let poll = tls::set(store, || {
3154            future
3155                .as_mut()
3156                .poll(&mut Context::from_waker(&Waker::noop()))
3157        });
3158
3159        Ok(match poll {
3160            Poll::Ready(state) => {
3161                let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
3162                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3163                    bail_bug!("expected ReadState::HostReady")
3164                };
3165                let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3166                transmit.write = WriteState::Open;
3167                code
3168            }
3169            Poll::Pending => {
3170                store.pipe_from_guest(kind, transmit_id, future);
3171                ReturnCode::Blocked
3172            }
3173        })
3174    }
3175
3176    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
3177    /// for the specified stream or future for items.
3178    fn produce(
3179        self,
3180        store: &mut dyn VMStore,
3181        kind: TransmitKind,
3182        transmit_id: TableId<TransmitState>,
3183        produce: PollStream,
3184        try_into: TryInto,
3185        guest_offset: ItemCount,
3186        cancel: bool,
3187    ) -> Result<ReturnCode> {
3188        let mut future = produce();
3189        store.concurrent_state_mut()?.get_mut(transmit_id)?.write = WriteState::HostReady {
3190            produce,
3191            try_into,
3192            guest_offset,
3193            cancel,
3194            cancel_waker: None,
3195        };
3196        let poll = tls::set(store, || {
3197            future
3198                .as_mut()
3199                .poll(&mut Context::from_waker(&Waker::noop()))
3200        });
3201
3202        Ok(match poll {
3203            Poll::Ready(state) => {
3204                let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
3205                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3206                    bail_bug!("expected WriteState::HostReady")
3207                };
3208                let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3209                transmit.read = ReadState::Open;
3210                code
3211            }
3212            Poll::Pending => {
3213                store.pipe_to_guest(kind, transmit_id, future);
3214                ReturnCode::Blocked
3215            }
3216        })
3217    }
3218
3219    /// Drop the writable end of the specified stream or future from the guest.
3220    pub(super) fn guest_drop_writable(
3221        self,
3222        store: &mut StoreOpaque,
3223        ty: TransmitIndex,
3224        writer: u32,
3225    ) -> Result<()> {
3226        let table = self.id().get_mut(store).table_for_transmit(ty);
3227        let transmit_rep = match ty {
3228            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3229            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3230        };
3231
3232        let id = TableId::<TransmitHandle>::new(transmit_rep);
3233        log::trace!("guest_drop_writable: drop writer {id:?}");
3234        match ty {
3235            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3236            TransmitIndex::Future(_) => store.host_drop_writer(
3237                id,
3238                Some(|| {
3239                    Err(format_err!(
3240                        "cannot drop future write end without first writing a value"
3241                    ))
3242                }),
3243            ),
3244        }
3245    }
3246
3247    /// Copy `count` items from `read_address` to `write_address` for the
3248    /// specified stream or future.
3249    fn copy<T: 'static>(
3250        store: StoreContextMut<T>,
3251        flat_abi: Option<FlatAbi>,
3252        write_instance: Instance,
3253        write_caller_instance: RuntimeComponentInstanceIndex,
3254        write_ty: TransmitIndex,
3255        write_options: OptionsIndex,
3256        write_address: usize,
3257        read_instance: Instance,
3258        read_caller_instance: RuntimeComponentInstanceIndex,
3259        read_caller_thread: QualifiedThreadId,
3260        read_ty: TransmitIndex,
3261        read_options: OptionsIndex,
3262        read_address: usize,
3263        count: ItemCount,
3264        rep: u32,
3265    ) -> Result<()> {
3266        let (write_component, store) = write_instance.component_and_store_mut(store.0);
3267        let (read_component, mut store) = read_instance.component_and_store_mut(store);
3268        let write_types = write_component.types();
3269        let read_types = read_component.types();
3270        let count = count.as_usize();
3271
3272        // Validate `write_ty` w.r.t. `write_address` to ensure it's properly
3273        // aligned and in-bounds.
3274        let write_payload_ty = write_ty.payload(write_types);
3275        let write_abi = match write_payload_ty {
3276            Some(ty) => write_types.canonical_abi(ty),
3277            None => &CanonicalAbiInfo::ZERO,
3278        };
3279        let write_length_in_bytes = match flat_abi {
3280            Some(abi) => usize::try_from(abi.size)? * count,
3281            None => usize::try_from(write_abi.size32)? * count,
3282        };
3283        if write_length_in_bytes > 0 {
3284            if write_address % usize::try_from(write_abi.align32)? != 0 {
3285                bail!("write pointer not aligned");
3286            }
3287            write_instance
3288                .options_memory(store, write_options)
3289                .get(write_address..)
3290                .and_then(|b| b.get(..write_length_in_bytes))
3291                .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
3292        }
3293
3294        let read_payload_ty = read_ty.payload(read_types);
3295        let read_abi = match read_payload_ty {
3296            Some(ty) => read_types.canonical_abi(ty),
3297            None => &CanonicalAbiInfo::ZERO,
3298        };
3299        let read_length_in_bytes = match flat_abi {
3300            Some(abi) => usize::try_from(abi.size)? * count,
3301            None => usize::try_from(read_abi.size32)? * count,
3302        };
3303        if read_length_in_bytes > 0 {
3304            if read_address % usize::try_from(read_abi.align32)? != 0 {
3305                bail!("read pointer not aligned");
3306            }
3307            read_instance
3308                .options_memory(store, read_options)
3309                .get(read_address..)
3310                .and_then(|b| b.get(..read_length_in_bytes))
3311                .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
3312        }
3313
3314        if write_caller_instance == read_caller_instance
3315            && !allow_intra_component_read_write(write_payload_ty)
3316        {
3317            bail!(
3318                "cannot read from and write to intra-component future/stream with non-numeric payload"
3319            )
3320        }
3321
3322        match (write_ty, read_ty) {
3323            (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
3324                if count != 1 {
3325                    bail_bug!("futures can only send 1 item");
3326                }
3327
3328                let val = write_payload_ty
3329                    .map(|ty| {
3330                        let lift = &mut LiftContext::new(store, write_options, write_instance);
3331                        let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3332                        Val::load(lift, *ty, bytes)
3333                    })
3334                    .transpose()?;
3335
3336                if let Some(val) = val {
3337                    // Serializing the value may require calling the guest's realloc function, so we
3338                    // set the guest's thread context in case realloc requires it, and restore the original
3339                    // thread context after the copy is complete.
3340                    let old_thread = store.set_thread(read_caller_thread)?;
3341                    let lower =
3342                        &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3343                    let ptr = func::validate_inbounds_dynamic(
3344                        read_abi,
3345                        lower.as_slice_mut(),
3346                        &ValRaw::u32(read_address.try_into()?),
3347                    )?;
3348                    let ty = match read_payload_ty {
3349                        Some(ty) => ty,
3350                        None => bail_bug!("expected read payload type to be present"),
3351                    };
3352                    val.store(lower, *ty, ptr)?;
3353                    store.set_thread(old_thread)?;
3354                }
3355            }
3356            (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
3357                if write_length_in_bytes == 0 {
3358                    return Ok(());
3359                }
3360                let write_payload_ty = match write_payload_ty {
3361                    Some(ty) => ty,
3362                    None => bail_bug!("expected write payload type to be present"),
3363                };
3364                let read_payload_ty = match read_payload_ty {
3365                    Some(ty) => ty,
3366                    None => bail_bug!("expected read payload type to be present"),
3367                };
3368                if flat_abi.is_some() {
3369                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
3370                    let store_opaque = store.store_opaque_mut();
3371
3372                    assert_eq!(read_length_in_bytes, write_length_in_bytes);
3373
3374                    if read_instance
3375                        .options_memory(store_opaque, read_options)
3376                        .as_ptr()
3377                        == write_instance
3378                            .options_memory(store_opaque, write_options)
3379                            .as_ptr()
3380                    {
3381                        let memory = read_instance.options_memory_mut(store_opaque, read_options);
3382                        memory.copy_within(
3383                            write_address..write_address + write_length_in_bytes,
3384                            read_address,
3385                        );
3386                    } else {
3387                        let src = write_instance.options_memory(store_opaque, write_options)
3388                            [write_address..][..write_length_in_bytes]
3389                            .as_ptr();
3390                        let dst = read_instance.options_memory_mut(store_opaque, read_options)
3391                            [read_address..][..read_length_in_bytes]
3392                            .as_mut_ptr();
3393
3394                        // SAFETY: Both `src` and `dst` have been validated
3395                        // above to be valid pointers as they're derived from
3396                        // slices that have the desired length with the desired
3397                        // read/write permission. The `unsafe` bit here is that
3398                        // the memories are disjoint (different base pointers)
3399                        // and there's no easy way to borrow both
3400                        // simultaneously from the store. Different memories
3401                        // are guaranteed to be disjoint, however, so the
3402                        // `unsafe` here should be ok.
3403                        unsafe {
3404                            src.copy_to_nonoverlapping(dst, write_length_in_bytes);
3405                        }
3406                    }
3407                } else {
3408                    let store_opaque = store.store_opaque_mut();
3409                    let lift = &mut LiftContext::new(store_opaque, write_options, write_instance);
3410                    let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3411                    lift.consume_fuel_array(count, size_of::<Val>())?;
3412
3413                    let values = (0..count)
3414                        .map(|index| {
3415                            let size = usize::try_from(write_abi.size32)?;
3416                            Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
3417                        })
3418                        .collect::<Result<Vec<_>>>()?;
3419
3420                    let id = TableId::<TransmitHandle>::new(rep);
3421                    log::trace!("copy values {values:?} for {id:?}");
3422
3423                    // Serializing the value may require calling the guest's realloc function, so we
3424                    // set the guest's thread context in case realloc requires it, and restore the original
3425                    // thread context after the copy is complete.
3426                    let old_thread = store.set_thread(read_caller_thread)?;
3427                    let lower =
3428                        &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3429                    let mut ptr = read_address;
3430                    for value in values {
3431                        value.store(lower, *read_payload_ty, ptr)?;
3432                        ptr += usize::try_from(read_abi.size32)?;
3433                    }
3434                    store.set_thread(old_thread)?;
3435                }
3436            }
3437            _ => bail_bug!("mismatched transmit types in copy"),
3438        }
3439
3440        Ok(())
3441    }
3442
3443    fn check_bounds(
3444        self,
3445        store: &StoreOpaque,
3446        options: OptionsIndex,
3447        ty: TransmitIndex,
3448        address: usize,
3449        count: usize,
3450    ) -> Result<()> {
3451        let types = self.id().get(store).component().types();
3452        let size = usize::try_from(
3453            match ty {
3454                TransmitIndex::Future(ty) => types[types[ty].ty]
3455                    .payload
3456                    .map(|ty| types.canonical_abi(&ty).size32),
3457                TransmitIndex::Stream(ty) => types[types[ty].ty]
3458                    .payload
3459                    .map(|ty| types.canonical_abi(&ty).size32),
3460            }
3461            .unwrap_or(0),
3462        )?;
3463
3464        if count > 0 && size > 0 {
3465            self.options_memory(store, options)
3466                .get(address..)
3467                .and_then(|b| b.get(..size.checked_mul(count)?))
3468                .map(drop)
3469                .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3470        } else {
3471            Ok(())
3472        }
3473    }
3474
3475    /// Write to the specified stream or future from the guest.
3476    pub(super) fn guest_write<T: 'static>(
3477        self,
3478        mut store: StoreContextMut<T>,
3479        caller: RuntimeComponentInstanceIndex,
3480        ty: TransmitIndex,
3481        options: OptionsIndex,
3482        flat_abi: Option<FlatAbi>,
3483        handle: u32,
3484        address: u32,
3485        count: u32,
3486    ) -> Result<ReturnCode> {
3487        let count = ItemCount::new(count)?;
3488
3489        if !self.options(store.0, options).async_ {
3490            // The caller may only sync call `{stream,future}.write` from an
3491            // async task (i.e. a task created via a call to an async export).
3492            // Otherwise, we'll trap.
3493            store.0.check_blocking()?;
3494        }
3495
3496        let address = usize::try_from(address)?;
3497        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3498        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3499        let TransmitLocalState::Write { done } = *state else {
3500            bail!(Trap::ConcurrentFutureStreamOp);
3501        };
3502
3503        if done {
3504            bail!("cannot write after being notified that the readable end dropped");
3505        }
3506
3507        *state = TransmitLocalState::Busy;
3508        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3509        let concurrent_state = store.0.concurrent_state_mut()?;
3510        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3511        let transmit = concurrent_state.get_mut(transmit_id)?;
3512        log::trace!(
3513            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3514            transmit.read
3515        );
3516
3517        if transmit.done {
3518            bail!("cannot write to future after previous write succeeded or readable end dropped");
3519        }
3520
3521        let new_state = if let ReadState::Dropped = &transmit.read {
3522            ReadState::Dropped
3523        } else {
3524            ReadState::Open
3525        };
3526
3527        let set_guest_ready = |me: &mut ConcurrentState| {
3528            let transmit = me.get_mut(transmit_id)?;
3529            if !matches!(&transmit.write, WriteState::Open) {
3530                bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
3531            }
3532            transmit.write = WriteState::GuestReady {
3533                instance: self,
3534                caller,
3535                ty,
3536                flat_abi,
3537                options,
3538                address,
3539                count,
3540                handle,
3541            };
3542            Ok::<_, crate::Error>(())
3543        };
3544
3545        let mut result = match mem::replace(&mut transmit.read, new_state) {
3546            ReadState::GuestReady {
3547                ty: read_ty,
3548                flat_abi: read_flat_abi,
3549                options: read_options,
3550                address: read_address,
3551                count: read_count,
3552                handle: read_handle,
3553                instance: read_instance,
3554                caller_instance: read_caller_instance,
3555                caller_thread: read_caller_thread,
3556            } => {
3557                if flat_abi != read_flat_abi {
3558                    bail_bug!("expected flat ABI calculations to be the same");
3559                }
3560
3561                if let TransmitIndex::Future(_) = ty {
3562                    transmit.done = true;
3563                }
3564
3565                // Note that zero-length reads and writes are handling specially
3566                // by the spec to allow each end to signal readiness to the
3567                // other.  Quoting the spec:
3568                //
3569                // ```
3570                // The meaning of a read or write when the length is 0 is that
3571                // the caller is querying the "readiness" of the other
3572                // side. When a 0-length read/write rendezvous with a
3573                // non-0-length read/write, only the 0-length read/write
3574                // completes; the non-0-length read/write is kept pending (and
3575                // ready for a subsequent rendezvous).
3576                //
3577                // In the corner case where a 0-length read and write
3578                // rendezvous, only the writer is notified of readiness. To
3579                // avoid livelock, the Canonical ABI requires that a writer must
3580                // (eventually) follow a completed 0-length write with a
3581                // non-0-length write that is allowed to block (allowing the
3582                // reader end to run and rendezvous with its own non-0-length
3583                // read).
3584                // ```
3585
3586                let write_complete = count == 0 || read_count > 0;
3587                let read_complete = count > 0;
3588                let read_buffer_remaining = count < read_count;
3589
3590                let read_handle_rep = transmit.read_handle.rep();
3591
3592                let count = count.min(read_count);
3593
3594                Instance::copy(
3595                    store.as_context_mut(),
3596                    flat_abi,
3597                    self,
3598                    caller,
3599                    ty,
3600                    options,
3601                    address,
3602                    read_instance,
3603                    read_caller_instance,
3604                    read_caller_thread,
3605                    read_ty,
3606                    read_options,
3607                    read_address,
3608                    count,
3609                    rep,
3610                )?;
3611
3612                let instance = read_instance.id().get(store.0);
3613                let types = instance.component().types();
3614                let item_size = match read_ty.payload(types) {
3615                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3616                    None => 0,
3617                };
3618                let concurrent_state = store.0.concurrent_state_mut()?;
3619                if read_complete {
3620                    let total = if let Some(Event::StreamRead {
3621                        code: ReturnCode::Completed(old_total),
3622                        ..
3623                    }) = concurrent_state.take_event(read_handle_rep)?
3624                    {
3625                        count.add(old_total)?
3626                    } else {
3627                        count
3628                    };
3629
3630                    let code = ReturnCode::completed(ty.kind(), total);
3631
3632                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3633                }
3634
3635                // If the reader still has buffer remaining, or if this was a
3636                // zero-length rendezvous, then restore the state of the reader
3637                // back to what it was when we found it. Note that for the
3638                // zero-length rendezvous case this specifically won't execute
3639                // the `read_complete` logic above, which is intentional, as the
3640                // reader remains blocked.
3641                if read_buffer_remaining || (count == 0 && read_count == 0) {
3642                    let transmit = concurrent_state.get_mut(transmit_id)?;
3643                    transmit.read = ReadState::GuestReady {
3644                        ty: read_ty,
3645                        flat_abi: read_flat_abi,
3646                        options: read_options,
3647                        address: read_address + (count.as_usize() * item_size),
3648                        count: read_count.sub(count)?,
3649                        handle: read_handle,
3650                        instance: read_instance,
3651                        caller_instance: read_caller_instance,
3652                        caller_thread: read_caller_thread,
3653                    };
3654                }
3655
3656                if write_complete {
3657                    ReturnCode::completed(ty.kind(), count)
3658                } else {
3659                    set_guest_ready(concurrent_state)?;
3660                    ReturnCode::Blocked
3661                }
3662            }
3663
3664            ReadState::HostReady {
3665                consume,
3666                guest_offset,
3667                cancel,
3668                cancel_waker,
3669            } => {
3670                if cancel_waker.is_some() {
3671                    bail_bug!("expected cancel_waker to be none");
3672                }
3673                if cancel {
3674                    bail_bug!("expected cancel to be false");
3675                }
3676                if guest_offset != 0 {
3677                    bail_bug!("expected guest_offset to be 0");
3678                }
3679
3680                if let TransmitIndex::Future(_) = ty {
3681                    transmit.done = true;
3682                }
3683
3684                set_guest_ready(concurrent_state)?;
3685                self.consume(
3686                    store.0,
3687                    ty.kind(),
3688                    transmit_id,
3689                    consume,
3690                    ItemCount::ZERO,
3691                    false,
3692                )?
3693            }
3694
3695            ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),
3696
3697            ReadState::Open => {
3698                set_guest_ready(concurrent_state)?;
3699                ReturnCode::Blocked
3700            }
3701
3702            ReadState::Dropped => {
3703                if let TransmitIndex::Future(_) = ty {
3704                    transmit.done = true;
3705                }
3706
3707                ReturnCode::Dropped(ItemCount::ZERO)
3708            }
3709        };
3710
3711        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3712            result = self.wait_for_write(store.0, transmit_handle)?;
3713        }
3714
3715        if result != ReturnCode::Blocked {
3716            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3717                TransmitLocalState::Write {
3718                    done: matches!(result, ReturnCode::Dropped(_)),
3719                };
3720        }
3721
3722        log::trace!(
3723            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3724        );
3725
3726        Ok(result)
3727    }
3728
3729    /// Read from the specified stream or future from the guest.
3730    pub(super) fn guest_read<T: 'static>(
3731        self,
3732        mut store: StoreContextMut<T>,
3733        caller_instance: RuntimeComponentInstanceIndex,
3734        ty: TransmitIndex,
3735        options: OptionsIndex,
3736        flat_abi: Option<FlatAbi>,
3737        handle: u32,
3738        address: u32,
3739        count: u32,
3740    ) -> Result<ReturnCode> {
3741        let count = ItemCount::new(count)?;
3742
3743        if !self.options(store.0, options).async_ {
3744            // The caller may only sync call `{stream,future}.read` from an
3745            // async task (i.e. a task created via a call to an async export).
3746            // Otherwise, we'll trap.
3747            store.0.check_blocking()?;
3748        }
3749
3750        let address = usize::try_from(address)?;
3751        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3752        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3753        let TransmitLocalState::Read { done } = *state else {
3754            bail!(Trap::ConcurrentFutureStreamOp);
3755        };
3756
3757        if done {
3758            bail!("cannot read after being notified that the writable end dropped");
3759        }
3760
3761        *state = TransmitLocalState::Busy;
3762        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3763        let caller_thread = store.0.current_guest_thread()?;
3764        let concurrent_state = store.0.concurrent_state_mut()?;
3765        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3766        let transmit = concurrent_state.get_mut(transmit_id)?;
3767        log::trace!(
3768            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3769            transmit.write
3770        );
3771
3772        if transmit.done {
3773            bail!("cannot read from future after previous read succeeded");
3774        }
3775
3776        let new_state = if let WriteState::Dropped = &transmit.write {
3777            WriteState::Dropped
3778        } else {
3779            WriteState::Open
3780        };
3781
3782        let set_guest_ready = |me: &mut ConcurrentState| {
3783            let transmit = me.get_mut(transmit_id)?;
3784            if !matches!(&transmit.read, ReadState::Open) {
3785                bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
3786            }
3787            transmit.read = ReadState::GuestReady {
3788                ty,
3789                flat_abi,
3790                options,
3791                address,
3792                count,
3793                handle,
3794                instance: self,
3795                caller_instance,
3796                caller_thread,
3797            };
3798            Ok::<_, crate::Error>(())
3799        };
3800
3801        let mut result = match mem::replace(&mut transmit.write, new_state) {
3802            WriteState::GuestReady {
3803                instance: write_instance,
3804                ty: write_ty,
3805                flat_abi: write_flat_abi,
3806                options: write_options,
3807                address: write_address,
3808                count: write_count,
3809                handle: write_handle,
3810                caller: write_caller,
3811            } => {
3812                if flat_abi != write_flat_abi {
3813                    bail_bug!("expected flat ABI calculations to be the same");
3814                }
3815
3816                if let TransmitIndex::Future(_) = ty {
3817                    transmit.done = true;
3818                }
3819
3820                let write_handle_rep = transmit.write_handle.rep();
3821
3822                // See the comment in `guest_write` for the
3823                // `ReadState::GuestReady` case concerning zero-length reads and
3824                // writes.
3825
3826                let write_complete = write_count == 0 || count > 0;
3827                let read_complete = write_count > 0;
3828                let write_buffer_remaining = count < write_count;
3829
3830                let count = count.min(write_count);
3831
3832                Instance::copy(
3833                    store.as_context_mut(),
3834                    flat_abi,
3835                    write_instance,
3836                    write_caller,
3837                    write_ty,
3838                    write_options,
3839                    write_address,
3840                    self,
3841                    caller_instance,
3842                    caller_thread,
3843                    ty,
3844                    options,
3845                    address,
3846                    count,
3847                    rep,
3848                )?;
3849
3850                let instance = write_instance.id().get(store.0);
3851                let types = instance.component().types();
3852                let item_size = match write_ty.payload(types) {
3853                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3854                    None => 0,
3855                };
3856                let concurrent_state = store.0.concurrent_state_mut()?;
3857
3858                if write_complete {
3859                    let total = if let Some(Event::StreamWrite {
3860                        code: ReturnCode::Completed(old_total),
3861                        ..
3862                    }) = concurrent_state.take_event(write_handle_rep)?
3863                    {
3864                        count.add(old_total)?
3865                    } else {
3866                        count
3867                    };
3868
3869                    let code = ReturnCode::completed(ty.kind(), total);
3870
3871                    concurrent_state.send_write_result(
3872                        write_ty,
3873                        transmit_id,
3874                        write_handle,
3875                        code,
3876                    )?;
3877                }
3878
3879                if write_buffer_remaining {
3880                    let transmit = concurrent_state.get_mut(transmit_id)?;
3881                    transmit.write = WriteState::GuestReady {
3882                        instance: write_instance,
3883                        caller: write_caller,
3884                        ty: write_ty,
3885                        flat_abi: write_flat_abi,
3886                        options: write_options,
3887                        address: write_address + (count.as_usize() * item_size),
3888                        count: write_count.sub(count)?,
3889                        handle: write_handle,
3890                    };
3891                }
3892
3893                if read_complete {
3894                    ReturnCode::completed(ty.kind(), count)
3895                } else {
3896                    set_guest_ready(concurrent_state)?;
3897                    ReturnCode::Blocked
3898                }
3899            }
3900
3901            WriteState::HostReady {
3902                produce,
3903                try_into,
3904                guest_offset,
3905                cancel,
3906                cancel_waker,
3907            } => {
3908                if cancel_waker.is_some() {
3909                    bail_bug!("expected cancel_waker to be none");
3910                }
3911                if cancel {
3912                    bail_bug!("expected cancel to be false");
3913                }
3914                if guest_offset != 0 {
3915                    bail_bug!("expected guest_offset to be 0");
3916                }
3917
3918                set_guest_ready(concurrent_state)?;
3919
3920                let code = self.produce(
3921                    store.0,
3922                    ty.kind(),
3923                    transmit_id,
3924                    produce,
3925                    try_into,
3926                    ItemCount::ZERO,
3927                    false,
3928                )?;
3929
3930                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3931                    store.0.concurrent_state_mut()?.get_mut(transmit_id)?.done = true;
3932                }
3933
3934                code
3935            }
3936
3937            WriteState::Open => {
3938                set_guest_ready(concurrent_state)?;
3939                ReturnCode::Blocked
3940            }
3941
3942            WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
3943        };
3944
3945        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3946            result = self.wait_for_read(store.0, transmit_handle)?;
3947        }
3948
3949        if result != ReturnCode::Blocked {
3950            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3951                TransmitLocalState::Read {
3952                    done: matches!(
3953                        (result, ty),
3954                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3955                    ),
3956                };
3957        }
3958
3959        log::trace!(
3960            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3961        );
3962
3963        Ok(result)
3964    }
3965
3966    fn wait_for_write(
3967        self,
3968        store: &mut StoreOpaque,
3969        handle: TableId<TransmitHandle>,
3970    ) -> Result<ReturnCode> {
3971        let waitable = Waitable::Transmit(handle);
3972        store.wait_for_event(waitable)?;
3973        let event = waitable.take_event(store.concurrent_state_mut()?)?;
3974        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3975            event
3976        {
3977            waitable.on_delivery(store, self, event)?;
3978            Ok(code)
3979        } else {
3980            bail_bug!("expected either a stream or future write event")
3981        }
3982    }
3983
3984    /// Cancel a pending stream or future write.
3985    fn cancel_write(
3986        self,
3987        store: &mut StoreOpaque,
3988        transmit_id: TableId<TransmitState>,
3989        async_: bool,
3990    ) -> Result<ReturnCode> {
3991        let state = store.concurrent_state_mut()?;
3992        let transmit = state.get_mut(transmit_id)?;
3993        log::trace!(
3994            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3995            transmit.read,
3996            transmit.write
3997        );
3998        let waitable = Waitable::Transmit(transmit.write_handle);
3999
4000        let code = if let Some(event) = waitable.take_event(state)? {
4001            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
4002                bail_bug!("expected either a stream or future write event")
4003            };
4004            waitable.on_delivery(store, self, event)?;
4005            match (code, event) {
4006                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
4007                    ReturnCode::Cancelled(count)
4008                }
4009                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4010                _ => bail_bug!("unexpected code/event combo"),
4011            }
4012        } else if let ReadState::HostReady {
4013            cancel,
4014            cancel_waker,
4015            ..
4016        } = &mut state.get_mut(transmit_id)?.read
4017        {
4018            *cancel = true;
4019            if let Some(waker) = cancel_waker.take() {
4020                waker.wake();
4021            }
4022
4023            if async_ {
4024                ReturnCode::Blocked
4025            } else {
4026                let handle = store
4027                    .concurrent_state_mut()?
4028                    .get_mut(transmit_id)?
4029                    .write_handle;
4030                self.wait_for_write(store, handle)?
4031            }
4032        } else {
4033            ReturnCode::Cancelled(ItemCount::ZERO)
4034        };
4035
4036        if !matches!(code, ReturnCode::Blocked) {
4037            let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
4038
4039            match &transmit.write {
4040                WriteState::GuestReady { .. } => {
4041                    transmit.write = WriteState::Open;
4042                }
4043                WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
4044                WriteState::Open | WriteState::Dropped => {}
4045            }
4046        }
4047
4048        log::trace!("cancelled write {transmit_id:?}: {code:?}");
4049
4050        Ok(code)
4051    }
4052
4053    fn wait_for_read(
4054        self,
4055        store: &mut StoreOpaque,
4056        handle: TableId<TransmitHandle>,
4057    ) -> Result<ReturnCode> {
4058        let waitable = Waitable::Transmit(handle);
4059        store.wait_for_event(waitable)?;
4060        let event = waitable.take_event(store.concurrent_state_mut()?)?;
4061        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
4062            event
4063        {
4064            waitable.on_delivery(store, self, event)?;
4065            Ok(code)
4066        } else {
4067            bail_bug!("expected either a stream or future read event")
4068        }
4069    }
4070
4071    /// Cancel a pending stream or future read.
4072    fn cancel_read(
4073        self,
4074        store: &mut StoreOpaque,
4075        transmit_id: TableId<TransmitState>,
4076        async_: bool,
4077    ) -> Result<ReturnCode> {
4078        let state = store.concurrent_state_mut()?;
4079        let transmit = state.get_mut(transmit_id)?;
4080        log::trace!(
4081            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
4082            transmit.read,
4083            transmit.write
4084        );
4085
4086        let waitable = Waitable::Transmit(transmit.read_handle);
4087        let code = if let Some(event) = waitable.take_event(state)? {
4088            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
4089                bail_bug!("expected either a stream or future read event")
4090            };
4091            waitable.on_delivery(store, self, event)?;
4092            match (code, event) {
4093                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
4094                    ReturnCode::Cancelled(count)
4095                }
4096                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4097                _ => bail_bug!("unexpected code/event combo"),
4098            }
4099        } else if let WriteState::HostReady {
4100            cancel,
4101            cancel_waker,
4102            ..
4103        } = &mut state.get_mut(transmit_id)?.write
4104        {
4105            *cancel = true;
4106            if let Some(waker) = cancel_waker.take() {
4107                waker.wake();
4108            }
4109
4110            if async_ {
4111                ReturnCode::Blocked
4112            } else {
4113                let handle = store
4114                    .concurrent_state_mut()?
4115                    .get_mut(transmit_id)?
4116                    .read_handle;
4117                self.wait_for_read(store, handle)?
4118            }
4119        } else {
4120            ReturnCode::Cancelled(ItemCount::ZERO)
4121        };
4122
4123        if !matches!(code, ReturnCode::Blocked) {
4124            let transmit = store.concurrent_state_mut()?.get_mut(transmit_id)?;
4125
4126            match &transmit.read {
4127                ReadState::GuestReady { .. } => {
4128                    transmit.read = ReadState::Open;
4129                }
4130                ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4131                    bail_bug!("support host read cancellation")
4132                }
4133                ReadState::Open | ReadState::Dropped => {}
4134            }
4135        }
4136
4137        log::trace!("cancelled read {transmit_id:?}: {code:?}");
4138
4139        Ok(code)
4140    }
4141
4142    /// Cancel a pending write for the specified stream or future from the guest.
4143    fn guest_cancel_write(
4144        self,
4145        store: &mut StoreOpaque,
4146        ty: TransmitIndex,
4147        async_: bool,
4148        writer: u32,
4149    ) -> Result<ReturnCode> {
4150        if !async_ {
4151            // The caller may only sync call `{stream,future}.cancel-write` from
4152            // an async task (i.e. a task created via a call to an async
4153            // export).  Otherwise, we'll trap.
4154            store.check_blocking()?;
4155        }
4156
4157        let (rep, state) =
4158            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4159        let id = TableId::<TransmitHandle>::new(rep);
4160        log::trace!("guest cancel write {id:?} (handle {writer})");
4161        match state {
4162            TransmitLocalState::Write { .. } => {
4163                bail!("stream or future write cancelled when no write is pending")
4164            }
4165            TransmitLocalState::Read { .. } => {
4166                bail!("passed read end to `{{stream|future}}.cancel-write`")
4167            }
4168            TransmitLocalState::Busy => {}
4169        }
4170        let transmit_id = store.concurrent_state_mut()?.get_mut(id)?.state;
4171        let code = self.cancel_write(store, transmit_id, async_)?;
4172        if !matches!(code, ReturnCode::Blocked) {
4173            let state =
4174                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4175                    .1;
4176            if let TransmitLocalState::Busy = state {
4177                *state = TransmitLocalState::Write { done: false };
4178            }
4179        }
4180        Ok(code)
4181    }
4182
4183    /// Cancel a pending read for the specified stream or future from the guest.
4184    fn guest_cancel_read(
4185        self,
4186        store: &mut StoreOpaque,
4187        ty: TransmitIndex,
4188        async_: bool,
4189        reader: u32,
4190    ) -> Result<ReturnCode> {
4191        if !async_ {
4192            // The caller may only sync call `{stream,future}.cancel-read` from
4193            // an async task (i.e. a task created via a call to an async
4194            // export).  Otherwise, we'll trap.
4195            store.check_blocking()?;
4196        }
4197
4198        let (rep, state) =
4199            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4200        let id = TableId::<TransmitHandle>::new(rep);
4201        log::trace!("guest cancel read {id:?} (handle {reader})");
4202        match state {
4203            TransmitLocalState::Read { .. } => {
4204                bail!("stream or future read cancelled when no read is pending")
4205            }
4206            TransmitLocalState::Write { .. } => {
4207                bail!("passed write end to `{{stream|future}}.cancel-read`")
4208            }
4209            TransmitLocalState::Busy => {}
4210        }
4211        let transmit_id = store.concurrent_state_mut()?.get_mut(id)?.state;
4212        let code = self.cancel_read(store, transmit_id, async_)?;
4213        if !matches!(code, ReturnCode::Blocked) {
4214            let state =
4215                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4216                    .1;
4217            if let TransmitLocalState::Busy = state {
4218                *state = TransmitLocalState::Read { done: false };
4219            }
4220        }
4221        Ok(code)
4222    }
4223
4224    /// Drop the readable end of the specified stream or future from the guest.
4225    fn guest_drop_readable(
4226        self,
4227        store: &mut StoreOpaque,
4228        ty: TransmitIndex,
4229        reader: u32,
4230    ) -> Result<()> {
4231        let table = self.id().get_mut(store).table_for_transmit(ty);
4232        let (rep, _is_done) = match ty {
4233            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4234            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4235        };
4236        let kind = match ty {
4237            TransmitIndex::Stream(_) => TransmitKind::Stream,
4238            TransmitIndex::Future(_) => TransmitKind::Future,
4239        };
4240        let id = TableId::<TransmitHandle>::new(rep);
4241        log::trace!("guest_drop_readable: drop reader {id:?}");
4242        store.host_drop_reader(id, kind)
4243    }
4244
4245    /// Create a new error context for the given component.
4246    pub(crate) fn error_context_new(
4247        self,
4248        store: &mut StoreOpaque,
4249        ty: TypeComponentLocalErrorContextTableIndex,
4250        options: OptionsIndex,
4251        debug_msg_address: u32,
4252        debug_msg_len: u32,
4253    ) -> Result<u32> {
4254        let lift_ctx = &mut LiftContext::new(store, options, self);
4255        let debug_msg = String::linear_lift_from_flat(
4256            lift_ctx,
4257            InterfaceType::String,
4258            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4259        )?;
4260
4261        // Create a new ErrorContext that is tracked along with other concurrent state
4262        let err_ctx = ErrorContextState { debug_msg };
4263        let state = store.concurrent_state_mut()?;
4264        let table_id = state.push(err_ctx)?;
4265        let global_ref_count_idx =
4266            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4267
4268        // Add to the global error context ref counts
4269        let _ = state
4270            .global_error_context_ref_counts
4271            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4272
4273        // Error context are tracked both locally (to a single component instance) and globally
4274        // the counts for both must stay in sync.
4275        //
4276        // Here we reflect the newly created global concurrent error context state into the
4277        // component instance's locally tracked count, along with the appropriate key into the global
4278        // ref tracking data structures to enable later lookup
4279        let local_idx = self
4280            .id()
4281            .get_mut(store)
4282            .table_for_error_context(ty)
4283            .error_context_insert(table_id.rep())?;
4284
4285        Ok(local_idx)
4286    }
4287
4288    /// Retrieve the debug message from the specified error context.
4289    pub(super) fn error_context_debug_message<T>(
4290        self,
4291        store: StoreContextMut<T>,
4292        ty: TypeComponentLocalErrorContextTableIndex,
4293        options: OptionsIndex,
4294        err_ctx_handle: u32,
4295        debug_msg_address: u32,
4296    ) -> Result<()> {
4297        // Retrieve the error context and internal debug message
4298        let handle_table_id_rep = self
4299            .id()
4300            .get_mut(store.0)
4301            .table_for_error_context(ty)
4302            .error_context_rep(err_ctx_handle)?;
4303
4304        let state = store.0.concurrent_state_mut()?;
4305        // Get the state associated with the error context
4306        let ErrorContextState { debug_msg } =
4307            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4308        let debug_msg = debug_msg.clone();
4309
4310        let lower_cx = &mut LowerContext::new(store, options, self);
4311        let debug_msg_address = usize::try_from(debug_msg_address)?;
4312        // Lower the string into the component's memory.
4313        //
4314        // Note that the "8" here is the size of a WIT `string` in linear
4315        // memory, the ptr+length. This'll need to be updated when `memory64`
4316        // comes along. (FIXME(#4311))
4317        let offset = lower_cx
4318            .as_slice_mut()
4319            .get(debug_msg_address..)
4320            .and_then(|b| b.get(..8))
4321            .map(|_| debug_msg_address)
4322            .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4323        debug_msg
4324            .as_str()
4325            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4326
4327        Ok(())
4328    }
4329
4330    /// Implements the `future.cancel-read` intrinsic.
4331    pub(crate) fn future_cancel_read(
4332        self,
4333        store: &mut StoreOpaque,
4334        ty: TypeFutureTableIndex,
4335        async_: bool,
4336        reader: u32,
4337    ) -> Result<u32> {
4338        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4339            .map(|v| v.encode())
4340    }
4341
4342    /// Implements the `future.cancel-write` intrinsic.
4343    pub(crate) fn future_cancel_write(
4344        self,
4345        store: &mut StoreOpaque,
4346        ty: TypeFutureTableIndex,
4347        async_: bool,
4348        writer: u32,
4349    ) -> Result<u32> {
4350        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4351            .map(|v| v.encode())
4352    }
4353
4354    /// Implements the `stream.cancel-read` intrinsic.
4355    pub(crate) fn stream_cancel_read(
4356        self,
4357        store: &mut StoreOpaque,
4358        ty: TypeStreamTableIndex,
4359        async_: bool,
4360        reader: u32,
4361    ) -> Result<u32> {
4362        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4363            .map(|v| v.encode())
4364    }
4365
4366    /// Implements the `stream.cancel-write` intrinsic.
4367    pub(crate) fn stream_cancel_write(
4368        self,
4369        store: &mut StoreOpaque,
4370        ty: TypeStreamTableIndex,
4371        async_: bool,
4372        writer: u32,
4373    ) -> Result<u32> {
4374        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4375            .map(|v| v.encode())
4376    }
4377
4378    /// Implements the `future.drop-readable` intrinsic.
4379    pub(crate) fn future_drop_readable(
4380        self,
4381        store: &mut StoreOpaque,
4382        ty: TypeFutureTableIndex,
4383        reader: u32,
4384    ) -> Result<()> {
4385        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4386    }
4387
4388    /// Implements the `stream.drop-readable` intrinsic.
4389    pub(crate) fn stream_drop_readable(
4390        self,
4391        store: &mut StoreOpaque,
4392        ty: TypeStreamTableIndex,
4393        reader: u32,
4394    ) -> Result<()> {
4395        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4396    }
4397
4398    /// Allocate a new future or stream and grant ownership of both the read and
4399    /// write ends to the (sub-)component instance to which the specified
4400    /// `TransmitIndex` belongs.
4401    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4402        let (write, read) = store
4403            .concurrent_state_mut()?
4404            .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4405
4406        let table = self.id().get_mut(store).table_for_transmit(ty);
4407        let (read_handle, write_handle) = match ty {
4408            TransmitIndex::Future(ty) => (
4409                table.future_insert_read(ty, read.rep())?,
4410                table.future_insert_write(ty, write.rep())?,
4411            ),
4412            TransmitIndex::Stream(ty) => (
4413                table.stream_insert_read(ty, read.rep())?,
4414                table.stream_insert_write(ty, write.rep())?,
4415            ),
4416        };
4417
4418        let state = store.concurrent_state_mut()?;
4419        state.get_mut(read)?.common.handle = Some(read_handle);
4420        state.get_mut(write)?.common.handle = Some(write_handle);
4421
4422        Ok(ResourcePair {
4423            write: write_handle,
4424            read: read_handle,
4425        })
4426    }
4427
4428    /// Drop the specified error context.
4429    pub(crate) fn error_context_drop(
4430        self,
4431        store: &mut StoreOpaque,
4432        ty: TypeComponentLocalErrorContextTableIndex,
4433        error_context: u32,
4434    ) -> Result<()> {
4435        let instance = self.id().get_mut(store);
4436
4437        let local_handle_table = instance.table_for_error_context(ty);
4438
4439        let rep = local_handle_table.error_context_drop(error_context)?;
4440
4441        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4442
4443        let state = store.concurrent_state_mut()?;
4444        let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4445            .global_error_context_ref_counts
4446            .get_mut(&global_ref_count_idx)
4447        else {
4448            bail_bug!("retrieve concurrent state for error context during drop")
4449        };
4450
4451        // Reduce the component-global ref count, removing tracking if necessary
4452        if *global_ref_count < 1 {
4453            bail_bug!("ref count unexpectedly zero");
4454        }
4455        *global_ref_count -= 1;
4456        if *global_ref_count == 0 {
4457            state
4458                .global_error_context_ref_counts
4459                .remove(&global_ref_count_idx);
4460
4461            state
4462                .delete(TableId::<ErrorContextState>::new(rep))
4463                .context("deleting component-global error context data")?;
4464        }
4465
4466        Ok(())
4467    }
4468
4469    /// Transfer ownership of the specified stream or future read end from one
4470    /// guest to another.
4471    fn guest_transfer(
4472        self,
4473        store: &mut StoreOpaque,
4474        src_idx: u32,
4475        src: TransmitIndex,
4476        dst: TransmitIndex,
4477    ) -> Result<u32> {
4478        let id = self.lift_index_to_transmit(store, src, src_idx)?;
4479        self.lower_transmit_to_index(store, dst, id)
4480    }
4481
4482    fn lift_index_to_transmit(
4483        self,
4484        store: &mut StoreOpaque,
4485        ty: TransmitIndex,
4486        src_idx: u32,
4487    ) -> Result<TableId<TransmitHandle>> {
4488        let (state, _, _, instance) = store.lift_context_parts(self);
4489        lift_index_to_transmit(instance, state.concurrent_state_mut(), ty, src_idx)
4490    }
4491
4492    fn lower_transmit_to_index(
4493        self,
4494        store: &mut StoreOpaque,
4495        ty: TransmitIndex,
4496        id: TableId<TransmitHandle>,
4497    ) -> Result<u32> {
4498        let (state, _, _, instance) = store.lift_context_parts(self);
4499        lower_transmit_to_index(instance, state.concurrent_state_mut(), ty, id)
4500    }
4501
4502    /// Implements the `future.new` intrinsic.
4503    pub(crate) fn future_new(
4504        self,
4505        store: &mut StoreOpaque,
4506        ty: TypeFutureTableIndex,
4507    ) -> Result<ResourcePair> {
4508        self.guest_new(store, TransmitIndex::Future(ty))
4509    }
4510
4511    /// Implements the `stream.new` intrinsic.
4512    pub(crate) fn stream_new(
4513        self,
4514        store: &mut StoreOpaque,
4515        ty: TypeStreamTableIndex,
4516    ) -> Result<ResourcePair> {
4517        self.guest_new(store, TransmitIndex::Stream(ty))
4518    }
4519
4520    /// Transfer ownership of the specified future read end from one guest to
4521    /// another.
4522    pub(crate) fn future_transfer(
4523        self,
4524        store: &mut StoreOpaque,
4525        src_idx: u32,
4526        src: TypeFutureTableIndex,
4527        dst: TypeFutureTableIndex,
4528    ) -> Result<u32> {
4529        self.guest_transfer(
4530            store,
4531            src_idx,
4532            TransmitIndex::Future(src),
4533            TransmitIndex::Future(dst),
4534        )
4535    }
4536
4537    /// Transfer ownership of the specified stream read end from one guest to
4538    /// another.
4539    pub(crate) fn stream_transfer(
4540        self,
4541        store: &mut StoreOpaque,
4542        src_idx: u32,
4543        src: TypeStreamTableIndex,
4544        dst: TypeStreamTableIndex,
4545    ) -> Result<u32> {
4546        self.guest_transfer(
4547            store,
4548            src_idx,
4549            TransmitIndex::Stream(src),
4550            TransmitIndex::Stream(dst),
4551        )
4552    }
4553
4554    /// Copy the specified error context from one component to another.
4555    pub(crate) fn error_context_transfer(
4556        self,
4557        store: &mut StoreOpaque,
4558        src_idx: u32,
4559        src: TypeComponentLocalErrorContextTableIndex,
4560        dst: TypeComponentLocalErrorContextTableIndex,
4561    ) -> Result<u32> {
4562        let mut instance = self.id().get_mut(store);
4563        let rep = instance
4564            .as_mut()
4565            .table_for_error_context(src)
4566            .error_context_rep(src_idx)?;
4567        let dst_idx = instance
4568            .table_for_error_context(dst)
4569            .error_context_insert(rep)?;
4570
4571        // Update the global (cross-subcomponent) count for error contexts
4572        // as the new component has essentially created a new reference that will
4573        // be dropped/handled independently
4574        let global_ref_count = store
4575            .concurrent_state_mut()?
4576            .global_error_context_ref_counts
4577            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4578            .context("global ref count present for existing (sub)component error context")?;
4579
4580        global_ref_count.0 = global_ref_count
4581            .0
4582            .checked_add(1)
4583            .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
4584
4585        Ok(dst_idx)
4586    }
4587}
4588
4589/// Performs the opertion of lifting a future or stream from and instance into
4590/// the `TransmitHandle` for it.
4591///
4592/// The `src_idx` is the guest-specified index within `instance` and `ty` is the
4593/// expected type of future/stream.
4594fn lift_index_to_transmit(
4595    instance: Pin<&mut ComponentInstance>,
4596    concurrent_state: &mut ConcurrentState,
4597    ty: TransmitIndex,
4598    src_idx: u32,
4599) -> Result<TableId<TransmitHandle>> {
4600    let handle_table = instance.table_for_transmit(ty);
4601    let (rep, is_done) = match ty {
4602        TransmitIndex::Future(idx) => handle_table.future_remove_readable(idx, src_idx)?,
4603        TransmitIndex::Stream(idx) => handle_table.stream_remove_readable(idx, src_idx)?,
4604    };
4605    let desc = match ty {
4606        TransmitIndex::Future(_) => "future",
4607        TransmitIndex::Stream(_) => "stream",
4608    };
4609    if is_done {
4610        bail!("cannot lift {desc} after being notified that the writable end dropped");
4611    }
4612    let id = TableId::<TransmitHandle>::new(rep);
4613    let future = concurrent_state.get_mut(id)?;
4614    if future.common.set.is_some() {
4615        bail!("cannot lift {desc} while it's in a waitable set");
4616    }
4617    future.common.handle = None;
4618
4619    let state = future.state;
4620    if concurrent_state.get_mut(state)?.done {
4621        bail!("cannot lift {desc} after previous read succeeded");
4622    }
4623
4624    Ok(id)
4625}
4626
4627/// Performs the opertion of lowering a future or stream `TransmitHandle` into
4628/// an instance.
4629fn lower_transmit_to_index(
4630    instance: Pin<&mut ComponentInstance>,
4631    concurrent_state: &mut ConcurrentState,
4632    ty: TransmitIndex,
4633    id: TableId<TransmitHandle>,
4634) -> Result<u32> {
4635    let state = concurrent_state.get_mut(id)?.state;
4636    debug_assert_eq!(concurrent_state.get_mut(state)?.read_handle, id);
4637    let handle_table = instance.table_for_transmit(ty);
4638    let handle = match ty {
4639        TransmitIndex::Future(idx) => handle_table.future_insert_read(idx, id.rep()),
4640        TransmitIndex::Stream(idx) => handle_table.stream_insert_read(idx, id.rep()),
4641    }?;
4642    concurrent_state.get_mut(id)?.common.handle = Some(handle);
4643    Ok(handle)
4644}
4645
4646impl ComponentInstance {
4647    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4648        let (states, types) = self.instance_states();
4649        let runtime_instance = match ty {
4650            TransmitIndex::Stream(ty) => types[ty].instance,
4651            TransmitIndex::Future(ty) => types[ty].instance,
4652        };
4653        states[runtime_instance].handle_table()
4654    }
4655
4656    fn table_for_error_context(
4657        self: Pin<&mut Self>,
4658        ty: TypeComponentLocalErrorContextTableIndex,
4659    ) -> &mut HandleTable {
4660        let (states, types) = self.instance_states();
4661        let runtime_instance = types[ty].instance;
4662        states[runtime_instance].handle_table()
4663    }
4664
4665    fn get_mut_by_index(
4666        self: Pin<&mut Self>,
4667        ty: TransmitIndex,
4668        index: u32,
4669    ) -> Result<(u32, &mut TransmitLocalState)> {
4670        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4671    }
4672}
4673
4674impl ConcurrentState {
4675    fn send_write_result(
4676        &mut self,
4677        ty: TransmitIndex,
4678        id: TableId<TransmitState>,
4679        handle: u32,
4680        code: ReturnCode,
4681    ) -> Result<()> {
4682        let write_handle = self.get_mut(id)?.write_handle.rep();
4683        self.set_event(
4684            write_handle,
4685            match ty {
4686                TransmitIndex::Future(ty) => Event::FutureWrite {
4687                    code,
4688                    pending: Some((ty, handle)),
4689                },
4690                TransmitIndex::Stream(ty) => Event::StreamWrite {
4691                    code,
4692                    pending: Some((ty, handle)),
4693                },
4694            },
4695        )
4696    }
4697
4698    fn send_read_result(
4699        &mut self,
4700        ty: TransmitIndex,
4701        id: TableId<TransmitState>,
4702        handle: u32,
4703        code: ReturnCode,
4704    ) -> Result<()> {
4705        let read_handle = self.get_mut(id)?.read_handle.rep();
4706        self.set_event(
4707            read_handle,
4708            match ty {
4709                TransmitIndex::Future(ty) => Event::FutureRead {
4710                    code,
4711                    pending: Some((ty, handle)),
4712                },
4713                TransmitIndex::Stream(ty) => Event::StreamRead {
4714                    code,
4715                    pending: Some((ty, handle)),
4716                },
4717            },
4718        )
4719    }
4720
4721    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4722        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4723    }
4724
4725    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4726        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4727    }
4728
4729    /// Set or update the event for the specified waitable.
4730    ///
4731    /// If there is already an event set for this waitable, we assert that it is
4732    /// of the same variant as the new one and reuse the `ReturnCode` count and
4733    /// the `pending` field if applicable.
4734    // TODO: This is a bit awkward due to how
4735    // `Event::{Stream,Future}{Write,Read}` and
4736    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4737    // Consider updating those representations in a way that allows this
4738    // function to be simplified.
4739    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4740        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4741
4742        fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4743            let (ReturnCode::Completed(count)
4744            | ReturnCode::Dropped(count)
4745            | ReturnCode::Cancelled(count)) = old
4746            else {
4747                bail_bug!("unexpected old return code")
4748            };
4749
4750            Ok(match new {
4751                ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
4752                ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
4753                _ => bail_bug!("unexpected new return code"),
4754            })
4755        }
4756
4757        let event = match (waitable.take_event(self)?, event) {
4758            (None, _) => event,
4759            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4760            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4761            (
4762                Some(Event::StreamWrite {
4763                    code: old_code,
4764                    pending: old_pending,
4765                }),
4766                Event::StreamWrite { code, pending },
4767            ) => Event::StreamWrite {
4768                code: update_code(old_code, code)?,
4769                pending: old_pending.or(pending),
4770            },
4771            (
4772                Some(Event::StreamRead {
4773                    code: old_code,
4774                    pending: old_pending,
4775                }),
4776                Event::StreamRead { code, pending },
4777            ) => Event::StreamRead {
4778                code: update_code(old_code, code)?,
4779                pending: old_pending.or(pending),
4780            },
4781            _ => bail_bug!("unexpected event combination"),
4782        };
4783
4784        waitable.set_event(self, Some(event))
4785    }
4786
4787    /// Allocate a new future or stream, including the `TransmitState` and the
4788    /// `TransmitHandle`s corresponding to the read and write ends.
4789    fn new_transmit(
4790        &mut self,
4791        origin: TransmitOrigin,
4792    ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4793        let state_id = self.push(TransmitState::new(origin))?;
4794
4795        let write = self.push(TransmitHandle::new(state_id))?;
4796        let read = self.push(TransmitHandle::new(state_id))?;
4797
4798        let state = self.get_mut(state_id)?;
4799        state.write_handle = write;
4800        state.read_handle = read;
4801
4802        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4803
4804        Ok((write, read))
4805    }
4806
4807    /// Delete the specified future or stream, including the read and write ends.
4808    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4809        let state = self.delete(state_id)?;
4810        self.delete(state.write_handle)?;
4811        self.delete(state.read_handle)?;
4812
4813        log::trace!(
4814            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4815            state.write_handle,
4816            state.read_handle,
4817        );
4818
4819        Ok(())
4820    }
4821}
4822
4823pub(crate) struct ResourcePair {
4824    pub(crate) write: u32,
4825    pub(crate) read: u32,
4826}
4827
4828impl Waitable {
4829    /// Handle the imminent delivery of the specified event, e.g. by updating
4830    /// the state of the stream or future.
4831    pub(super) fn on_delivery(
4832        &self,
4833        store: &mut StoreOpaque,
4834        instance: Instance,
4835        event: Event,
4836    ) -> Result<()> {
4837        let instance = instance.id().get_mut(store);
4838        let (rep, state, code) = match event {
4839            Event::FutureRead {
4840                pending: Some((ty, handle)),
4841                code,
4842            }
4843            | Event::FutureWrite {
4844                pending: Some((ty, handle)),
4845                code,
4846            } => {
4847                let runtime_instance = instance.component().types()[ty].instance;
4848                let (rep, state) = instance.instance_states().0[runtime_instance]
4849                    .handle_table()
4850                    .future_rep(ty, handle)?;
4851                (rep, state, code)
4852            }
4853            Event::StreamRead {
4854                pending: Some((ty, handle)),
4855                code,
4856            }
4857            | Event::StreamWrite {
4858                pending: Some((ty, handle)),
4859                code,
4860            } => {
4861                let runtime_instance = instance.component().types()[ty].instance;
4862                let (rep, state) = instance.instance_states().0[runtime_instance]
4863                    .handle_table()
4864                    .stream_rep(ty, handle)?;
4865                (rep, state, code)
4866            }
4867            _ => return Ok(()),
4868        };
4869        if rep != self.rep() {
4870            bail_bug!("unexpected rep mismatch");
4871        }
4872        if *state != TransmitLocalState::Busy {
4873            bail_bug!("expected state to be busy");
4874        }
4875        let done = matches!(code, ReturnCode::Dropped(_));
4876        *state = match event {
4877            Event::FutureRead { .. } | Event::StreamRead { .. } => {
4878                TransmitLocalState::Read { done }
4879            }
4880            Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
4881                TransmitLocalState::Write { done }
4882            }
4883            _ => bail_bug!("unexpected event for stream"),
4884        };
4885
4886        let transmit_handle = TableId::<TransmitHandle>::new(rep);
4887        let state = store.concurrent_state_mut()?;
4888        let transmit_id = state.get_mut(transmit_handle)?.state;
4889        let transmit = state.get_mut(transmit_id)?;
4890
4891        match event {
4892            Event::StreamRead { .. } => {
4893                transmit.read = ReadState::Open;
4894            }
4895            Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4896            _ => {}
4897        }
4898        Ok(())
4899    }
4900}
4901
4902/// Determine whether an intra-component read/write is allowed for the specified
4903/// `stream` or `future` payload type according to the component model
4904/// specification.
4905fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
4906    matches!(
4907        ty,
4908        None | Some(
4909            InterfaceType::S8
4910                | InterfaceType::U8
4911                | InterfaceType::S16
4912                | InterfaceType::U16
4913                | InterfaceType::S32
4914                | InterfaceType::U32
4915                | InterfaceType::S64
4916                | InterfaceType::U64
4917                | InterfaceType::Float32
4918                | InterfaceType::Float64
4919        )
4920    )
4921}
4922
4923/// Helper structure to manage moving a `T` in/out of an interior `Mutex` which
4924/// contains an
4925/// `Option<T>`
4926struct LockedState<T> {
4927    inner: TryMutex<Option<T>>,
4928}
4929
4930impl<T> LockedState<T> {
4931    /// Creates a new initial state with `value` stored.
4932    fn new(value: T) -> Self {
4933        Self {
4934            inner: TryMutex::new(Some(value)),
4935        }
4936    }
4937
4938    /// Attempts to lock the inner mutex and return its guard.
4939    ///
4940    /// # Errors
4941    ///
4942    /// Fails if this lock is either poisoned or if it's currently locked.
4943    /// As-used in this file there should never actually be contention on this
4944    /// lock nor recursive access so failing to acquire the lock is a fatal
4945    /// error that gets propagated upwards.
4946    fn try_lock(&self) -> Result<TryMutexGuard<'_, Option<T>>> {
4947        match self.inner.try_lock() {
4948            Some(lock) => Ok(lock),
4949            None => bail_bug!("should not have contention on state lock"),
4950        }
4951    }
4952
4953    /// Takes the inner `T` out of this state, returning it as a guard which
4954    /// will put it back when finished.
4955    ///
4956    /// # Errors
4957    ///
4958    /// Returns an error if the state `T` isn't present.
4959    fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4960        let result = self.try_lock()?.take();
4961        match result {
4962            Some(result) => Ok(LockedStateGuard {
4963                value: ManuallyDrop::new(result),
4964                state: self,
4965            }),
4966            None => bail_bug!("lock value unexpectedly missing"),
4967        }
4968    }
4969
4970    /// Performs the operation `f` on the inner state `&mut T`.
4971    ///
4972    /// This will acquire the internal lock and invoke `f`, so `f` should not
4973    /// expect to be able to recursively acquire this lock.
4974    ///
4975    /// # Errors
4976    ///
4977    /// Returns an error if the state `T` isn't present.
4978    fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4979        let mut inner = self.try_lock()?;
4980        match &mut *inner {
4981            Some(state) => Ok(f(state)),
4982            None => bail_bug!("lock value unexpectedly missing"),
4983        }
4984    }
4985}
4986
4987/// Helper structure returned from [`LockedState::take`] which will put the
4988/// state specified by `value` back into the original lock once this is dropped.
4989struct LockedStateGuard<'a, T> {
4990    value: ManuallyDrop<T>,
4991    state: &'a LockedState<T>,
4992}
4993
4994impl<T> Deref for LockedStateGuard<'_, T> {
4995    type Target = T;
4996
4997    fn deref(&self) -> &T {
4998        &self.value
4999    }
5000}
5001
5002impl<T> DerefMut for LockedStateGuard<'_, T> {
5003    fn deref_mut(&mut self) -> &mut T {
5004        &mut self.value
5005    }
5006}
5007
5008impl<T> Drop for LockedStateGuard<'_, T> {
5009    fn drop(&mut self) {
5010        // SAFETY: `ManuallyDrop::take` requires that after invoked the
5011        // original value is not read. This is the `Drop` for this type which
5012        // means we have exclusive ownership and it is not read further in the
5013        // destructor, satisfying this requirement.
5014        let value = unsafe { ManuallyDrop::take(&mut self.value) };
5015
5016        // If this fails due to contention that's a bug, but we're not in a
5017        // position to panic due to this being a destructor nor return an error,
5018        // so defer the bug to showing up later.
5019        if let Ok(mut lock) = self.state.try_lock() {
5020            *lock = Some(value);
5021        }
5022    }
5023}
5024
5025#[cfg(test)]
5026mod tests {
5027    use super::*;
5028    use crate::{Engine, Store};
5029    use core::future::pending;
5030    use core::pin::pin;
5031    use std::sync::LazyLock;
5032
5033    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
5034
5035    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
5036    where
5037        T: FutureProducer<()>,
5038    {
5039        rx.poll_produce(
5040            &mut Context::from_waker(Waker::noop()),
5041            Store::new(&ENGINE, ()).as_context_mut(),
5042            finish,
5043        )
5044    }
5045
5046    #[test]
5047    fn future_producer() {
5048        let mut fut = pin!(async { crate::error::Ok(()) });
5049        assert!(matches!(
5050            poll_future_producer(fut.as_mut(), false),
5051            Poll::Ready(Ok(Some(()))),
5052        ));
5053
5054        let mut fut = pin!(async { crate::error::Ok(()) });
5055        assert!(matches!(
5056            poll_future_producer(fut.as_mut(), true),
5057            Poll::Ready(Ok(Some(()))),
5058        ));
5059
5060        let mut fut = pin!(pending::<Result<()>>());
5061        assert!(matches!(
5062            poll_future_producer(fut.as_mut(), false),
5063            Poll::Pending,
5064        ));
5065        assert!(matches!(
5066            poll_future_producer(fut.as_mut(), true),
5067            Poll::Ready(Ok(None)),
5068        ));
5069
5070        let (tx, rx) = oneshot::channel();
5071        let mut rx = pin!(rx);
5072        assert!(matches!(
5073            poll_future_producer(rx.as_mut(), false),
5074            Poll::Pending,
5075        ));
5076        assert!(matches!(
5077            poll_future_producer(rx.as_mut(), true),
5078            Poll::Ready(Ok(None)),
5079        ));
5080        tx.send(()).unwrap();
5081        assert!(matches!(
5082            poll_future_producer(rx.as_mut(), true),
5083            Poll::Ready(Ok(Some(()))),
5084        ));
5085
5086        let (tx, rx) = oneshot::channel();
5087        let mut rx = pin!(rx);
5088        tx.send(()).unwrap();
5089        assert!(matches!(
5090            poll_future_producer(rx.as_mut(), false),
5091            Poll::Ready(Ok(Some(()))),
5092        ));
5093
5094        let (tx, rx) = oneshot::channel::<()>();
5095        let mut rx = pin!(rx);
5096        drop(tx);
5097        assert!(matches!(
5098            poll_future_producer(rx.as_mut(), false),
5099            Poll::Ready(Err(..)),
5100        ));
5101
5102        let (tx, rx) = oneshot::channel::<()>();
5103        let mut rx = pin!(rx);
5104        drop(tx);
5105        assert!(matches!(
5106            poll_future_producer(rx.as_mut(), true),
5107            Poll::Ready(Err(..)),
5108        ));
5109    }
5110}