Skip to main content

wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9    AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10    Val, WasmList,
11};
12use crate::prelude::*;
13use crate::store::{StoreOpaque, StoreToken};
14use crate::try_mutex::{TryMutex, TryMutexGuard};
15use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
16use crate::vm::{AlwaysMut, VMStore};
17use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
18use crate::{
19    Error, Result, Trap, bail, bail_bug, ensure,
20    error::{Context as _, format_err},
21};
22use alloc::sync::Arc;
23use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
24use core::any::{Any, TypeId};
25use core::fmt;
26use core::future;
27use core::iter;
28use core::marker::PhantomData;
29use core::mem::{self, ManuallyDrop, MaybeUninit};
30use core::ops::{Deref, DerefMut};
31use core::pin::Pin;
32use core::task::{Context, Poll, Waker, ready};
33use futures::channel::oneshot;
34use futures::{FutureExt as _, stream};
35use wasmtime_environ::component::{
36    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
37    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
38    TypeFutureTableIndex, TypeStreamTableIndex,
39};
40
41pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
42
43mod buffers;
44
/// Enum for distinguishing between a stream or future in functions that handle
/// both.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The operation concerns a stream.
    Stream,
    /// The operation concerns a future.
    Future,
}
52
/// Represents `{stream,future}.{read,write}` results.
///
/// See `ReturnCode::encode` for how each variant is packed into the 32-bit
/// value handed back to the guest.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// Encoded as `0xffff_ffff`; carries no item count.
    Blocked,
    /// Encoded with status code `0x0` plus the contained item count.
    Completed(ItemCount),
    /// Encoded with status code `0x1` plus the contained item count.
    Dropped(ItemCount),
    /// Encoded with status code `0x2` plus the contained item count.
    Cancelled(ItemCount),
}
61
62impl ReturnCode {
63    /// Pack `self` into a single 32-bit integer that may be returned to the
64    /// guest.
65    ///
66    /// This corresponds to `pack_copy_result` in the Component Model spec.
67    pub fn encode(&self) -> u32 {
68        const BLOCKED: u32 = 0xffff_ffff;
69        const COMPLETED: u32 = 0x0;
70        const DROPPED: u32 = 0x1;
71        const CANCELLED: u32 = 0x2;
72        match self {
73            ReturnCode::Blocked => BLOCKED,
74            ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED,
75            ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED,
76            ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED,
77        }
78    }
79
80    /// Returns `Self::Completed` with the specified count (or zero if
81    /// `matches!(kind, TransmitKind::Future)`)
82    fn completed(kind: TransmitKind, count: ItemCount) -> Self {
83        Self::Completed(if let TransmitKind::Future = kind {
84            ItemCount::ZERO
85        } else {
86            count
87        })
88    }
89}
90
/// Representation of how many items are being operated on in a stream read or
/// write.
///
/// The component model requires that stream operations are limited to `1<<28`
/// items in one go. This type is a newtype wrapper around `u32` with the
/// invariant that the internal value is limited by this amount: the
/// constructors and `inc` only accept values strictly less than `1 << 28`.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct ItemCount {
    /// The raw count; kept below `ItemCount::MAX` by the methods of this type.
    raw: u32,
}
102
103impl ItemCount {
104    const MAX: u32 = 1 << 28;
105    const ZERO: ItemCount = ItemCount { raw: 0 };
106
107    /// Creates a new `ItemCount` with the specified count, or a trap if it's
108    /// too large.
109    fn new(count: u32) -> Result<Self, Trap> {
110        if count < Self::MAX {
111            Ok(Self { raw: count })
112        } else {
113            Err(Trap::StreamOpTooBig)
114        }
115    }
116
117    /// Same as `Self::new` but takes a `usize`.
118    fn new_usize(count: usize) -> Result<Self, Trap> {
119        let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
120        Self::new(count)
121    }
122
123    fn as_u32(&self) -> u32 {
124        self.raw
125    }
126
127    fn as_usize(&self) -> usize {
128        usize::try_from(self.raw).unwrap()
129    }
130
131    /// Increments `self` by `amt`, returning a trap if the amount would exceed
132    /// the maximum item count.
133    fn inc(&mut self, amt: usize) -> Result<(), Trap> {
134        let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
135        let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
136        if new_raw < Self::MAX {
137            self.raw = new_raw;
138            Ok(())
139        } else {
140            Err(Trap::StreamOpTooBig)
141        }
142    }
143
144    /// Helper to add two `ItemCount`s together, fallibly.
145    ///
146    /// It's considered a bug if this overflows, so this is only suitable in
147    /// situations where overflow and/or exceeding the total item count is known
148    /// that it may be possible.
149    fn add(&self, other: ItemCount) -> Result<ItemCount> {
150        match self.raw.checked_add(other.raw) {
151            Some(raw) => Ok(ItemCount::new(raw)?),
152            None => bail_bug!("overflow in `ItemCount::add`"),
153        }
154    }
155
156    /// Same as `add`, but for subtraction.
157    ///
158    /// Like with `add` this is only suitable for situations where the result is
159    /// known to not underflow.
160    fn sub(&self, other: ItemCount) -> Result<ItemCount> {
161        match self.raw.checked_sub(other.raw) {
162            Some(raw) => Ok(ItemCount { raw }),
163            None => bail_bug!("underflow in `ItemCount::sub`"),
164        }
165    }
166}
167
/// Formats an `ItemCount` by delegating to the formatting of its raw `u32`.
impl fmt::Display for ItemCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.raw.fmt(f)
    }
}
173
/// Debug output is just the raw `u32`, without any struct wrapper.
impl fmt::Debug for ItemCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.raw.fmt(f)
    }
}
179
180impl PartialEq<u32> for ItemCount {
181    fn eq(&self, other: &u32) -> bool {
182        self.raw == *other
183    }
184}
185
186impl PartialOrd<u32> for ItemCount {
187    fn partial_cmp(&self, other: &u32) -> Option<core::cmp::Ordering> {
188        self.raw.partial_cmp(other)
189    }
190}
191
/// Represents a stream or future type index.
///
/// This is useful as a parameter type for functions which operate on either a
/// future or a stream.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Type index of a stream.
    Stream(TypeStreamTableIndex),
    /// Type index of a future.
    Future(TypeFutureTableIndex),
}
201
202impl TransmitIndex {
203    pub fn kind(&self) -> TransmitKind {
204        match self {
205            TransmitIndex::Stream(_) => TransmitKind::Stream,
206            TransmitIndex::Future(_) => TransmitKind::Future,
207        }
208    }
209
210    /// Retrieve the payload type of the specified stream or future, or `None`
211    /// if it has no payload type.
212    fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
213        match self {
214            TransmitIndex::Stream(i) => {
215                let ty = types[*i].ty;
216                types[ty].payload.as_ref()
217            }
218            TransmitIndex::Future(i) => {
219                let ty = types[*i].ty;
220                types[ty].payload.as_ref()
221            }
222        }
223    }
224}
225
226/// Retrieve the host rep and state for the specified guest-visible waitable
227/// handle.
228fn get_mut_by_index_from(
229    handle_table: &mut HandleTable,
230    ty: TransmitIndex,
231    index: u32,
232) -> Result<(u32, &mut TransmitLocalState)> {
233    match ty {
234        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
235        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
236    }
237}
238
/// Lower items from `buffer` into guest memory starting at `address`.
///
/// At most `count` items are lowered, and never more than `buffer` currently
/// has remaining. The destination range is checked for alignment and bounds
/// before anything is stored. When `ty` has no payload type nothing is stored
/// to memory, but the items are still skipped in `buffer`.
///
/// # Errors
///
/// Fails if `address` is misaligned for `T`, if the destination range is out
/// of bounds of memory, if storing the items fails, or if switching threads
/// fails.
fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
    mut store: StoreContextMut<U>,
    instance: Instance,
    caller_thread: QualifiedThreadId,
    options: OptionsIndex,
    ty: TransmitIndex,
    address: usize,
    count: usize,
    buffer: &mut B,
) -> Result<()> {
    // Never lower more items than the buffer actually holds.
    let count = buffer.remaining().len().min(count);

    // If lowering may call realloc in the guest, then the guest may need
    // to access its thread context, so we need to set the current thread before lowering
    // and restore the old one afterward.
    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
        let old_thread = store.0.set_thread(caller_thread)?;
        (
            &mut LowerContext::new(store.as_context_mut(), options, instance),
            Some(old_thread),
        )
    } else {
        (
            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
            None,
        )
    };

    // NOTE(review): the error returns below happen before the old thread is
    // restored — confirm that callers do not rely on the thread being reset
    // when this function fails.
    if address % usize::try_from(T::ALIGN32)? != 0 {
        bail!("read pointer not aligned");
    }
    // Bounds check only: the resulting slice is discarded.
    lower
        .as_slice_mut()
        .get_mut(address..)
        .and_then(|b| b.get_mut(..T::SIZE32 * count))
        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;

    if let Some(ty) = ty.payload(lower.types) {
        T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
    }

    if let Some(old_thread) = old_thread {
        store.0.set_thread(old_thread)?;
    }

    // Mark the lowered items as consumed.
    buffer.skip(count);

    Ok(())
}
288
289fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
290    lift: &mut LiftContext<'_>,
291    ty: Option<InterfaceType>,
292    buffer: &mut B,
293    address: usize,
294    count: usize,
295) -> Result<()> {
296    let count = count.min(buffer.remaining_capacity());
297    if T::IS_RUST_UNIT_TYPE {
298        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
299        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
300        // is a valid way to populate the zero-sized buffer.
301        buffer.extend(
302            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
303        )
304    } else {
305        let ty = match ty {
306            Some(ty) => ty,
307            None => bail_bug!("type required for non-unit lift"),
308        };
309        if address % usize::try_from(T::ALIGN32)? != 0 {
310            bail!("write pointer not aligned");
311        }
312        lift.memory()
313            .get(address..)
314            .and_then(|b| b.get(..T::SIZE32 * count))
315            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
316
317        let list = &WasmList::new(address, count, lift, ty)?;
318        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
319    }
320    Ok(())
321}
322
/// Represents the state associated with an error context
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context
    pub(crate) debug_msg: String,
}
329
/// Represents the size and alignment for a "flat" Component Model type,
/// i.e. one containing no pointers or handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the type (presumably in bytes — confirm against users).
    pub(super) size: u32,
    /// Alignment of the type (presumably in bytes — confirm against users).
    pub(super) align: u32,
}
337
/// Host-side byte buffer used by `Destination`/`DirectDestination`, paired
/// with a running count of bytes that have been marked as written.
struct HostBuffer<'a> {
    /// Backing storage; resized to the requested capacity by
    /// `Destination::as_direct`.
    dst: &'a mut Vec<u8>,
    /// Number of bytes marked as written so far; reset to zero by
    /// `Destination::as_direct` and advanced by
    /// `DirectDestination::mark_written`.
    marked_written: &'a mut usize,
}
342
343impl HostBuffer<'_> {
344    fn reborrow(&mut self) -> HostBuffer<'_> {
345        HostBuffer {
346            dst: &mut *self.dst,
347            marked_written: &mut *self.marked_written,
348        }
349    }
350}
351
/// Represents the buffer for a host- or guest-initiated stream read.
pub struct Destination<'a, T, B> {
    /// Table id of the `TransmitState` consulted by `remaining`.
    id: TableId<TransmitState>,
    /// Buffer exposed through `take_buffer` and `set_buffer`.
    buffer: &'a mut B,
    /// Host-side byte buffer handed on to `DirectDestination` by `as_direct`,
    /// if any (presumably only present when the reader is the host — confirm
    /// against the code constructing this type).
    host_buffer: Option<HostBuffer<'a>>,
    /// Ties the item type `T` to this value without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
359
360impl<'a, T, B> Destination<'a, T, B> {
361    /// Reborrow `self` so it can be used again later.
362    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
363        Destination {
364            id: self.id,
365            buffer: &mut *self.buffer,
366            host_buffer: self.host_buffer.as_mut().map(|b| b.reborrow()),
367            _phantom: PhantomData,
368        }
369    }
370
371    /// Take the buffer out of `self`, leaving a default-initialized one in its
372    /// place.
373    ///
374    /// This can be useful for reusing the previously-stored buffer's capacity
375    /// instead of allocating a fresh one.
376    pub fn take_buffer(&mut self) -> B
377    where
378        B: Default,
379    {
380        mem::take(self.buffer)
381    }
382
383    /// Store the specified buffer in `self`.
384    ///
385    /// Any items contained in the buffer will be delivered to the reader after
386    /// the `StreamProducer::poll_produce` call to which this `Destination` was
387    /// passed returns (unless overwritten by another call to `set_buffer`).
388    ///
389    /// If items are stored via this buffer _and_ written via a
390    /// `DirectDestination` view of `self`, then the items in the buffer will be
391    /// delivered after the ones written using `DirectDestination`.
392    pub fn set_buffer(&mut self, buffer: B) {
393        *self.buffer = buffer;
394    }
395
396    /// Return the remaining number of items the current read has capacity to
397    /// accept, if known.
398    ///
399    /// This will return `Some(_)` if the reader is a guest; it will return
400    /// `None` if the reader is the host.
401    ///
402    /// Note that this can return `Some(0)`. This means that the guest is
403    /// attempting to perform a zero-length read which typically means that it's
404    /// trying to wait for this stream to be ready-to-read but is not actually
405    /// ready to receive the items yet. The host in this case is allowed to
406    /// either block waiting for readiness or immediately complete the
407    /// operation. The guest is expected to handle both cases. Some more
408    /// discussion about this case can be found in the discussion of ["Stream
409    /// Readiness" in the component-model repo][docs].
410    ///
411    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
412    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
413        // Note that this unwrap should only trigger for bugs in Wasmtime, and
414        // this is modeled here to centralize the `.unwrap()` for this method in
415        // one location.
416        self.remaining_(store.as_context_mut().0).unwrap()
417    }
418
419    fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
420        let transmit = store.concurrent_state_mut().get_mut(self.id)?;
421
422        if let &ReadState::GuestReady { count, .. } = &transmit.read {
423            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
424                bail_bug!("expected WriteState::HostReady")
425            };
426
427            Ok(Some(count.as_usize() - guest_offset.as_usize()))
428        } else {
429            Ok(None)
430        }
431    }
432}
433
434impl<'a, B> Destination<'a, u8, B> {
435    /// Return a `DirectDestination` view of `self`.
436    ///
437    /// If the reader is a guest, this will provide direct access to the guest's
438    /// read buffer.  If the reader is a host, this will provide access to a
439    /// buffer which will be delivered to the host before any items stored using
440    /// `Destination::set_buffer`.
441    ///
442    /// `capacity` will only be used if the reader is a host, in which case it
443    /// will update the length of the buffer, possibly zero-initializing the new
444    /// elements if the new length is larger than the old length.
445    pub fn as_direct<D>(
446        mut self,
447        store: StoreContextMut<'a, D>,
448        capacity: usize,
449    ) -> DirectDestination<'a, D> {
450        if let Some(buffer) = &mut self.host_buffer {
451            *buffer.marked_written = 0;
452            buffer.dst.resize(capacity, 0);
453        }
454
455        DirectDestination {
456            id: self.id,
457            host_buffer: self.host_buffer,
458            store,
459        }
460    }
461}
462
/// Represents a read from a `stream<u8>`, providing direct access to the
/// writer's buffer.
///
/// Created by `Destination::as_direct`.
pub struct DirectDestination<'a, D: 'static> {
    /// Table id of the `TransmitState` consulted when no host buffer is
    /// present.
    id: TableId<TransmitState>,
    /// Host-side buffer; when `Some`, `remaining` and `mark_written` operate
    /// on it instead of guest memory.
    host_buffer: Option<HostBuffer<'a>>,
    /// Store used to look up the transmit state and the guest's memory.
    store: StoreContextMut<'a, D>,
}
470
#[cfg(feature = "std")]
impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
    /// Copies as many bytes of `buf` as fit into `Self::remaining` and marks
    /// them as written; returns the number of bytes copied.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let window = self.remaining();
        let len = core::cmp::min(window.len(), buf.len());
        let (target, _) = window.split_at_mut(len);
        let (source, _) = buf.split_at(len);
        target.copy_from_slice(source);
        self.mark_written(len);
        Ok(len)
    }

    /// Nothing is buffered by this type, so flushing is a no-op.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
485
impl<D: 'static> DirectDestination<'_, D> {
    /// Provide direct access to the writer's buffer.
    pub fn remaining(&mut self) -> &mut [u8] {
        // Note that this unwrap should only trigger for bugs in Wasmtime, and
        // this is modeled here to centralize the `.unwrap()` for this method in
        // one location.
        self.remaining_().unwrap()
    }

    /// Fallible implementation of `remaining`.
    ///
    /// With a host buffer present, the host buffer itself is returned.
    /// Otherwise the guest's memory is sliced from `address + guest_offset`
    /// for `count - guest_offset` bytes, i.e. the part of the guest's read
    /// buffer that has not been marked as written yet.
    fn remaining_(&mut self) -> Result<&mut [u8]> {
        if let Some(buffer) = self.host_buffer.as_mut() {
            // NOTE(review): this returns the whole host buffer regardless of
            // `marked_written`, unlike the guest path below which skips the
            // bytes already written — confirm this asymmetry is intended
            // (repeated `write` calls would overwrite from the start).
            return Ok(buffer.dst);
        }
        let transmit = self
            .store
            .as_context_mut()
            .0
            .concurrent_state_mut()
            .get_mut(self.id)?;

        let &ReadState::GuestReady {
            address,
            count,
            options,
            instance,
            ..
        } = &transmit.read
        else {
            bail_bug!("expected ReadState::GuestReady")
        };

        let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
            bail_bug!("expected WriteState::HostReady")
        };

        // Slice out the unwritten tail of the guest's read buffer.
        let memory = instance
            .options_memory_mut(self.store.0, options)
            .get_mut((address + guest_offset.as_usize())..)
            .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
        match memory {
            Some(memory) => Ok(memory),
            None => bail_bug!("guest buffer unexpectedly out of bounds"),
        }
    }

    /// Mark the specified number of bytes as written to the writer's buffer.
    ///
    /// # Panics
    ///
    /// This will panic if the count is larger than the size of the
    /// buffer returned by `Self::remaining`.
    pub fn mark_written(&mut self, count: usize) {
        // Note that this unwrap should only trigger for bugs in Wasmtime, and
        // this is modeled here to centralize the `.unwrap()` for this method in
        // one location.
        self.mark_written_(count).unwrap()
    }

    /// Fallible implementation of `mark_written`.
    ///
    /// With a host buffer present this advances its `marked_written` counter;
    /// otherwise it advances the write side's `guest_offset`, panicking if
    /// that would move past the guest's read count.
    fn mark_written_(&mut self, count: usize) -> Result<()> {
        if let Some(buffer) = self.host_buffer.as_mut() {
            // Note that this `.unwrap` is a documented panic condition of
            // `mark_written`.
            *buffer.marked_written = buffer.marked_written.checked_add(count).unwrap();
        } else {
            let transmit = self
                .store
                .as_context_mut()
                .0
                .concurrent_state_mut()
                .get_mut(self.id)?;

            let ReadState::GuestReady {
                count: read_count, ..
            } = &transmit.read
            else {
                bail_bug!("expected ReadState::GuestReady")
            };

            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                bail_bug!("expected WriteState::HostReady");
            };

            if guest_offset.as_usize() + count > read_count.as_usize() {
                // Note that this `panic` is a documented panic condition of
                // `mark_written`.
                panic!(
                    "write count ({count}) must be less than or equal to read count ({read_count})"
                )
            } else {
                guest_offset.inc(count)?;
            }
        }
        Ok(())
    }
}
581
/// Represents the state of a `Stream{Producer,Consumer}`.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed normally, and the producer or consumer may be
    /// able to produce or consume more items, respectively.
    Completed,
    /// The operation was interrupted (i.e. it wrapped up early after receiving
    /// a `finish` parameter value of true in a call to `poll_produce` or
    /// `poll_consume`), and the producer or consumer may be able to produce or
    /// consume more items, respectively.
    Cancelled,
    /// The operation completed normally, but the producer or consumer will
    /// _not_ be able to produce or consume more items, respectively.
    Dropped,
}
597
/// Represents the host-owned write end of a stream.
pub trait StreamProducer<D>: Send + 'static {
    /// The payload type of this stream.
    type Item;

    /// The `WriteBuffer` type to use when delivering items.
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Handle a host- or guest-initiated read by delivering zero or more items
    /// to the specified destination.
    ///
    /// This will be called whenever the reader starts a read.
    ///
    /// # Arguments
    ///
    /// * `self` - a `Pin`'d version of self to perform Rust-level
    ///   future-related operations on.
    /// * `cx` - a Rust-related [`Context`] which is passed to other
    ///   future-related operations or used to acquire a waker.
    /// * `store` - the Wasmtime store that this operation is happening within.
    ///   Used, for example, to consult the state `D` associated with the store.
    /// * `destination` - the location that items are to be written to.
    /// * `finish` - a flag indicating whether the host should strive to
    ///   immediately complete/cancel any pending operation. See below for more
    ///   details.
    ///
    /// # Behavior
    ///
    /// If the implementation is able to produce one or more items immediately,
    /// it should write them to `destination` and return either
    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
    /// any more items.
    ///
    /// If the implementation is unable to produce any items immediately, but
    /// expects to do so later, and `finish` is _false_, it should store the
    /// waker from `cx` for later and return `Poll::Pending` without writing
    /// anything to `destination`.  Later, it should alert the waker when either
    /// the items arrive, the stream has ended, or an error occurs.
    ///
    /// If more items are written to `destination` than the reader has immediate
    /// capacity to accept, they will be retained in memory by the caller and
    /// used to satisfy future reads, in which case `poll_produce` will only be
    /// called again once all those items have been delivered.
    ///
    /// # Zero-length reads
    ///
    /// This function may be called with a zero-length capacity buffer
    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
    /// the guest wants to wait to see if an item is ready without actually
    /// reading the item. For example think of a UNIX `poll` function run on a
    /// TCP stream, seeing if it's readable without actually reading it.
    ///
    /// In this situation the host is allowed to either return immediately or
    /// wait for readiness. Note that waiting for readiness is not always
    /// possible. For example it's impossible to test if a Rust-native `Future`
    /// is ready without actually reading the item. Stream-specific
    /// optimizations, such as testing if a TCP stream is readable, may be
    /// possible however.
    ///
    /// For a zero-length read, the host is allowed to:
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
    ///   anything if it expects to be able to produce items immediately (i.e.
    ///   without first returning `Poll::Pending`) the next time `poll_produce`
    ///   is called with non-zero capacity. This is the best-case scenario of
    ///   fulfilling the guest's desire -- items aren't read/buffered but the
    ///   host is saying it's ready when the guest is.
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
    ///   testing for readiness. The guest doesn't know this yet, but the guest
    ///   will realize that zero-length reads won't work on this stream when a
    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
    ///   here.
    ///
    /// - Return `Poll::Pending` if the host has performed necessary async work
    ///   to wait for this stream to be readable without actually reading
    ///   anything. This is also a best-case scenario where the host is letting
    ///   the guest know that nothing is ready yet. Later the zero-length read
    ///   will complete and then the guest will attempt a nonzero-length read to
    ///   actually read some bytes.
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
    ///   `Destination::set_buffer` with one or more items. Note, however,
    ///   that this creates the hazard that the items will never be received by
    ///   the guest if it decides not to do another non-zero-length read before
    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
    ///   recommended to do this and it's better to return
    ///   `StreamResult::Completed` without buffering anything instead.
    ///
    /// For more discussion on zero-length reads see the [documentation in the
    /// component-model repo itself][docs].
    ///
    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
    ///
    /// # Return
    ///
    /// This function can return one of several possible values:
    ///
    /// * `Poll::Pending` - this operation cannot complete at this time. The
    ///   Rust-level `Future::poll` contract applies here where a waker should
    ///   be stored from the `cx` argument and be arranged to receive a
    ///   notification when this implementation can make progress. For example
    ///   if you call `Future::poll` on a sub-future, that's enough. If items
    ///   were written to `destination` then a trap in the guest will be raised.
    ///
    ///   Note that implementations should strive to avoid this return value
    ///   when `finish` is `true`. In such a situation the guest is attempting
    ///   to, for example, cancel a previous operation. By returning
    ///   `Poll::Pending` the guest will be blocked during the cancellation
    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
    ///   favored to indicate that no items were read. If a short read happened,
    ///   however, it's ok to return `StreamResult::Completed` indicating some
    ///   items were read.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Completed))` - items, if applicable,
    ///   were written to the `destination`.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Cancelled))` - used when `finish` is
    ///   `true` and the implementation was able to successfully cancel any
    ///   async work that a previous read kicked off, if any. The host should
    ///   not buffer values received after returning `Cancelled` because the
    ///   guest will not be aware of these values and the guest could close the
    ///   stream after cancelling a read. Hosts should only return `Cancelled`
    ///   when there are no more async operations in flight for a previous read.
    ///
    ///   If items were written to `destination` then a trap in the guest will
    ///   be raised. If `finish` is `false` then this return value will raise a
    ///   trap in the guest.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Dropped))` - end-of-stream marker,
    ///   indicating that this producer should not be polled again. Note that
    ///   items may still be written to `destination`.
    ///
    /// # Errors
    ///
    /// The implementation may alternatively choose to return `Err(_)` to
    /// indicate an unrecoverable error. This will cause the guest (if any) to
    /// trap and render the component instance (if any) unusable. The
    /// implementation should report errors that _are_ recoverable by other
    /// means (e.g. by writing to a `future`) and return
    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
    /// be downcast to the specified type.
    ///
    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
    /// to the specified type is guaranteed to succeed.
    ///
    /// The default implementation never converts: it hands `me` back unchanged
    /// as `Err(me)`.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
758
759impl<T, D> StreamProducer<D> for iter::Empty<T>
760where
761    T: Send + Sync + 'static,
762{
763    type Item = T;
764    type Buffer = Option<Self::Item>;
765
766    fn poll_produce<'a>(
767        self: Pin<&mut Self>,
768        _: &mut Context<'_>,
769        _: StoreContextMut<'a, D>,
770        _: Destination<'a, Self::Item, Self::Buffer>,
771        _: bool,
772    ) -> Poll<Result<StreamResult>> {
773        Poll::Ready(Ok(StreamResult::Dropped))
774    }
775}
776
777impl<T, D> StreamProducer<D> for stream::Empty<T>
778where
779    T: Send + Sync + 'static,
780{
781    type Item = T;
782    type Buffer = Option<Self::Item>;
783
784    fn poll_produce<'a>(
785        self: Pin<&mut Self>,
786        _: &mut Context<'_>,
787        _: StoreContextMut<'a, D>,
788        _: Destination<'a, Self::Item, Self::Buffer>,
789        _: bool,
790    ) -> Poll<Result<StreamResult>> {
791        Poll::Ready(Ok(StreamResult::Dropped))
792    }
793}
794
795impl<T, D> StreamProducer<D> for Vec<T>
796where
797    T: Unpin + Send + Sync + 'static,
798{
799    type Item = T;
800    type Buffer = VecBuffer<T>;
801
802    fn poll_produce<'a>(
803        self: Pin<&mut Self>,
804        _: &mut Context<'_>,
805        _: StoreContextMut<'a, D>,
806        mut dst: Destination<'a, Self::Item, Self::Buffer>,
807        _: bool,
808    ) -> Poll<Result<StreamResult>> {
809        dst.set_buffer(mem::take(self.get_mut()).into());
810        Poll::Ready(Ok(StreamResult::Dropped))
811    }
812}
813
814impl<T, D> StreamProducer<D> for Box<[T]>
815where
816    T: Unpin + Send + Sync + 'static,
817{
818    type Item = T;
819    type Buffer = VecBuffer<T>;
820
821    fn poll_produce<'a>(
822        self: Pin<&mut Self>,
823        _: &mut Context<'_>,
824        _: StoreContextMut<'a, D>,
825        mut dst: Destination<'a, Self::Item, Self::Buffer>,
826        _: bool,
827    ) -> Poll<Result<StreamResult>> {
828        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
829        Poll::Ready(Ok(StreamResult::Dropped))
830    }
831}
832
/// A byte-stream producer which hands over a whole `bytes::Bytes` at once,
/// using the `Bytes` value itself as the buffer.
#[cfg(feature = "component-model-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Self;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        // Swap an empty `Bytes` into `self` and give the original contents to
        // the destination.
        let payload = mem::take(self.get_mut());
        dst.set_buffer(payload);

        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
849
/// A byte-stream producer which hands over a whole `bytes::BytesMut` at once,
/// using the `BytesMut` value itself as the buffer.
#[cfg(feature = "component-model-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Self;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        // Swap an empty `BytesMut` into `self` and give the original contents
        // to the destination.
        let payload = mem::take(self.get_mut());
        dst.set_buffer(payload);

        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
866
/// Represents the buffer for a host- or guest-initiated stream write.
pub struct Source<'a, T> {
    // Key used to look up the transmit state for this write in the store's
    // concurrent state.
    id: TableId<TransmitState>,
    // When `Some`, items are taken from this host-provided buffer.  When
    // `None`, items are lifted from the guest write recorded in the transmit
    // state instead.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
872
impl<'a, T> Source<'a, T> {
    /// Reborrow `self` so it can be used again later.
    pub fn reborrow(&mut self) -> Source<'_, T> {
        Source {
            id: self.id,
            host_buffer: self.host_buffer.as_deref_mut(),
        }
    }

    /// Accept zero or more items from the writer.
    ///
    /// Items are moved into `buffer`.  With a host buffer, at most
    /// `buffer.remaining_capacity()` items are moved directly out of it.
    /// Otherwise the items are lifted from the pending guest write and the
    /// read offset into that write is advanced by the number of items that
    /// ended up in `buffer`.
    ///
    /// # Errors
    ///
    /// Returns an error if the transmit state cannot be found, if it is not in
    /// the expected `HostReady`/`GuestReady` states, or if lifting fails.
    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
    where
        T: func::Lift + 'static,
        B: ReadBuffer<T>,
    {
        if let Some(input) = &mut self.host_buffer {
            // Host write: never move more than the reader's buffer can hold.
            let count = input.remaining().len().min(buffer.remaining_capacity());
            buffer.move_from(*input, count);
        } else {
            let store = store.as_context_mut();
            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;

            // Copy out how many items of this guest write were already read.
            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                bail_bug!("expected ReadState::HostReady");
            };

            let &WriteState::GuestReady {
                ty,
                address,
                count,
                options,
                instance,
                ..
            } = &transmit.write
            else {
                bail_bug!("expected WriteState::GuestReady");
            };

            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
            let ty = ty.payload(cx.types);
            // Remember the capacity before lifting so the number of items
            // actually accepted can be computed afterwards.
            let old_remaining = buffer.remaining_capacity();
            lift::<T, B>(
                cx,
                ty.copied(),
                buffer,
                // Skip past the items which were already read.
                address + (T::SIZE32 * guest_offset.as_usize()),
                count.as_usize() - guest_offset.as_usize(),
            )?;

            // Look the state up again now that lifting is done with the store.
            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;

            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
                bail_bug!("expected ReadState::HostReady");
            };

            // Advance the offset by however many items `buffer` accepted.
            guest_offset.inc(old_remaining - buffer.remaining_capacity())?;
        }

        Ok(())
    }

    /// Return the number of items remaining to be read from the current write
    /// operation.
    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
    where
        T: 'static,
    {
        // Note that this unwrap should only trigger for bugs in Wasmtime, and
        // this is modeled here to centralize the `.unwrap()` for this method in
        // one location.
        self.remaining_(store.as_context_mut().0).unwrap()
    }

    /// Fallible implementation of [`Self::remaining`].
    ///
    /// For a guest write this is the write's item count minus the number of
    /// items already read; otherwise it is the length of what is left in the
    /// host buffer.
    fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
    where
        T: 'static,
    {
        let transmit = store.concurrent_state_mut().get_mut(self.id)?;

        if let &WriteState::GuestReady { count, .. } = &transmit.write {
            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                bail_bug!("expected ReadState::HostReady")
            };

            Ok(count.as_usize() - guest_offset.as_usize())
        } else if let Some(host_buffer) = &self.host_buffer {
            Ok(host_buffer.remaining().len())
        } else {
            bail_bug!("expected either WriteState::GuestReady or host buffer")
        }
    }
}
965
966impl<'a> Source<'a, u8> {
967    /// Return a `DirectSource` view of `self`.
968    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
969        DirectSource {
970            id: self.id,
971            host_buffer: self.host_buffer,
972            store,
973        }
974    }
975}
976
/// Represents a write to a `stream<u8>`, providing direct access to the
/// writer's buffer.
pub struct DirectSource<'a, D: 'static> {
    // Key used to look up the transmit state for this write in the store's
    // concurrent state.
    id: TableId<TransmitState>,
    // When `Some`, bytes are served from this host-provided buffer; when
    // `None`, they are read from the guest's linear memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    // Store used to reach the transmit state and the guest's memory.
    store: StoreContextMut<'a, D>,
}
984
/// Copies bytes out of the writer's buffer and marks them as read.
#[cfg(feature = "std")]
impl<D: 'static> std::io::Read for DirectSource<'_, D> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.remaining();
        // Copy as much as fits in `buf`, but no more than is available.
        let len = buf.len().min(available.len());
        let (dst, src) = (&mut buf[..len], &available[..len]);
        dst.copy_from_slice(src);

        self.mark_read(len);
        Ok(len)
    }
}
995
996impl<D: 'static> DirectSource<'_, D> {
997    /// Provide direct access to the writer's buffer.
998    pub fn remaining(&mut self) -> &[u8] {
999        // Note that this unwrap should only trigger for bugs in Wasmtime, and
1000        // this is modeled here to centralize the `.unwrap()` for this method in
1001        // one location.
1002        self.remaining_().unwrap()
1003    }
1004
1005    fn remaining_(&mut self) -> Result<&[u8]> {
1006        if let Some(buffer) = self.host_buffer.as_deref_mut() {
1007            return Ok(buffer.remaining());
1008        }
1009        let transmit = self
1010            .store
1011            .as_context_mut()
1012            .0
1013            .concurrent_state_mut()
1014            .get_mut(self.id)?;
1015
1016        let &WriteState::GuestReady {
1017            address,
1018            count,
1019            options,
1020            instance,
1021            ..
1022        } = &transmit.write
1023        else {
1024            bail_bug!("expected WriteState::GuestReady")
1025        };
1026
1027        let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
1028            bail_bug!("expected ReadState::HostReady")
1029        };
1030
1031        let memory = instance
1032            .options_memory(self.store.0, options)
1033            .get((address + guest_offset.as_usize())..)
1034            .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
1035        match memory {
1036            Some(memory) => Ok(memory),
1037            None => bail_bug!("guest buffer unexpectedly out of bounds"),
1038        }
1039    }
1040
1041    /// Mark the specified number of bytes as read from the writer's buffer.
1042    ///
1043    /// # Panics
1044    ///
1045    /// This will panic if the count is larger than the size of the buffer
1046    /// returned by `Self::remaining`.
1047    pub fn mark_read(&mut self, count: usize) {
1048        // Note that this unwrap should only trigger for bugs in Wasmtime, and
1049        // this is modeled here to centralize the `.unwrap()` for this method in
1050        // one location.
1051        self.mark_read_(count).unwrap()
1052    }
1053
1054    fn mark_read_(&mut self, count: usize) -> Result<()> {
1055        if let Some(buffer) = self.host_buffer.as_deref_mut() {
1056            buffer.skip(count);
1057            return Ok(());
1058        }
1059
1060        let transmit = self
1061            .store
1062            .as_context_mut()
1063            .0
1064            .concurrent_state_mut()
1065            .get_mut(self.id)?;
1066
1067        let WriteState::GuestReady {
1068            count: write_count, ..
1069        } = &transmit.write
1070        else {
1071            bail_bug!("expected WriteState::GuestReady");
1072        };
1073
1074        let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
1075            bail_bug!("expected ReadState::HostReady");
1076        };
1077
1078        if guest_offset.as_usize() + count > write_count.as_usize() {
1079            // Note that this is a documented panic condition of `mark_read`.
1080            panic!("read count ({count}) must be less than or equal to write count ({write_count})")
1081        } else {
1082            guest_offset.inc(count)?;
1083        }
1084        Ok(())
1085    }
1086}
1087
/// Represents the host-owned read end of a stream.
pub trait StreamConsumer<D>: Send + 'static {
    /// The payload type of this stream.
    type Item;

    /// Handle a host- or guest-initiated write by accepting zero or more items
    /// from the specified source.
    ///
    /// This will be called whenever the writer starts a write.
    ///
    /// If the implementation is able to consume one or more items immediately,
    /// it should take them from `source` and return either
    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
    /// indicate that the caller should delay sending a `COMPLETED` event to the
    /// writer until a later call to this function returns `Poll::Ready(_)`.
    /// For more about that, see the `Backpressure` section below.
    ///
    /// If the implementation cannot consume any items immediately and `finish`
    /// is _false_, it should store the waker from `cx` for later and return
    /// `Poll::Pending` without taking anything from `source`.  Later, it
    /// should alert the waker when either (1) it is able to consume items,
    /// (2) the stream has ended, or (3) an error occurs.
    ///
    /// If the implementation cannot consume any items immediately and `finish`
    /// is _true_, it should, if possible, return
    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
    /// anything from `source`.  However, that might not be possible if an
    /// earlier call to `poll_consume` kicked off an asynchronous operation
    /// which needs to be completed (and possibly interrupted) gracefully, in
    /// which case the implementation may return `Poll::Pending` and later alert
    /// the waker as described above.  In other words, when `finish` is true,
    /// the implementation should prioritize returning a result to the writer
    /// (even if no items can be consumed) rather than wait indefinitely for
    /// capacity to free up.
    ///
    /// In all of the above cases, the implementation may alternatively choose
    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
    /// the guest (if any) to trap and render the component instance (if any)
    /// unusable.  The implementation should report errors that _are_
    /// recoverable by other means (e.g. by writing to a `future`) and return
    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
    ///
    /// Note that the implementation should only return
    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
    /// items from `source` if called with `finish` set to true.  If it does so
    /// when `finish` is false, the caller will trap.  Additionally, it should
    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
    /// least one item from `source` if there is an item available; otherwise,
    /// the caller will trap.  If `poll_consume` is called with no items in
    /// `source`, it should only return `Poll::Ready(_)` once it is able to
    /// accept at least one item during the next call to `poll_consume`.
    ///
    /// Note that any items which the implementation of this trait takes from
    /// `source` become the responsibility of that implementation.  For that
    /// reason, an implementation which forwards items to an upstream sink
    /// should reserve capacity in that sink before taking items out of
    /// `source`, if possible.  Alternatively, it might buffer items which can't
    /// be forwarded immediately and send them once capacity is freed up.
    ///
    /// ## Backpressure
    ///
    /// As mentioned above, an implementation might choose to return
    /// `Poll::Pending` after taking items from `source`, which tells the caller
    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
    /// a form of backpressure when the items are forwarded to an upstream sink
    /// asynchronously.  Note, however, that it's not possible to "put back"
    /// items into `source` once they've been taken out, so if the upstream sink
    /// is unable to accept all the items, that cannot be communicated to the
    /// writer at this level of abstraction.  Just as with application-specific,
    /// recoverable errors, information about which items could be forwarded and
    /// which could not must be communicated out-of-band, e.g. by writing to an
    /// application-specific `future`.
    ///
    /// Similarly, if the writer cancels the write after items have been taken
    /// from `source` but before the items have all been forwarded to an
    /// upstream sink, `poll_consume` will be called with `finish` set to true,
    /// and the implementation may either:
    ///
    /// - Interrupt the forwarding process gracefully.  This may be preferable
    /// if there is an out-of-band channel for communicating to the writer how
    /// many items were forwarded before being interrupted.
    ///
    /// - Allow the forwarding to complete without interrupting it.  This is
    /// usually preferable if there's no out-of-band channel for reporting back
    /// to the writer how many items were forwarded.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1183
/// Represents a host-owned write end of a future.
///
/// A blanket implementation is provided for any `Future` whose output is a
/// `Result<T, E>` with `E: Into<Error>`.
pub trait FutureProducer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated read by producing a value.
    ///
    /// This is equivalent to `StreamProducer::poll_produce`, but with a
    /// simplified interface for futures.
    ///
    /// If `finish` is true, the implementation may return
    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
    /// could produce a value.  Otherwise, it must either return
    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1205
1206impl<T, E, D, Fut> FutureProducer<D> for Fut
1207where
1208    E: Into<Error>,
1209    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1210{
1211    type Item = T;
1212
1213    fn poll_produce<'a>(
1214        self: Pin<&mut Self>,
1215        cx: &mut Context<'_>,
1216        _: StoreContextMut<'a, D>,
1217        finish: bool,
1218    ) -> Poll<Result<Option<T>>> {
1219        match self.poll(cx) {
1220            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1221            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1222            Poll::Pending if finish => Poll::Ready(Ok(None)),
1223            Poll::Pending => Poll::Pending,
1224        }
1225    }
1226}
1227
/// Represents a host-owned read end of a future.
pub trait FutureConsumer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated write by consuming a value.
    ///
    /// This is equivalent to `StreamConsumer::poll_consume`, but with a
    /// simplified interface for futures.
    ///
    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
    /// without taking the item from `source`, which indicates the operation was
    /// canceled before it could consume the value.  Otherwise, it must either
    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
    /// the item).
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1252
/// Represents the readable end of a Component Model `future`.
///
/// Note that `FutureReader` instances must be disposed of using either `pipe`
/// or `close`; otherwise the in-store representation will leak and the writer
/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
/// ensure that disposal happens automatically.
pub struct FutureReader<T> {
    // Handle identifying this future's read end within the store.
    id: TableId<TransmitHandle>,
    // Ties the reader to its payload type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
1263
impl<T> FutureReader<T> {
    /// Create a new future with the specified producer.
    ///
    /// # Errors
    ///
    /// Returns an error if the resource table for this store is full or if
    /// [`Config::concurrency_support`] is not enabled.
    ///
    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl FutureProducer<S::Data, Item = T>,
    ) -> Result<Self>
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        ensure!(
            store.as_context().0.concurrency_support(),
            "concurrency support is not enabled"
        );

        // Adapter which exposes a `FutureProducer` through the
        // `StreamProducer` interface expected by `new_transmit`.
        struct Producer<P>(P);

        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
            for Producer<P>
        {
            type Item = P::Item;
            type Buffer = Option<P::Item>;

            fn poll_produce<'a>(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                store: StoreContextMut<D>,
                mut destination: Destination<'a, Self::Item, Self::Buffer>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY: This is a standard pin-projection, and we never move
                // out of `self`.
                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                Poll::Ready(Ok(
                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
                        destination.set_buffer(Some(value));

                        // Here we return `StreamResult::Completed` even though
                        // we've produced the last item we'll ever produce.
                        // That's because the ABI expects
                        // `ReturnCode::Completed(1)` rather than
                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
                        // called again since the future will have resolved.
                        StreamResult::Completed
                    } else {
                        // The producer returned `None`, i.e. no value was
                        // produced.
                        StreamResult::Cancelled
                    },
                ))
            }
        }

        Ok(Self::new_(
            store
                .as_context_mut()
                .new_transmit(TransmitKind::Future, Producer(producer))?,
        ))
    }

    /// Wrap an existing transmit handle in a `FutureReader`.
    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
        Self {
            id,
            _phantom: PhantomData,
        }
    }

    /// Return the transmit handle backing this reader.
    pub(super) fn id(&self) -> TableId<TransmitHandle> {
        self.id
    }

    /// Set the consumer that accepts the result of this future.
    ///
    /// # Errors
    ///
    /// Returns an error if this future has already been closed.
    ///
    /// # Panics
    ///
    /// Panics if this future does not belong to `store`.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
    ) -> Result<()>
    where
        T: func::Lift + 'static,
    {
        // Adapter which exposes a `FutureConsumer` through the
        // `StreamConsumer` interface expected by `set_consumer`.
        struct Consumer<C>(C);

        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
            for Consumer<C>
        {
            type Item = T;

            fn poll_consume(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                mut store: StoreContextMut<D>,
                mut source: Source<Self::Item>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // SAFETY: This is a standard pin-projection, and we never move
                // out of `self`.
                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };

                // Reborrow `store` and `source` so both remain usable for the
                // `remaining` check below.
                ready!(consumer.poll_consume(
                    cx,
                    store.as_context_mut(),
                    source.reborrow(),
                    finish
                ))?;

                Poll::Ready(Ok(if source.remaining(store) == 0 {
                    // Here we return `StreamResult::Completed` even though
                    // we've consumed the last item we'll ever consume.  That's
                    // because the ABI expects `ReturnCode::Completed(1)` rather
                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
                    // called again since the future will have resolved.
                    StreamResult::Completed
                } else {
                    // The consumer returned without taking the item.
                    StreamResult::Cancelled
                }))
            }
        }

        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
    }

    /// Transfer ownership of the read end of a future from a guest to the host.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        let id = lift_index_to_future(cx, ty, index)?;
        Ok(Self::new_(id))
    }

    /// Close this `FutureReader`.
    ///
    /// This will close this half of the future which will signal to a pending
    /// write, if any, that the reader side is dropped. If the writer half has
    /// not yet written a value then when it attempts to write a value it will
    /// see that this end is closed.
    ///
    /// # Errors
    ///
    /// Returns an error if this future has already been closed.
    ///
    /// # Panics
    ///
    /// Panics if the store that the [`Accessor`] is derived from does not own
    /// this future.
    ///
    /// [`Accessor`]: crate::component::Accessor
    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
        future_close(store.as_context_mut().0, &mut self.id)
    }

    /// Convenience method around [`Self::close`].
    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
    /// drop and clean it up from the store.
    ///
    /// Note that the `accessor` provided must own this future and is
    /// additionally transferred to the `GuardedFutureReader` return value.
    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedFutureReader::new(accessor, self)
    }

    /// Attempts to convert this [`FutureReader<T>`] to a [`FutureAny`].
    ///
    /// # Errors
    ///
    /// This function will return an error if `self` does not belong to
    /// `store`.
    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
    where
        T: ComponentType + 'static,
    {
        FutureAny::try_from_future_reader(store, self)
    }

    /// Attempts to convert a [`FutureAny`] into a [`FutureReader<T>`].
    ///
    /// # Errors
    ///
    /// This function will fail if `T` doesn't match the type of the value that
    /// `future` is sending.
    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
    where
        T: ComponentType + 'static,
    {
        future.try_into_future_reader()
    }
}
1470
// Implemented by hand so that no `T: Debug` bound is required; only the handle
// id is printed and the `PhantomData` marker is left out.
impl<T> fmt::Debug for FutureReader<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FutureReader")
            .field("id", &self.id)
            .finish()
    }
}
1478
/// Drop the host-owned read end of the future identified by `id`.
///
/// The caller's `id` is overwritten with a `u32::MAX` placeholder before the
/// original handle is passed to `host_drop_reader`, so the caller no longer
/// holds the original id afterwards.
pub(super) fn future_close(
    store: &mut StoreOpaque,
    id: &mut TableId<TransmitHandle>,
) -> Result<()> {
    let id = mem::replace(id, TableId::new(u32::MAX));
    store.host_drop_reader(id, TransmitKind::Future)
}
1486
/// Transfer ownership of the read end of a future from a guest to the host.
///
/// `ty` must be `InterfaceType::Future`; any other type is reported through
/// `func::bad_type_info`.
pub(super) fn lift_index_to_future(
    cx: &mut LiftContext<'_>,
    ty: InterfaceType,
    index: u32,
) -> Result<TableId<TransmitHandle>> {
    match ty {
        InterfaceType::Future(src) => {
            let (state, instance) = cx.concurrent_state_and_instance_mut();
            lift_index_to_transmit(instance, state, TransmitIndex::Future(src), index)
        }
        _ => func::bad_type_info(),
    }
}
1501
/// Transfer ownership of the read end of a future from the host to a guest.
///
/// Returns the `u32` index produced by `lower_transmit_to_index` for the
/// guest.  `ty` must be `InterfaceType::Future`; any other type is reported
/// through `func::bad_type_info`.
pub(super) fn lower_future_to_index<U>(
    id: TableId<TransmitHandle>,
    cx: &mut LowerContext<'_, U>,
    ty: InterfaceType,
) -> Result<u32> {
    match ty {
        InterfaceType::Future(dst) => {
            cx.instance_handle()
                .lower_transmit_to_index(cx.store.0, TransmitIndex::Future(dst), id)
        }
        _ => func::bad_type_info(),
    }
}
1516
1517// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1518// safe and correct since we lift and lower future handles as `u32`s.
1519unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1520    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1521
1522    type Lower = <u32 as func::ComponentType>::Lower;
1523
1524    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1525        match ty {
1526            InterfaceType::Future(ty) => {
1527                let ty = types.types[*ty].ty;
1528                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1529            }
1530            other => bail!("expected `future`, found `{}`", func::desc(other)),
1531        }
1532    }
1533}
1534
1535// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1536unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1537    fn linear_lower_to_flat<U>(
1538        &self,
1539        cx: &mut LowerContext<'_, U>,
1540        ty: InterfaceType,
1541        dst: &mut MaybeUninit<Self::Lower>,
1542    ) -> Result<()> {
1543        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1544    }
1545
1546    fn linear_lower_to_memory<U>(
1547        &self,
1548        cx: &mut LowerContext<'_, U>,
1549        ty: InterfaceType,
1550        offset: usize,
1551    ) -> Result<()> {
1552        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1553            cx,
1554            InterfaceType::U32,
1555            offset,
1556        )
1557    }
1558}
1559
1560// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1561unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1562    fn linear_lift_from_flat(
1563        cx: &mut LiftContext<'_>,
1564        ty: InterfaceType,
1565        src: &Self::Lower,
1566    ) -> Result<Self> {
1567        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1568        Self::lift_from_index(cx, ty, index)
1569    }
1570
1571    fn linear_lift_from_memory(
1572        cx: &mut LiftContext<'_>,
1573        ty: InterfaceType,
1574        bytes: &[u8],
1575    ) -> Result<Self> {
1576        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1577        Self::lift_from_index(cx, ty, index)
1578    }
1579}
1580
/// A [`FutureReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureReader::new`] or
/// [`FutureReader::guard`].
///
/// [`Accessor`]: crate::component::Accessor
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<FutureReader<T>>,
    // Used by the destructor to reach the store when closing `reader`.
    accessor: A,
}
1598
impl<T, A> GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
    ///
    /// # Panics
    ///
    /// Panics if [`Config::concurrency_support`] is not enabled.
    ///
    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
        // Check up front that the accessor's store has concurrency support
        // enabled.
        assert!(
            accessor
                .as_accessor()
                .with(|a| a.as_context().0.concurrency_support())
        );
        Self {
            reader: Some(reader),
            accessor,
        }
    }

    /// Extracts the underlying [`FutureReader`] from this guard, returning it
    /// back.
    ///
    /// The reader is not closed by this call; disposing of it becomes the
    /// caller's responsibility again.
    pub fn into_future(self) -> FutureReader<T> {
        self.into()
    }
}
1628
1629impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1630where
1631    A: AsAccessor,
1632{
1633    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1634        guard.reader.take().unwrap()
1635    }
1636}
1637
1638impl<T, A> Drop for GuardedFutureReader<T, A>
1639where
1640    A: AsAccessor,
1641{
1642    fn drop(&mut self) {
1643        if let Some(reader) = &mut self.reader {
1644            // Currently this can only fail if the future is closed twice, which
1645            // this guard prevents, so this error shouldn't happen.
1646            let result = reader.close_with(&self.accessor);
1647            debug_assert!(result.is_ok());
1648        }
1649    }
1650}
1651
1652/// Represents the readable end of a Component Model `stream`.
1653///
1654/// Note that `StreamReader` instances must be disposed of using `close`;
1655/// otherwise the in-store representation will leak and the writer end will hang
1656/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1657/// disposal happens automatically.
1658pub struct StreamReader<T> {
1659    id: TableId<TransmitHandle>,
1660    _phantom: PhantomData<T>,
1661}
1662
1663impl<T> StreamReader<T> {
1664    /// Create a new stream with the specified producer.
1665    ///
1666    /// # Errors
1667    ///
1668    /// Returns an error if the resource table for this store is full or if
1669    /// [`Config::concurrency_support`] is not enabled.
1670    ///
1671    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1672    pub fn new<S: AsContextMut>(
1673        mut store: S,
1674        producer: impl StreamProducer<S::Data, Item = T>,
1675    ) -> Result<Self>
1676    where
1677        T: func::Lower + func::Lift + Send + Sync + 'static,
1678    {
1679        ensure!(
1680            store.as_context().0.concurrency_support(),
1681            "concurrency support is not enabled",
1682        );
1683        Ok(Self::new_(
1684            store
1685                .as_context_mut()
1686                .new_transmit(TransmitKind::Stream, producer)?,
1687        ))
1688    }
1689
1690    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1691        Self {
1692            id,
1693            _phantom: PhantomData,
1694        }
1695    }
1696
1697    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1698        self.id
1699    }
1700
1701    /// Attempt to consume this object by converting it into the specified type.
1702    ///
1703    /// This can be useful for "short-circuiting" host-to-host streams,
1704    /// bypassing the guest entirely.  For example, if a guest task returns a
1705    /// host-created stream and then exits, this function may be used to
1706    /// retrieve the write end, after which the guest instance and store may be
1707    /// disposed of if no longer needed.
1708    ///
1709    /// This will return `Ok(_)` if and only if the following conditions are
1710    /// met:
1711    ///
1712    /// - The stream was created by the host (i.e. not by the guest).
1713    ///
1714    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1715    /// producer provided to `StreamReader::new` when the stream was created,
1716    /// along with `TypeId::of::<V>()`.
1717    ///
1718    /// # Panics
1719    ///
1720    /// Panics if this stream has already been closed, or if this stream doesn't
1721    /// belong to the specified `store`.
1722    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1723        let store = store.as_context_mut();
1724        let state = store.0.concurrent_state_mut();
1725        let id = state.get_mut(self.id).unwrap().state;
1726        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1727            match try_into(TypeId::of::<V>()) {
1728                Some(result) => {
1729                    self.close(store).unwrap();
1730                    Ok(*result.downcast::<V>().unwrap())
1731                }
1732                None => Err(self),
1733            }
1734        } else {
1735            Err(self)
1736        }
1737    }
1738
1739    /// Set the consumer that accepts the items delivered to this stream.
1740    ///
1741    /// # Errors
1742    ///
1743    /// Returns an error if this stream has already been closed.
1744    ///
1745    /// # Panics
1746    ///
1747    /// Panics if this stream does not belong to `store`.
1748    pub fn pipe<S: AsContextMut>(
1749        self,
1750        mut store: S,
1751        consumer: impl StreamConsumer<S::Data, Item = T>,
1752    ) -> Result<()>
1753    where
1754        T: 'static,
1755    {
1756        store
1757            .as_context_mut()
1758            .set_consumer(self.id, TransmitKind::Stream, consumer)
1759    }
1760
1761    /// Transfer ownership of the read end of a stream from a guest to the host.
1762    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1763        let id = lift_index_to_stream(cx, ty, index)?;
1764        Ok(Self::new_(id))
1765    }
1766
1767    /// Close this `StreamReader`.
1768    ///
1769    /// This will signal that this portion of the stream is closed causing all
1770    /// future writes to return immediately with "DROPPED".
1771    ///
1772    /// # Errors
1773    ///
1774    /// Returns an error if this stream has already been closed.
1775    ///
1776    /// # Panics
1777    ///
1778    /// Panics if the store that the [`Accessor`] is derived from does not own
1779    /// this stream.
1780    ///
1781    /// [`Accessor`]: crate::component::Accessor
1782    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1783        stream_close(store.as_context_mut().0, &mut self.id)
1784    }
1785
1786    /// Convenience method around [`Self::close`].
1787    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1788        accessor.as_accessor().with(|access| self.close(access))
1789    }
1790
1791    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1792    /// drop and clean it up from the store.
1793    ///
1794    /// Note that the `accessor` provided must own this future and is
1795    /// additionally transferred to the `GuardedStreamReader` return value.
1796    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1797    where
1798        A: AsAccessor,
1799    {
1800        GuardedStreamReader::new(accessor, self)
1801    }
1802
1803    /// Attempts to convert this [`StreamReader<T>`] to a [`StreamAny`].
1804    ///
1805    /// # Errors
1806    ///
1807    /// This function will return an error if `self` does not belong to
1808    /// `store`.
1809    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1810    where
1811        T: ComponentType + 'static,
1812    {
1813        StreamAny::try_from_stream_reader(store, self)
1814    }
1815
1816    /// Attempts to convert a [`StreamAny`] into a [`StreamReader<T>`].
1817    ///
1818    /// # Errors
1819    ///
1820    /// This function will fail if `T` doesn't match the type of the value that
1821    /// `stream` is sending.
1822    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1823    where
1824        T: ComponentType + 'static,
1825    {
1826        stream.try_into_stream_reader()
1827    }
1828}
1829
1830impl<T> fmt::Debug for StreamReader<T> {
1831    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1832        f.debug_struct("StreamReader")
1833            .field("id", &self.id)
1834            .finish()
1835    }
1836}
1837
1838pub(super) fn stream_close(
1839    store: &mut StoreOpaque,
1840    id: &mut TableId<TransmitHandle>,
1841) -> Result<()> {
1842    let id = mem::replace(id, TableId::new(u32::MAX));
1843    store.host_drop_reader(id, TransmitKind::Stream)
1844}
1845
1846/// Transfer ownership of the read end of a stream from a guest to the host.
1847pub(super) fn lift_index_to_stream(
1848    cx: &mut LiftContext<'_>,
1849    ty: InterfaceType,
1850    index: u32,
1851) -> Result<TableId<TransmitHandle>> {
1852    match ty {
1853        InterfaceType::Stream(src) => {
1854            let (state, instance) = cx.concurrent_state_and_instance_mut();
1855            lift_index_to_transmit(instance, state, TransmitIndex::Stream(src), index)
1856        }
1857        _ => func::bad_type_info(),
1858    }
1859}
1860
1861/// Transfer ownership of the read end of a stream from the host to a guest.
1862pub(super) fn lower_stream_to_index<U>(
1863    id: TableId<TransmitHandle>,
1864    cx: &mut LowerContext<'_, U>,
1865    ty: InterfaceType,
1866) -> Result<u32> {
1867    match ty {
1868        InterfaceType::Stream(dst) => {
1869            cx.instance_handle()
1870                .lower_transmit_to_index(cx.store.0, TransmitIndex::Stream(dst), id)
1871        }
1872        _ => func::bad_type_info(),
1873    }
1874}
1875
1876// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1877// safe and correct since we lift and lower stream handles as `u32`s.
1878unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1879    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1880
1881    type Lower = <u32 as func::ComponentType>::Lower;
1882
1883    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1884        match ty {
1885            InterfaceType::Stream(ty) => {
1886                let ty = types.types[*ty].ty;
1887                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1888            }
1889            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1890        }
1891    }
1892}
1893
1894// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1895unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1896    fn linear_lower_to_flat<U>(
1897        &self,
1898        cx: &mut LowerContext<'_, U>,
1899        ty: InterfaceType,
1900        dst: &mut MaybeUninit<Self::Lower>,
1901    ) -> Result<()> {
1902        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1903    }
1904
1905    fn linear_lower_to_memory<U>(
1906        &self,
1907        cx: &mut LowerContext<'_, U>,
1908        ty: InterfaceType,
1909        offset: usize,
1910    ) -> Result<()> {
1911        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1912            cx,
1913            InterfaceType::U32,
1914            offset,
1915        )
1916    }
1917}
1918
1919// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1920unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1921    fn linear_lift_from_flat(
1922        cx: &mut LiftContext<'_>,
1923        ty: InterfaceType,
1924        src: &Self::Lower,
1925    ) -> Result<Self> {
1926        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1927        Self::lift_from_index(cx, ty, index)
1928    }
1929
1930    fn linear_lift_from_memory(
1931        cx: &mut LiftContext<'_>,
1932        ty: InterfaceType,
1933        bytes: &[u8],
1934    ) -> Result<Self> {
1935        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1936        Self::lift_from_index(cx, ty, index)
1937    }
1938}
1939
1940/// A [`StreamReader`] paired with an [`Accessor`].
1941///
1942/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1943/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1944/// [`StreamReader::guard`].
1945///
1946/// [`Accessor`]: crate::component::Accessor
1947pub struct GuardedStreamReader<T, A>
1948where
1949    A: AsAccessor,
1950{
1951    // This field is `None` to implement the conversion from this guard back to
1952    // `StreamReader`. When `None` is seen in the destructor it will cause the
1953    // destructor to do nothing.
1954    reader: Option<StreamReader<T>>,
1955    accessor: A,
1956}
1957
1958impl<T, A> GuardedStreamReader<T, A>
1959where
1960    A: AsAccessor,
1961{
1962    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1963    /// `reader`.
1964    ///
1965    /// # Panics
1966    ///
1967    /// Panics if [`Config::concurrency_support`] is not enabled.
1968    ///
1969    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1970    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1971        assert!(
1972            accessor
1973                .as_accessor()
1974                .with(|a| a.as_context().0.concurrency_support())
1975        );
1976        Self {
1977            reader: Some(reader),
1978            accessor,
1979        }
1980    }
1981
1982    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1983    /// back.
1984    pub fn into_stream(self) -> StreamReader<T> {
1985        self.into()
1986    }
1987}
1988
1989impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1990where
1991    A: AsAccessor,
1992{
1993    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1994        guard.reader.take().unwrap()
1995    }
1996}
1997
1998impl<T, A> Drop for GuardedStreamReader<T, A>
1999where
2000    A: AsAccessor,
2001{
2002    fn drop(&mut self) {
2003        if let Some(reader) = &mut self.reader {
2004            // Currently this can only fail if the future is closed twice, which
2005            // this guard prevents, so this error shouldn't happen.
2006            let result = reader.close_with(&self.accessor);
2007            debug_assert!(result.is_ok());
2008        }
2009    }
2010}
2011
/// A Component Model `error-context` value held by the host.
pub struct ErrorContext {
    // Raw `u32` representation. This is the value wrapped by
    // `ErrorContextAny` and inserted into a guest's error-context table when
    // the value is lowered.
    rep: u32,
}
2016
2017impl ErrorContext {
2018    pub(crate) fn new(rep: u32) -> Self {
2019        Self { rep }
2020    }
2021
2022    /// Convert this `ErrorContext` into a [`Val`].
2023    pub fn into_val(self) -> Val {
2024        Val::ErrorContext(ErrorContextAny(self.rep))
2025    }
2026
2027    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
2028    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
2029        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
2030            bail!("expected `error-context`; got `{}`", value.desc());
2031        };
2032        Ok(Self::new(*rep))
2033    }
2034
2035    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
2036        match ty {
2037            InterfaceType::ErrorContext(src) => {
2038                let rep = cx
2039                    .instance_mut()
2040                    .table_for_error_context(src)
2041                    .error_context_rep(index)?;
2042
2043                Ok(Self { rep })
2044            }
2045            _ => func::bad_type_info(),
2046        }
2047    }
2048}
2049
2050pub(crate) fn lower_error_context_to_index<U>(
2051    rep: u32,
2052    cx: &mut LowerContext<'_, U>,
2053    ty: InterfaceType,
2054) -> Result<u32> {
2055    match ty {
2056        InterfaceType::ErrorContext(dst) => {
2057            let tbl = cx.instance_mut().table_for_error_context(dst);
2058            tbl.error_context_insert(rep)
2059        }
2060        _ => func::bad_type_info(),
2061    }
2062}
2063// SAFETY: This relies on the `ComponentType` implementation for `u32` being
2064// safe and correct since we lift and lower future handles as `u32`s.
2065unsafe impl func::ComponentType for ErrorContext {
2066    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2067
2068    type Lower = <u32 as func::ComponentType>::Lower;
2069
2070    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2071        match ty {
2072            InterfaceType::ErrorContext(_) => Ok(()),
2073            other => bail!("expected `error`, found `{}`", func::desc(other)),
2074        }
2075    }
2076}
2077
2078// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2079unsafe impl func::Lower for ErrorContext {
2080    fn linear_lower_to_flat<T>(
2081        &self,
2082        cx: &mut LowerContext<'_, T>,
2083        ty: InterfaceType,
2084        dst: &mut MaybeUninit<Self::Lower>,
2085    ) -> Result<()> {
2086        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2087            cx,
2088            InterfaceType::U32,
2089            dst,
2090        )
2091    }
2092
2093    fn linear_lower_to_memory<T>(
2094        &self,
2095        cx: &mut LowerContext<'_, T>,
2096        ty: InterfaceType,
2097        offset: usize,
2098    ) -> Result<()> {
2099        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2100            cx,
2101            InterfaceType::U32,
2102            offset,
2103        )
2104    }
2105}
2106
2107// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2108unsafe impl func::Lift for ErrorContext {
2109    fn linear_lift_from_flat(
2110        cx: &mut LiftContext<'_>,
2111        ty: InterfaceType,
2112        src: &Self::Lower,
2113    ) -> Result<Self> {
2114        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2115        Self::lift_from_index(cx, ty, index)
2116    }
2117
2118    fn linear_lift_from_memory(
2119        cx: &mut LiftContext<'_>,
2120        ty: InterfaceType,
2121        bytes: &[u8],
2122    ) -> Result<Self> {
2123        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2124        Self::lift_from_index(cx, ty, index)
2125    }
2126}
2127
2128/// Represents the read or write end of a stream or future.
2129pub(super) struct TransmitHandle {
2130    pub(super) common: WaitableCommon,
2131    /// See `TransmitState`
2132    state: TableId<TransmitState>,
2133}
2134
2135impl TransmitHandle {
2136    fn new(state: TableId<TransmitState>) -> Self {
2137        Self {
2138            common: WaitableCommon::default(),
2139            state,
2140        }
2141    }
2142}
2143
2144impl TableDebug for TransmitHandle {
2145    fn type_name() -> &'static str {
2146        "TransmitHandle"
2147    }
2148}
2149
2150/// Represents the state of a stream or future.
2151struct TransmitState {
2152    /// The write end of the stream or future.
2153    write_handle: TableId<TransmitHandle>,
2154    /// The read end of the stream or future.
2155    read_handle: TableId<TransmitHandle>,
2156    /// See `WriteState`
2157    write: WriteState,
2158    /// See `ReadState`
2159    read: ReadState,
2160    /// Whether further values may be transmitted via this stream or future.
2161    done: bool,
2162    /// The original creator of this stream, used for type-checking with
2163    /// `{Future,Stream}Any`.
2164    pub(super) origin: TransmitOrigin,
2165}
2166
2167#[derive(Copy, Clone)]
2168pub(super) enum TransmitOrigin {
2169    Host,
2170    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2171    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2172}
2173
2174impl TransmitState {
2175    fn new(origin: TransmitOrigin) -> Self {
2176        Self {
2177            write_handle: TableId::new(u32::MAX),
2178            read_handle: TableId::new(u32::MAX),
2179            read: ReadState::Open,
2180            write: WriteState::Open,
2181            done: false,
2182            origin,
2183        }
2184    }
2185}
2186
2187impl TableDebug for TransmitState {
2188    fn type_name() -> &'static str {
2189        "TransmitState"
2190    }
2191}
2192
2193impl TransmitOrigin {
2194    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2195        match index {
2196            TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2197            TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2198        }
2199    }
2200}
2201
2202type PollStream = Box<
2203    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2204>;
2205
/// A boxed closure that is given a `TypeId` and optionally returns a
/// type-erased value, which the caller is expected to downcast to that type.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2207
2208/// Represents the state of the write end of a stream or future.
2209enum WriteState {
2210    /// The write end is open, but no write is pending.
2211    Open,
2212    /// The write end is owned by a guest task and a write is pending.
2213    GuestReady {
2214        instance: Instance,
2215        caller: RuntimeComponentInstanceIndex,
2216        ty: TransmitIndex,
2217        flat_abi: Option<FlatAbi>,
2218        options: OptionsIndex,
2219        address: usize,
2220        count: ItemCount,
2221        handle: u32,
2222    },
2223    /// The write end is owned by the host, which is ready to produce items.
2224    HostReady {
2225        produce: PollStream,
2226        try_into: TryInto,
2227        guest_offset: ItemCount,
2228        cancel: bool,
2229        cancel_waker: Option<Waker>,
2230    },
2231    /// The write end has been dropped.
2232    Dropped,
2233}
2234
2235impl fmt::Debug for WriteState {
2236    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2237        match self {
2238            Self::Open => f.debug_tuple("Open").finish(),
2239            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2240            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2241            Self::Dropped => f.debug_tuple("Dropped").finish(),
2242        }
2243    }
2244}
2245
2246/// Represents the state of the read end of a stream or future.
2247enum ReadState {
2248    /// The read end is open, but no read is pending.
2249    Open,
2250    /// The read end is owned by a guest task and a read is pending.
2251    GuestReady {
2252        ty: TransmitIndex,
2253        caller_instance: RuntimeComponentInstanceIndex,
2254        caller_thread: QualifiedThreadId,
2255        flat_abi: Option<FlatAbi>,
2256        instance: Instance,
2257        options: OptionsIndex,
2258        address: usize,
2259        count: ItemCount,
2260        handle: u32,
2261    },
2262    /// The read end is owned by a host task, and it is ready to consume items.
2263    HostReady {
2264        consume: PollStream,
2265        guest_offset: ItemCount,
2266        cancel: bool,
2267        cancel_waker: Option<Waker>,
2268    },
2269    /// Both the read and write ends are owned by the host.
2270    HostToHost {
2271        accept: Box<
2272            dyn for<'a> Fn(
2273                    &'a mut UntypedWriteBuffer<'a>,
2274                )
2275                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2276                + Send
2277                + Sync,
2278        >,
2279        buffer: Vec<u8>,
2280        limit: usize,
2281    },
2282    /// The read end has been dropped.
2283    Dropped,
2284}
2285
2286impl fmt::Debug for ReadState {
2287    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2288        match self {
2289            Self::Open => f.debug_tuple("Open").finish(),
2290            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2291            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2292            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2293            Self::Dropped => f.debug_tuple("Dropped").finish(),
2294        }
2295    }
2296}
2297
2298fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
2299    Ok(match state {
2300        StreamResult::Dropped => ReturnCode::Dropped(count),
2301        StreamResult::Completed => ReturnCode::completed(kind, count),
2302        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2303    })
2304}
2305
2306impl StoreOpaque {
2307    fn pipe_from_guest(
2308        &mut self,
2309        kind: TransmitKind,
2310        id: TableId<TransmitState>,
2311        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2312    ) {
2313        let future = async move {
2314            let stream_state = future.await?;
2315            tls::get(|store| {
2316                let state = store.concurrent_state_mut();
2317                let transmit = state.get_mut(id)?;
2318                let ReadState::HostReady {
2319                    consume,
2320                    guest_offset,
2321                    ..
2322                } = mem::replace(&mut transmit.read, ReadState::Open)
2323                else {
2324                    bail_bug!("expected ReadState::HostReady")
2325                };
2326                let code = return_code(kind, stream_state, guest_offset)?;
2327                transmit.read = match stream_state {
2328                    StreamResult::Dropped => ReadState::Dropped,
2329                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2330                        consume,
2331                        guest_offset: ItemCount::ZERO,
2332                        cancel: false,
2333                        cancel_waker: None,
2334                    },
2335                };
2336                let WriteState::GuestReady { ty, handle, .. } =
2337                    mem::replace(&mut transmit.write, WriteState::Open)
2338                else {
2339                    bail_bug!("expected WriteState::GuestReady")
2340                };
2341                state.send_write_result(ty, id, handle, code)?;
2342                Ok(())
2343            })
2344        };
2345
2346        self.concurrent_state_mut().push_future(future.boxed());
2347    }
2348
2349    fn pipe_to_guest(
2350        &mut self,
2351        kind: TransmitKind,
2352        id: TableId<TransmitState>,
2353        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2354    ) {
2355        let future = async move {
2356            let stream_state = future.await?;
2357            tls::get(|store| {
2358                let state = store.concurrent_state_mut();
2359                let transmit = state.get_mut(id)?;
2360                let WriteState::HostReady {
2361                    produce,
2362                    try_into,
2363                    guest_offset,
2364                    ..
2365                } = mem::replace(&mut transmit.write, WriteState::Open)
2366                else {
2367                    bail_bug!("expected WriteState::HostReady")
2368                };
2369                let code = return_code(kind, stream_state, guest_offset)?;
2370                transmit.write = match stream_state {
2371                    StreamResult::Dropped => WriteState::Dropped,
2372                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2373                        produce,
2374                        try_into,
2375                        guest_offset: ItemCount::ZERO,
2376                        cancel: false,
2377                        cancel_waker: None,
2378                    },
2379                };
2380                let ReadState::GuestReady { ty, handle, .. } =
2381                    mem::replace(&mut transmit.read, ReadState::Open)
2382                else {
2383                    bail_bug!("expected ReadState::GuestReady")
2384                };
2385                state.send_read_result(ty, id, handle, code)?;
2386                Ok(())
2387            })
2388        };
2389
2390        self.concurrent_state_mut().push_future(future.boxed());
2391    }
2392
2393    /// Drop the read end of a stream or future read from the host.
2394    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2395        let state = self.concurrent_state_mut();
2396        Waitable::Transmit(id).join(state, None)?;
2397        let transmit_id = state.get_mut(id)?.state;
2398        let transmit = state
2399            .get_mut(transmit_id)
2400            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2401        log::trace!(
2402            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2403            transmit.read,
2404            transmit.write
2405        );
2406
2407        transmit.read = ReadState::Dropped;
2408
2409        // If the write end is already dropped, it should stay dropped,
2410        // otherwise, it should be opened.
2411        let new_state = if let WriteState::Dropped = &transmit.write {
2412            WriteState::Dropped
2413        } else {
2414            WriteState::Open
2415        };
2416
2417        let write_handle = transmit.write_handle;
2418
2419        match mem::replace(&mut transmit.write, new_state) {
2420            // If a guest is waiting to write, notify it that the read end has
2421            // been dropped.
2422            WriteState::GuestReady { ty, handle, .. } => {
2423                state.update_event(
2424                    write_handle.rep(),
2425                    match ty {
2426                        TransmitIndex::Future(ty) => Event::FutureWrite {
2427                            code: ReturnCode::Dropped(ItemCount::ZERO),
2428                            pending: Some((ty, handle)),
2429                        },
2430                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2431                            code: ReturnCode::Dropped(ItemCount::ZERO),
2432                            pending: Some((ty, handle)),
2433                        },
2434                    },
2435                )?;
2436            }
2437
2438            WriteState::Open => {
2439                state.update_event(
2440                    write_handle.rep(),
2441                    match kind {
2442                        TransmitKind::Future => Event::FutureWrite {
2443                            code: ReturnCode::Dropped(ItemCount::ZERO),
2444                            pending: None,
2445                        },
2446                        TransmitKind::Stream => Event::StreamWrite {
2447                            code: ReturnCode::Dropped(ItemCount::ZERO),
2448                            pending: None,
2449                        },
2450                    },
2451                )?;
2452            }
2453
2454            // If the writer has already been dropped, then this cleans out the
2455            // state that the reader is using. If the write is host-owned then
2456            // by cleaning this out we run the host's `Drop` implementation
2457            // which notifies it of this drop.
2458            WriteState::Dropped | WriteState::HostReady { .. } => {
2459                log::trace!("host_drop_reader delete {transmit_id:?}");
2460                state.delete_transmit(transmit_id)?;
2461            }
2462        }
2463        Ok(())
2464    }
2465
2466    /// Drop the write end of a stream or future read from the host.
2467    fn host_drop_writer(
2468        &mut self,
2469        id: TableId<TransmitHandle>,
2470        on_drop_open: Option<fn() -> Result<()>>,
2471    ) -> Result<()> {
2472        let state = self.concurrent_state_mut();
2473        Waitable::Transmit(id).join(state, None)?;
2474        let transmit_id = state.get_mut(id)?.state;
2475        let transmit = state
2476            .get_mut(transmit_id)
2477            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2478        log::trace!(
2479            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2480            transmit.read,
2481            transmit.write
2482        );
2483
2484        // Existing queued transmits must be updated with information for the impending writer closure
2485        match &mut transmit.write {
2486            WriteState::GuestReady { .. } => {
2487                bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2488            }
2489            WriteState::HostReady { .. } => {}
2490            v @ WriteState::Open => {
2491                if let (Some(on_drop_open), false) = (on_drop_open, transmit.done) {
2492                    on_drop_open()?;
2493                } else {
2494                    *v = WriteState::Dropped;
2495                }
2496            }
2497            WriteState::Dropped => bail_bug!("write state is already dropped"),
2498        }
2499
2500        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2501
2502        // If the existing read state is dropped, then there's nothing to read
2503        // and we can keep it that way.
2504        //
2505        // If the read state was any other state, then we must set the new state to open
2506        // to indicate that there *is* data to be read
2507        let new_state = if let ReadState::Dropped = &transmit.read {
2508            ReadState::Dropped
2509        } else {
2510            ReadState::Open
2511        };
2512
2513        let read_handle = transmit.read_handle;
2514
2515        // Swap in the new read state
2516        match mem::replace(&mut transmit.read, new_state) {
2517            // If the guest was ready to read, then we cannot drop the reader (or writer);
2518            // we must deliver the event, and update the state associated with the handle to
2519            // represent that a read must be performed
2520            ReadState::GuestReady { ty, handle, .. } => {
2521                // Ensure the final read of the guest is queued, with appropriate closure indicator
2522                self.concurrent_state_mut().update_event(
2523                    read_handle.rep(),
2524                    match ty {
2525                        TransmitIndex::Future(ty) => Event::FutureRead {
2526                            code: ReturnCode::Dropped(ItemCount::ZERO),
2527                            pending: Some((ty, handle)),
2528                        },
2529                        TransmitIndex::Stream(ty) => Event::StreamRead {
2530                            code: ReturnCode::Dropped(ItemCount::ZERO),
2531                            pending: Some((ty, handle)),
2532                        },
2533                    },
2534                )?;
2535            }
2536
2537            // If the read state is open, then there are no registered readers of the stream/future
2538            ReadState::Open => {
2539                self.concurrent_state_mut().update_event(
2540                    read_handle.rep(),
2541                    match on_drop_open {
2542                        Some(_) => Event::FutureRead {
2543                            code: ReturnCode::Dropped(ItemCount::ZERO),
2544                            pending: None,
2545                        },
2546                        None => Event::StreamRead {
2547                            code: ReturnCode::Dropped(ItemCount::ZERO),
2548                            pending: None,
2549                        },
2550                    },
2551                )?;
2552            }
2553
2554            // If the read state was already dropped, then we can remove the
2555            // transmit state completely (both writer and reader have been
2556            // dropped). If the read state is host-owned then it's additionally
2557            // deleted here as a notification that the read end has gone away.
2558            // Running the host's `Drop` implementation is what notifies it of
2559            // this event.
2560            ReadState::Dropped | ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
2561                log::trace!("host_drop_writer delete {transmit_id:?}");
2562                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2563            }
2564        }
2565        Ok(())
2566    }
2567
2568    pub(super) fn transmit_origin(
2569        &mut self,
2570        id: TableId<TransmitHandle>,
2571    ) -> Result<TransmitOrigin> {
2572        let state = self.concurrent_state_mut();
2573        let state_id = state.get_mut(id)?.state;
2574        Ok(state.get_mut(state_id)?.origin)
2575    }
2576}
2577
impl<T> StoreContextMut<'_, T> {
    /// Create a new host-origin transmit whose write end is driven by
    /// `producer`, returning the handle of its read end.
    ///
    /// The write state of the new transmit is set to `WriteState::HostReady`
    /// carrying two callbacks:
    ///
    /// * `produce` polls `producer` for more items (unless items are still
    ///   buffered from an earlier call), validates the `StreamResult` it
    ///   reports, and forwards anything produced to the current reader via
    ///   `write`.
    /// * `try_into` offers the producer itself to `P::try_into`, putting it
    ///   back if the conversion is declined.
    ///
    /// `kind` is only forwarded to `write`.
    ///
    /// # Errors
    ///
    /// Fails if the transmit cannot be created or looked up in the concurrent
    /// state table.
    fn new_transmit<P: StreamProducer<T>>(
        mut self,
        kind: TransmitKind,
        producer: P,
    ) -> Result<TableId<TransmitHandle>>
    where
        P::Item: func::Lower,
    {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
        // The pinned producer and its item buffer live together in a shared
        // `LockedState` so that both callbacks below can reach them.
        let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
        // `read` is the read-end handle; `id` is the transmit state it points at.
        let id = state.get_mut(read)?.state;
        let mut dropped = false;
        let produce = Box::new({
            let producer = producer.clone();
            move || {
                let producer = producer.clone();
                async move {
                    let mut state = producer.take()?;
                    let (mine, buffer) = &mut *state;

                    // Only poll the producer when nothing is left over in the
                    // buffer; otherwise report `Completed` so that the
                    // leftovers are written out first.
                    let (result, cancelled) = if buffer.remaining().is_empty() {
                        future::poll_fn(|cx| {
                            tls::get(|store| {
                                let transmit = store.concurrent_state_mut().get_mut(id)?;

                                // Copy out the `cancel` flag as it stands
                                // *before* the producer is polled.
                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                    bail_bug!("expected WriteState::HostReady")
                                };

                                // For a host-to-host pipe, move the reader's
                                // buffer out of the table so it can be lent to
                                // the producer alongside the store.
                                let mut host_written = 0;
                                let mut host_buffer =
                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                        Some(mem::take(buffer))
                                    } else {
                                        None
                                    };

                                let poll = mine.as_mut().poll_produce(
                                    cx,
                                    token.as_context_mut(store),
                                    Destination {
                                        id,
                                        buffer,
                                        host_buffer: host_buffer.as_mut().map(|b| {
                                            HostBuffer {
                                                dst: b,
                                                marked_written: &mut host_written,
                                            }
                                        }),
                                        _phantom: PhantomData,
                                    },
                                    cancel,
                                );

                                let transmit = store.concurrent_state_mut().get_mut(id)?;

                                // Put the host buffer back (if the read state
                                // is still host-to-host) and record how many
                                // items the producer marked as written.
                                let host_offset = if let (
                                    Some(host_buffer),
                                    ReadState::HostToHost { buffer, limit, .. },
                                ) = (host_buffer, &mut transmit.read)
                                {
                                    *limit = host_written;
                                    *buffer = host_buffer;
                                    *limit
                                } else {
                                    0
                                };

                                // Scoped block: the `cancel` bound in here is
                                // a `&mut` into the table and shadows the
                                // copied flag only until the closing brace.
                                {
                                    let WriteState::HostReady {
                                        guest_offset,
                                        cancel,
                                        cancel_waker,
                                        ..
                                    } = &mut transmit.write
                                    else {
                                        bail_bug!("expected WriteState::HostReady")
                                    };

                                    if poll.is_pending() {
                                        // `Pending` is only legal when nothing
                                        // at all was produced by this poll.
                                        if !buffer.remaining().is_empty()
                                            || *guest_offset > 0
                                            || host_offset > 0
                                        {
                                            bail!(
                                                "StreamProducer::poll_produce returned Poll::Pending \
                                                 after producing at least one item"
                                            )
                                        }
                                        // NOTE(review): presumably this waker is
                                        // used to re-poll the producer when a
                                        // cancellation is requested — confirm
                                        // where `cancel_waker` is consumed.
                                        *cancel_waker = Some(cx.waker().clone());
                                    } else {
                                        *cancel_waker = None;
                                        *cancel = false;
                                    }
                                }

                                // Pair the result with the pre-poll `cancel`
                                // flag copied at the top of this closure.
                                Ok(poll.map(|v| v.map(|result| (result, cancel))))
                            })?
                        })
                            .await?
                    } else {
                        (StreamResult::Completed, false)
                    };

                    // Re-read the offsets and the reader's requested item
                    // count (taken as 1 for a host-to-host reader).
                    let (guest_offset, host_offset, count) = tls::get(|store| {
                        let transmit = store.concurrent_state_mut().get_mut(id)?;
                        let (count, host_offset) = match &transmit.read {
                            &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
                            &ReadState::HostToHost { limit, .. } => (1, limit),
                            _ => bail_bug!("invalid read state"),
                        };
                        let guest_offset = match &transmit.write {
                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
                            _ => bail_bug!("invalid write state"),
                        };
                        Ok((guest_offset, host_offset, count))
                    })?;

                    // Validate the producer's reported result against what
                    // was actually observed.
                    match result {
                        StreamResult::Completed => {
                            if count > 1
                                && buffer.remaining().is_empty()
                                && guest_offset == 0
                                && host_offset == 0
                            {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Completed \
                                     without producing any items"
                                );
                            }
                        }
                        StreamResult::Cancelled => {
                            if !cancelled {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                     without being given a `finish` parameter value of true"
                                );
                            }
                        }
                        StreamResult::Dropped => {
                            dropped = true;
                        }
                    }

                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                    // Give the producer/buffer pair back before calling
                    // `write`, which takes it again itself.
                    drop(state);

                    if write_buffer {
                        write(token, id, producer.clone(), kind).await?;
                    }

                    // A dropped producer is only reported as `Dropped` once
                    // its buffer is empty; until then report `Completed`.
                    Ok(if dropped {
                        if producer.with(|p| p.1.remaining().is_empty())?  {
                            StreamResult::Dropped
                        } else {
                            StreamResult::Completed
                        }
                    } else {
                        result
                    })
                }
                .boxed()
            }
        });
        // Take the producer out of the shared state and offer it to
        // `P::try_into`; if the conversion is declined, store it back together
        // with its buffer and report `None`.
        let try_into = Box::new(move |ty| {
            let (mine, buffer) = producer.try_lock().ok()?.take()?;
            match P::try_into(mine, ty) {
                Ok(value) => Some(value),
                Err(mine) => {
                    *producer.try_lock().ok()? = Some((mine, buffer));
                    None
                }
            }
        });
        state.get_mut(id)?.write = WriteState::HostReady {
            produce,
            try_into,
            guest_offset: ItemCount::ZERO,
            cancel: false,
            cancel_waker: None,
        };
        Ok(read)
    }

    /// Attach `consumer` as the host-side reader of the transmit that the
    /// handle `id` refers to.
    ///
    /// What happens next depends on the current write state:
    ///
    /// * `Open` - the read state becomes `ReadState::HostReady` and nothing
    ///   else is started.
    /// * `GuestReady` - a consume future is created, the read state becomes
    ///   `HostReady`, and the future is handed to `pipe_from_guest`.
    /// * `HostReady` - the read state becomes `ReadState::HostToHost` and a
    ///   background future is pushed that repeatedly calls the writer's
    ///   `produce` callback, then deletes the transmit when the loop ends.
    /// * `Dropped` - the read end is dropped via `host_drop_reader`.
    ///
    /// # Errors
    ///
    /// Fails if a table lookup fails, if the write state is not what a
    /// branch expects, or if `host_drop_reader` fails.
    fn set_consumer<C: StreamConsumer<T>>(
        mut self,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) -> Result<()> {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        // From here on `id` names the transmit state, not the handle.
        let id = state.get_mut(id)?.state;
        let transmit = state.get_mut(id)?;
        let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
        // Shared consume routine. `host_buffer` is `Some` when items come
        // straight from a host producer and `None` otherwise.
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                let mut mine = consumer.take()?;

                // Snapshot of the host buffer's length, compared against its
                // length after polling in the `Completed` check below.
                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        // Copy out the `cancel` flag as it stands before the
                        // consumer is polled.
                        let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
                            &ReadState::HostReady { cancel, .. } => cancel,
                            ReadState::Open => false,
                            _ => bail_bug!("unexpected read state"),
                        };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        // Inside this `if let`, `cancel` is a `&mut` into the
                        // table; the copied flag is visible again afterwards.
                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut store.concurrent_state_mut().get_mut(id)?.read
                        {
                            if poll.is_pending() {
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        // Pair the result with the pre-poll `cancel` flag.
                        Ok(poll.map(|v| v.map(|result| (result, cancel))))
                    })?
                })
                .await?;

                // Gather how far the consumer got (`guest_offset`) and how
                // many items the writer had on offer (`count`).
                let (guest_offset, count) = tls::get(|store| {
                    let transmit = store.concurrent_state_mut().get_mut(id)?;
                    Ok((
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => ItemCount::ZERO,
                            _ => bail_bug!("invalid read state"),
                        },
                        match &transmit.write {
                            WriteState::GuestReady { count, .. } => count.as_usize(),
                            WriteState::HostReady { .. } => match host_buffer_remaining_before {
                                Some(n) => n,
                                None => bail_bug!("host_buffer_remaining_before should be set"),
                            },
                            _ => bail_bug!("invalid write state"),
                        },
                    ))
                })?;

                match result {
                    StreamResult::Completed => {
                        // Rejected only when items were on offer, the guest
                        // offset is zero, and a host buffer is present whose
                        // length did not change. With no host buffer the
                        // `unwrap_or(false)` makes this check a no-op.
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // A completed consume on a future marks the transmit
                        // as `done`.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                store.concurrent_state_mut().get_mut(id)?.done = true;
                                crate::error::Ok(())
                            })?;
                        }
                    }
                    StreamResult::Cancelled => {
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                Ok(result)
            }
        };
        // Buffer-less wrapper around the shared routine, stored in
        // `ReadState::HostReady`.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            // No writer activity yet: just register the consumer.
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            // The guest write state is already `GuestReady`: create the
            // consume future before `consume` is moved into the read state,
            // then hand it to `pipe_from_guest`.
            &WriteState::GuestReady { .. } => {
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                };
                self.0.pipe_from_guest(kind, id, future);
            }
            // Host producer meets host consumer.
            WriteState::HostReady { .. } => {
                // Take ownership of the real `produce` callback, leaving a
                // placeholder in the table that errors if it is ever invoked.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| {
                            Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
                        }),
                        try_into: Box::new(|_| None),
                        guest_offset: ItemCount::ZERO,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    bail_bug!("expected WriteState::HostReady")
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                // Pump loop: stop when the reader has been dropped, when the
                // producer reports `Dropped`, or after one round for a future.
                let future = async move {
                    loop {
                        if tls::get(|store| {
                            crate::error::Ok(matches!(
                                store.concurrent_state_mut().get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                // The transmit is deleted whether the loop ended with `Ok` or
                // `Err`; a failure of the deletion itself takes precedence
                // over the loop's result.
                .map(move |result| {
                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            // The writer is already gone: drop the reader as well.
            WriteState::Dropped => {
                let reader = transmit.read_handle;
                self.0.host_drop_reader(reader, kind)?;
            }
        }
        Ok(())
    }
}
2965
/// Deliver the items buffered in `pair` to the current reader of the
/// transmit `id`.
///
/// The read state is temporarily replaced with `ReadState::Open` and then
/// handled according to what it was:
///
/// * `ReadState::GuestReady` - items are lowered into the guest's buffer
///   starting at the writer's current `guest_offset`. The writer's
///   `guest_offset` is then advanced by the number of items taken from the
///   buffer and a `GuestReady` read state is stored back.
/// * `ReadState::HostToHost` - the host buffer (up to `limit`) and then the
///   producer's own buffer are fed to the reader's `accept` callback until
///   they are empty or the reader reports `StreamResult::Dropped`.
///
/// Any other read state is reported as a bug.
///
/// `token` is used to rebuild a `StoreContextMut<D>` from the store obtained
/// through `tls::get`. `kind` is only consulted to mark futures as `done`.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<LockedState<(P, B)>>,
    kind: TransmitKind,
) -> Result<()> {
    // Take the read state out of the table (leaving `Open` behind) and copy
    // the writer's `guest_offset`, if the writer is host-ready.
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller_instance,
            caller_thread,
        } => {
            let guest_offset = match guest_offset {
                Some(i) => i,
                None => bail_bug!("guest_offset should be present if ready"),
            };

            // Writing to a future's reader marks the transmit as `done`.
            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }

            // Buffer length before lowering; the difference afterwards is
            // the number of items delivered.
            let old_remaining = pair.with(|p| p.1.remaining().len())?;
            // Lowers as many buffered items as fit into the unwritten part of
            // the guest's buffer: it starts `guest_offset` items past
            // `address` and has room for `count - guest_offset` items.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    let mut state = pair.take()?;
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        caller_thread,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset.as_usize()),
                        count.as_usize() - guest_offset.as_usize(),
                        &mut state.1,
                    )?;
                    crate::error::Ok(())
                }
            };

            // Nothing is lowered when the guest's buffer is already full.
            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // For payloads which may require a realloc call, use a
                    // oneshot::channel and background task.  This is
                    // necessary because calling the guest while there are
                    // host embedder frames on the stack is unsound.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    // If the worker closure returns early on an `accept`
                    // error, `tx` is dropped unsent and this reports
                    // "work cancelled".
                    match rx.await {
                        Ok(r) => r,
                        Err(oneshot::Canceled) => bail_bug!("work cancelled"),
                    }
                } else {
                    // Optimize flat payloads (i.e. those which do not
                    // require calling the guest's realloc function) by
                    // lowering directly instead of using a oneshot::channel
                    // and background task.
                    tls::get(|store| accept(token.as_context_mut(store)))?
                }
            }

            tls::get(|store| {
                // Number of items just delivered. This shadows the `count`
                // destructured from the read state above.
                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;

                let transmit = store.concurrent_state_mut().get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    bail_bug!("expected WriteState::HostReady")
                };

                guest_offset.inc(count)?;

                // NOTE(review): the restored read state records the number of
                // items just delivered as its `count`, not the capacity the
                // guest originally supplied — confirm this is intended.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count: ItemCount::new_usize(count)?,
                    handle,
                    instance,
                    caller_instance,
                    caller_thread,
                };

                crate::error::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // Phase 1: feed `buffer[position..limit]` to the reader. The
            // buffer is moved into the `SliceBuffer` and recovered, with the
            // updated position, on every iteration.
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            // Phase 2: feed whatever is left in the producer's own buffer.
            // Inside this block `buffer` refers to that buffer, not to the
            // host buffer above.
            {
                let mut pair = pair.take()?;
                let (_, buffer) = &mut *pair;

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
                }
            }

            // Store the read state back: `Dropped` if the reader went away,
            // otherwise host-to-host again with `limit` reset to zero.
            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                crate::error::Ok(())
            })?;
            Ok(())
        }

        _ => bail_bug!("unexpected read state"),
    }
}
3130
3131impl Instance {
3132    /// Handle a host- or guest-initiated write by delivering the item(s) to the
3133    /// `StreamConsumer` for the specified stream or future.
3134    fn consume(
3135        self,
3136        store: &mut dyn VMStore,
3137        kind: TransmitKind,
3138        transmit_id: TableId<TransmitState>,
3139        consume: PollStream,
3140        guest_offset: ItemCount,
3141        cancel: bool,
3142    ) -> Result<ReturnCode> {
3143        let mut future = consume();
3144        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
3145            consume,
3146            guest_offset,
3147            cancel,
3148            cancel_waker: None,
3149        };
3150        let poll = tls::set(store, || {
3151            future
3152                .as_mut()
3153                .poll(&mut Context::from_waker(&Waker::noop()))
3154        });
3155
3156        Ok(match poll {
3157            Poll::Ready(state) => {
3158                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3159                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3160                    bail_bug!("expected ReadState::HostReady")
3161                };
3162                let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3163                transmit.write = WriteState::Open;
3164                code
3165            }
3166            Poll::Pending => {
3167                store.pipe_from_guest(kind, transmit_id, future);
3168                ReturnCode::Blocked
3169            }
3170        })
3171    }
3172
3173    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
3174    /// for the specified stream or future for items.
3175    fn produce(
3176        self,
3177        store: &mut dyn VMStore,
3178        kind: TransmitKind,
3179        transmit_id: TableId<TransmitState>,
3180        produce: PollStream,
3181        try_into: TryInto,
3182        guest_offset: ItemCount,
3183        cancel: bool,
3184    ) -> Result<ReturnCode> {
3185        let mut future = produce();
3186        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3187            produce,
3188            try_into,
3189            guest_offset,
3190            cancel,
3191            cancel_waker: None,
3192        };
3193        let poll = tls::set(store, || {
3194            future
3195                .as_mut()
3196                .poll(&mut Context::from_waker(&Waker::noop()))
3197        });
3198
3199        Ok(match poll {
3200            Poll::Ready(state) => {
3201                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3202                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3203                    bail_bug!("expected WriteState::HostReady")
3204                };
3205                let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3206                transmit.read = ReadState::Open;
3207                code
3208            }
3209            Poll::Pending => {
3210                store.pipe_to_guest(kind, transmit_id, future);
3211                ReturnCode::Blocked
3212            }
3213        })
3214    }
3215
3216    /// Drop the writable end of the specified stream or future from the guest.
3217    pub(super) fn guest_drop_writable(
3218        self,
3219        store: &mut StoreOpaque,
3220        ty: TransmitIndex,
3221        writer: u32,
3222    ) -> Result<()> {
3223        let table = self.id().get_mut(store).table_for_transmit(ty);
3224        let transmit_rep = match ty {
3225            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3226            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3227        };
3228
3229        let id = TableId::<TransmitHandle>::new(transmit_rep);
3230        log::trace!("guest_drop_writable: drop writer {id:?}");
3231        match ty {
3232            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3233            TransmitIndex::Future(_) => store.host_drop_writer(
3234                id,
3235                Some(|| {
3236                    Err(format_err!(
3237                        "cannot drop future write end without first writing a value"
3238                    ))
3239                }),
3240            ),
3241        }
3242    }
3243
3244    /// Copy `count` items from `read_address` to `write_address` for the
3245    /// specified stream or future.
3246    fn copy<T: 'static>(
3247        store: StoreContextMut<T>,
3248        flat_abi: Option<FlatAbi>,
3249        write_instance: Instance,
3250        write_caller_instance: RuntimeComponentInstanceIndex,
3251        write_ty: TransmitIndex,
3252        write_options: OptionsIndex,
3253        write_address: usize,
3254        read_instance: Instance,
3255        read_caller_instance: RuntimeComponentInstanceIndex,
3256        read_caller_thread: QualifiedThreadId,
3257        read_ty: TransmitIndex,
3258        read_options: OptionsIndex,
3259        read_address: usize,
3260        count: ItemCount,
3261        rep: u32,
3262    ) -> Result<()> {
3263        let (write_component, store) = write_instance.component_and_store_mut(store.0);
3264        let (read_component, mut store) = read_instance.component_and_store_mut(store);
3265        let write_types = write_component.types();
3266        let read_types = read_component.types();
3267        let count = count.as_usize();
3268
3269        // Validate `write_ty` w.r.t. `write_address` to ensure it's properly
3270        // aligned and in-bounds.
3271        let write_payload_ty = write_ty.payload(write_types);
3272        let write_abi = match write_payload_ty {
3273            Some(ty) => write_types.canonical_abi(ty),
3274            None => &CanonicalAbiInfo::ZERO,
3275        };
3276        let write_length_in_bytes = match flat_abi {
3277            Some(abi) => usize::try_from(abi.size)? * count,
3278            None => usize::try_from(write_abi.size32)? * count,
3279        };
3280        if write_length_in_bytes > 0 {
3281            if write_address % usize::try_from(write_abi.align32)? != 0 {
3282                bail!("write pointer not aligned");
3283            }
3284            write_instance
3285                .options_memory(store, write_options)
3286                .get(write_address..)
3287                .and_then(|b| b.get(..write_length_in_bytes))
3288                .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
3289        }
3290
3291        let read_payload_ty = read_ty.payload(read_types);
3292        let read_abi = match read_payload_ty {
3293            Some(ty) => read_types.canonical_abi(ty),
3294            None => &CanonicalAbiInfo::ZERO,
3295        };
3296        let read_length_in_bytes = match flat_abi {
3297            Some(abi) => usize::try_from(abi.size)? * count,
3298            None => usize::try_from(read_abi.size32)? * count,
3299        };
3300        if read_length_in_bytes > 0 {
3301            if read_address % usize::try_from(read_abi.align32)? != 0 {
3302                bail!("read pointer not aligned");
3303            }
3304            read_instance
3305                .options_memory(store, read_options)
3306                .get(read_address..)
3307                .and_then(|b| b.get(..read_length_in_bytes))
3308                .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
3309        }
3310
3311        if write_caller_instance == read_caller_instance
3312            && !allow_intra_component_read_write(write_payload_ty)
3313        {
3314            bail!(
3315                "cannot read from and write to intra-component future/stream with non-numeric payload"
3316            )
3317        }
3318
3319        match (write_ty, read_ty) {
3320            (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
3321                if count != 1 {
3322                    bail_bug!("futures can only send 1 item");
3323                }
3324
3325                let val = write_payload_ty
3326                    .map(|ty| {
3327                        let lift = &mut LiftContext::new(store, write_options, write_instance);
3328                        let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3329                        Val::load(lift, *ty, bytes)
3330                    })
3331                    .transpose()?;
3332
3333                if let Some(val) = val {
3334                    // Serializing the value may require calling the guest's realloc function, so we
3335                    // set the guest's thread context in case realloc requires it, and restore the original
3336                    // thread context after the copy is complete.
3337                    let old_thread = store.set_thread(read_caller_thread)?;
3338                    let lower =
3339                        &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3340                    let ptr = func::validate_inbounds_dynamic(
3341                        read_abi,
3342                        lower.as_slice_mut(),
3343                        &ValRaw::u32(read_address.try_into()?),
3344                    )?;
3345                    let ty = match read_payload_ty {
3346                        Some(ty) => ty,
3347                        None => bail_bug!("expected read payload type to be present"),
3348                    };
3349                    val.store(lower, *ty, ptr)?;
3350                    store.set_thread(old_thread)?;
3351                }
3352            }
3353            (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
3354                if write_length_in_bytes == 0 {
3355                    return Ok(());
3356                }
3357                let write_payload_ty = match write_payload_ty {
3358                    Some(ty) => ty,
3359                    None => bail_bug!("expected write payload type to be present"),
3360                };
3361                let read_payload_ty = match read_payload_ty {
3362                    Some(ty) => ty,
3363                    None => bail_bug!("expected read payload type to be present"),
3364                };
3365                if flat_abi.is_some() {
3366                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
3367                    let store_opaque = store.store_opaque_mut();
3368
3369                    assert_eq!(read_length_in_bytes, write_length_in_bytes);
3370
3371                    if read_instance
3372                        .options_memory(store_opaque, read_options)
3373                        .as_ptr()
3374                        == write_instance
3375                            .options_memory(store_opaque, write_options)
3376                            .as_ptr()
3377                    {
3378                        let memory = read_instance.options_memory_mut(store_opaque, read_options);
3379                        memory.copy_within(
3380                            write_address..write_address + write_length_in_bytes,
3381                            read_address,
3382                        );
3383                    } else {
3384                        let src = write_instance.options_memory(store_opaque, write_options)
3385                            [write_address..][..write_length_in_bytes]
3386                            .as_ptr();
3387                        let dst = read_instance.options_memory_mut(store_opaque, read_options)
3388                            [read_address..][..read_length_in_bytes]
3389                            .as_mut_ptr();
3390
3391                        // SAFETY: Both `src` and `dst` have been validated
3392                        // above to be valid pointers as they're derived from
3393                        // slices that have the desired length with the desired
3394                        // read/write permission. The `unsafe` bit here is that
3395                        // the memories are disjoint (different base pointers)
3396                        // and there's no easy way to borrow both
3397                        // simultaneously from the store. Different memories
3398                        // are guaranteed to be disjoint, however, so the
3399                        // `unsafe` here should be ok.
3400                        unsafe {
3401                            src.copy_to_nonoverlapping(dst, write_length_in_bytes);
3402                        }
3403                    }
3404                } else {
3405                    let store_opaque = store.store_opaque_mut();
3406                    let lift = &mut LiftContext::new(store_opaque, write_options, write_instance);
3407                    let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3408                    lift.consume_fuel_array(count, size_of::<Val>())?;
3409
3410                    let values = (0..count)
3411                        .map(|index| {
3412                            let size = usize::try_from(write_abi.size32)?;
3413                            Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
3414                        })
3415                        .collect::<Result<Vec<_>>>()?;
3416
3417                    let id = TableId::<TransmitHandle>::new(rep);
3418                    log::trace!("copy values {values:?} for {id:?}");
3419
3420                    // Serializing the value may require calling the guest's realloc function, so we
3421                    // set the guest's thread context in case realloc requires it, and restore the original
3422                    // thread context after the copy is complete.
3423                    let old_thread = store.set_thread(read_caller_thread)?;
3424                    let lower =
3425                        &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
3426                    let mut ptr = read_address;
3427                    for value in values {
3428                        value.store(lower, *read_payload_ty, ptr)?;
3429                        ptr += usize::try_from(read_abi.size32)?;
3430                    }
3431                    store.set_thread(old_thread)?;
3432                }
3433            }
3434            _ => bail_bug!("mismatched transmit types in copy"),
3435        }
3436
3437        Ok(())
3438    }
3439
3440    fn check_bounds(
3441        self,
3442        store: &StoreOpaque,
3443        options: OptionsIndex,
3444        ty: TransmitIndex,
3445        address: usize,
3446        count: usize,
3447    ) -> Result<()> {
3448        let types = self.id().get(store).component().types();
3449        let size = usize::try_from(
3450            match ty {
3451                TransmitIndex::Future(ty) => types[types[ty].ty]
3452                    .payload
3453                    .map(|ty| types.canonical_abi(&ty).size32),
3454                TransmitIndex::Stream(ty) => types[types[ty].ty]
3455                    .payload
3456                    .map(|ty| types.canonical_abi(&ty).size32),
3457            }
3458            .unwrap_or(0),
3459        )?;
3460
3461        if count > 0 && size > 0 {
3462            self.options_memory(store, options)
3463                .get(address..)
3464                .and_then(|b| b.get(..size.checked_mul(count)?))
3465                .map(drop)
3466                .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3467        } else {
3468            Ok(())
3469        }
3470    }
3471
    /// Write to the specified stream or future from the guest.
    ///
    /// `handle` is looked up in this instance's table for `ty` and must
    /// currently be an idle writable end. `address` and `count` describe the
    /// guest buffer (in the memory selected by `options`) holding the items to
    /// write.
    ///
    /// What happens next depends on the state of the read end at the time of
    /// the call:
    ///
    /// * a guest read is pending: items are copied directly between the two
    ///   guest buffers;
    /// * a host reader is attached: the write is recorded and `consume` is
    ///   invoked;
    /// * nobody is reading yet: the write is recorded and
    ///   `ReturnCode::Blocked` is produced;
    /// * the read end was dropped: `ReturnCode::Dropped` is produced.
    ///
    /// When `options` is not `async_` and the result would be
    /// `ReturnCode::Blocked`, this waits for the write event via
    /// `wait_for_write` and returns that event's code instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the buffer is out of bounds, if `handle` is not an
    /// idle writable end, if the writer was already notified that the readable
    /// end dropped, or if the transmit is already marked as done.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let count = ItemCount::new(count)?;

        if !self.options(store.0, options).async_ {
            // The caller may only sync call `{stream,future}.write` from an
            // async task (i.e. a task created via a call to an async export).
            // Otherwise, we'll trap.
            store.0.check_blocking()?;
        }

        let address = usize::try_from(address)?;
        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        // Any other local state (e.g. `Busy` or a readable end) is rejected.
        let TransmitLocalState::Write { done } = *state else {
            bail!(Trap::ConcurrentFutureStreamOp);
        };

        if done {
            bail!("cannot write after being notified that the readable end dropped");
        }

        // Mark the handle as in use until a non-`Blocked` result is available
        // (see the bottom of this function).
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The read state is taken out of `transmit` below; this is what gets
        // left in its place. A dropped reader stays dropped, anything else
        // becomes `Open` (and may be overwritten again by the match arms).
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Records this write as pending so that a later read (or the host
        // consumer) can pick it up.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            if !matches!(&transmit.write, WriteState::Open) {
                bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
            }
            transmit.write = WriteState::GuestReady {
                instance: self,
                caller,
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            // A guest read is already pending: rendezvous with it directly.
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
                instance: read_instance,
                caller_instance: read_caller_instance,
                caller_thread: read_caller_thread,
            } => {
                if flat_abi != read_flat_abi {
                    bail_bug!("expected flat ABI calculations to be the same");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // Note that zero-length reads and writes are handled specially
                // by the spec to allow each end to signal readiness to the
                // other.  Quoting the spec:
                //
                // ```
                // The meaning of a read or write when the length is 0 is that
                // the caller is querying the "readiness" of the other
                // side. When a 0-length read/write rendezvous with a
                // non-0-length read/write, only the 0-length read/write
                // completes; the non-0-length read/write is kept pending (and
                // ready for a subsequent rendezvous).
                //
                // In the corner case where a 0-length read and write
                // rendezvous, only the writer is notified of readiness. To
                // avoid livelock, the Canonical ABI requires that a writer must
                // (eventually) follow a completed 0-length write with a
                // non-0-length write that is allowed to block (allowing the
                // reader end to run and rendezvous with its own non-0-length
                // read).
                // ```

                let write_complete = count == 0 || read_count > 0;
                let read_complete = count > 0;
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // Only as many items as both sides can handle are transferred.
                let count = count.min(read_count);

                Instance::copy(
                    store.as_context_mut(),
                    flat_abi,
                    self,
                    caller,
                    ty,
                    options,
                    address,
                    read_instance,
                    read_caller_instance,
                    read_caller_thread,
                    read_ty,
                    read_options,
                    read_address,
                    count,
                    rep,
                )?;

                // Size of one item in the reader's buffer, used to advance
                // `read_address` if the read is kept pending below.
                let instance = read_instance.id().get(store.0);
                let types = instance.component().types();
                let item_size = match read_ty.payload(types) {
                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
                    None => 0,
                };
                let concurrent_state = store.0.concurrent_state_mut();
                if read_complete {
                    // If a completed read event is already queued for the
                    // reader, fold its item count into the new total.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count.add(old_total)?
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                // If the reader still has buffer remaining, or if this was a
                // zero-length rendezvous, then restore the state of the reader
                // back to what it was when we found it. Note that for the
                // zero-length rendezvous case this specifically won't execute
                // the `read_complete` logic above, which is intentional, as the
                // reader remains blocked.
                if read_buffer_remaining || (count == 0 && read_count == 0) {
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count.as_usize() * item_size),
                        count: read_count.sub(count)?,
                        handle: read_handle,
                        instance: read_instance,
                        caller_instance: read_caller_instance,
                        caller_thread: read_caller_thread,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count)
                } else {
                    // A non-zero-length write met a zero-length read: the
                    // write stays pending.
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host reader is attached: record the write and let `consume`
            // drive it.
            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                if cancel_waker.is_some() {
                    bail_bug!("expected cancel_waker to be none");
                }
                if cancel {
                    bail_bug!("expected cancel to be false");
                }
                if guest_offset != 0 {
                    bail_bug!("expected guest_offset to be 0");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(
                    store.0,
                    ty.kind(),
                    transmit_id,
                    consume,
                    ItemCount::ZERO,
                    false,
                )?
            }

            ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),

            // No reader yet: park the write until one shows up.
            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(ItemCount::ZERO)
            }
        };

        // Synchronous callers don't see `Blocked`; wait for the write event
        // instead.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        // The operation has finished, so take the handle out of `Busy` and
        // remember whether the writer has now been told the reader dropped.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(result, ReturnCode::Dropped(_)),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3725
    /// Read from the specified stream or future from the guest.
    ///
    /// `handle` is looked up in this instance's table for `ty` and must
    /// currently be an idle readable end. `address` and `count` describe the
    /// guest buffer (in the memory selected by `options`) which receives the
    /// items.
    ///
    /// What happens next depends on the state of the write end at the time of
    /// the call:
    ///
    /// * a guest write is pending: items are copied directly between the two
    ///   guest buffers;
    /// * a host writer is attached: the read is recorded and `produce` is
    ///   invoked;
    /// * nobody is writing yet: the read is recorded and
    ///   `ReturnCode::Blocked` is produced;
    /// * the write end was dropped: `ReturnCode::Dropped` is produced.
    ///
    /// When `options` is not `async_` and the result would be
    /// `ReturnCode::Blocked`, this waits for the read event via
    /// `wait_for_read` and returns the result of that call instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the buffer is out of bounds, if `handle` is not an
    /// idle readable end, if the reader was already notified that the writable
    /// end dropped, or if the transmit is already marked as done.
    pub(super) fn guest_read<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller_instance: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        let count = ItemCount::new(count)?;

        if !self.options(store.0, options).async_ {
            // The caller may only sync call `{stream,future}.read` from an
            // async task (i.e. a task created via a call to an async export).
            // Otherwise, we'll trap.
            store.0.check_blocking()?;
        }

        let address = usize::try_from(address)?;
        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        // Any other local state (e.g. `Busy` or a writable end) is rejected.
        let TransmitLocalState::Read { done } = *state else {
            bail!(Trap::ConcurrentFutureStreamOp);
        };

        if done {
            bail!("cannot read after being notified that the writable end dropped");
        }

        // Mark the handle as in use until a non-`Blocked` result is available
        // (see the bottom of this function).
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        // Captured now so it can be stored with a pending read and passed to
        // `Instance::copy`.
        let caller_thread = concurrent_state.current_guest_thread()?;
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.write
        );

        if transmit.done {
            bail!("cannot read from future after previous read succeeded");
        }

        // The write state is taken out of `transmit` below; this is what gets
        // left in its place. A dropped writer stays dropped, anything else
        // becomes `Open` (and may be overwritten again by the match arms).
        let new_state = if let WriteState::Dropped = &transmit.write {
            WriteState::Dropped
        } else {
            WriteState::Open
        };

        // Records this read as pending so that a later write (or the host
        // producer) can pick it up.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            if !matches!(&transmit.read, ReadState::Open) {
                bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
            }
            transmit.read = ReadState::GuestReady {
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
                instance: self,
                caller_instance,
                caller_thread,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.write, new_state) {
            // A guest write is already pending: rendezvous with it directly.
            WriteState::GuestReady {
                instance: write_instance,
                ty: write_ty,
                flat_abi: write_flat_abi,
                options: write_options,
                address: write_address,
                count: write_count,
                handle: write_handle,
                caller: write_caller,
            } => {
                if flat_abi != write_flat_abi {
                    bail_bug!("expected flat ABI calculations to be the same");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                let write_handle_rep = transmit.write_handle.rep();

                // See the comment in `guest_write` for the
                // `ReadState::GuestReady` case concerning zero-length reads and
                // writes.

                let write_complete = write_count == 0 || count > 0;
                let read_complete = write_count > 0;
                let write_buffer_remaining = count < write_count;

                // Only as many items as both sides can handle are transferred.
                let count = count.min(write_count);

                Instance::copy(
                    store.as_context_mut(),
                    flat_abi,
                    write_instance,
                    write_caller,
                    write_ty,
                    write_options,
                    write_address,
                    self,
                    caller_instance,
                    caller_thread,
                    ty,
                    options,
                    address,
                    count,
                    rep,
                )?;

                // Size of one item in the writer's buffer, used to advance
                // `write_address` if the write is kept pending below.
                let instance = write_instance.id().get(store.0);
                let types = instance.component().types();
                let item_size = match write_ty.payload(types) {
                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
                    None => 0,
                };
                let concurrent_state = store.0.concurrent_state_mut();

                if write_complete {
                    // If a completed write event is already queued for the
                    // writer, fold its item count into the new total.
                    let total = if let Some(Event::StreamWrite {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(write_handle_rep)?
                    {
                        count.add(old_total)?
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_write_result(
                        write_ty,
                        transmit_id,
                        write_handle,
                        code,
                    )?;
                }

                // The writer still has items left: keep its write pending,
                // advanced past what was just copied.
                if write_buffer_remaining {
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.write = WriteState::GuestReady {
                        instance: write_instance,
                        caller: write_caller,
                        ty: write_ty,
                        flat_abi: write_flat_abi,
                        options: write_options,
                        address: write_address + (count.as_usize() * item_size),
                        count: write_count.sub(count)?,
                        handle: write_handle,
                    };
                }

                if read_complete {
                    ReturnCode::completed(ty.kind(), count)
                } else {
                    // The pending write was zero-length: this read stays
                    // pending.
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            // A host writer is attached: record the read and let `produce`
            // drive it.
            WriteState::HostReady {
                produce,
                try_into,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                if cancel_waker.is_some() {
                    bail_bug!("expected cancel_waker to be none");
                }
                if cancel {
                    bail_bug!("expected cancel to be false");
                }
                if guest_offset != 0 {
                    bail_bug!("expected guest_offset to be 0");
                }

                set_guest_ready(concurrent_state)?;

                let code = self.produce(
                    store.0,
                    ty.kind(),
                    transmit_id,
                    produce,
                    try_into,
                    ItemCount::ZERO,
                    false,
                )?;

                // A future is only marked done once the read has actually
                // completed.
                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
                }

                code
            }

            // No writer yet: park the read until one shows up.
            WriteState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
        };

        // Synchronous callers don't see `Blocked`; wait for the read event
        // instead.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_read(store.0, transmit_handle)?;
        }

        // The operation has finished, so take the handle out of `Busy`. Only
        // a stream reader is marked `done` after a `Dropped` result.
        if result != ReturnCode::Blocked {
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Read {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3962
3963    fn wait_for_write(
3964        self,
3965        store: &mut StoreOpaque,
3966        handle: TableId<TransmitHandle>,
3967    ) -> Result<ReturnCode> {
3968        let waitable = Waitable::Transmit(handle);
3969        store.wait_for_event(waitable)?;
3970        let event = waitable.take_event(store.concurrent_state_mut())?;
3971        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3972            event
3973        {
3974            waitable.on_delivery(store, self, event)?;
3975            Ok(code)
3976        } else {
3977            bail_bug!("expected either a stream or future write event")
3978        }
3979    }
3980
3981    /// Cancel a pending stream or future write.
3982    fn cancel_write(
3983        self,
3984        store: &mut StoreOpaque,
3985        transmit_id: TableId<TransmitState>,
3986        async_: bool,
3987    ) -> Result<ReturnCode> {
3988        let state = store.concurrent_state_mut();
3989        let transmit = state.get_mut(transmit_id)?;
3990        log::trace!(
3991            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3992            transmit.read,
3993            transmit.write
3994        );
3995        let waitable = Waitable::Transmit(transmit.write_handle);
3996
3997        let code = if let Some(event) = waitable.take_event(state)? {
3998            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3999                bail_bug!("expected either a stream or future write event")
4000            };
4001            waitable.on_delivery(store, self, event)?;
4002            match (code, event) {
4003                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
4004                    ReturnCode::Cancelled(count)
4005                }
4006                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4007                _ => bail_bug!("unexpected code/event combo"),
4008            }
4009        } else if let ReadState::HostReady {
4010            cancel,
4011            cancel_waker,
4012            ..
4013        } = &mut state.get_mut(transmit_id)?.read
4014        {
4015            *cancel = true;
4016            if let Some(waker) = cancel_waker.take() {
4017                waker.wake();
4018            }
4019
4020            if async_ {
4021                ReturnCode::Blocked
4022            } else {
4023                let handle = store
4024                    .concurrent_state_mut()
4025                    .get_mut(transmit_id)?
4026                    .write_handle;
4027                self.wait_for_write(store, handle)?
4028            }
4029        } else {
4030            ReturnCode::Cancelled(ItemCount::ZERO)
4031        };
4032
4033        if !matches!(code, ReturnCode::Blocked) {
4034            let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4035
4036            match &transmit.write {
4037                WriteState::GuestReady { .. } => {
4038                    transmit.write = WriteState::Open;
4039                }
4040                WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
4041                WriteState::Open | WriteState::Dropped => {}
4042            }
4043        }
4044
4045        log::trace!("cancelled write {transmit_id:?}: {code:?}");
4046
4047        Ok(code)
4048    }
4049
4050    fn wait_for_read(
4051        self,
4052        store: &mut StoreOpaque,
4053        handle: TableId<TransmitHandle>,
4054    ) -> Result<ReturnCode> {
4055        let waitable = Waitable::Transmit(handle);
4056        store.wait_for_event(waitable)?;
4057        let event = waitable.take_event(store.concurrent_state_mut())?;
4058        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
4059            event
4060        {
4061            waitable.on_delivery(store, self, event)?;
4062            Ok(code)
4063        } else {
4064            bail_bug!("expected either a stream or future read event")
4065        }
4066    }
4067
    /// Cancel a pending stream or future read.
    ///
    /// The returned `ReturnCode` is determined as follows:
    ///
    /// * If an event is already pending on the read end, it is taken and
    ///   delivered. A `Completed(n)` code on a stream read is reported as
    ///   `Cancelled(n)`; any other `Completed` or `Dropped` code is returned
    ///   unchanged.
    /// * Otherwise, if the write end is in `WriteState::HostReady`, its
    ///   `cancel` flag is set and any registered `cancel_waker` is woken. The
    ///   result is `Blocked` when `async_` is true; otherwise this waits for
    ///   the read event via `wait_for_read`.
    /// * Otherwise the result is `Cancelled(ItemCount::ZERO)`.
    ///
    /// Unless the result is `Blocked`, a read state of `ReadState::GuestReady`
    /// is reset to `ReadState::Open`.
    fn cancel_read(
        self,
        store: &mut StoreOpaque,
        transmit_id: TableId<TransmitState>,
        async_: bool,
    ) -> Result<ReturnCode> {
        let state = store.concurrent_state_mut();
        let transmit = state.get_mut(transmit_id)?;
        log::trace!(
            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
            transmit.read,
            transmit.write
        );

        let waitable = Waitable::Transmit(transmit.read_handle);
        let code = if let Some(event) = waitable.take_event(state)? {
            // A result was already queued for the reader; consume it now.
            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
                bail_bug!("expected either a stream or future read event")
            };
            waitable.on_delivery(store, self, event)?;
            match (code, event) {
                // A completed stream read is reported as cancelled, keeping
                // the item count.
                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
                    ReturnCode::Cancelled(count)
                }
                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
                _ => bail_bug!("unexpected code/event combo"),
            }
        } else if let WriteState::HostReady {
            cancel,
            cancel_waker,
            ..
        } = &mut state.get_mut(transmit_id)?.write
        {
            // Flag the cancellation on the host-ready write state and wake
            // whoever registered a cancel waker.
            *cancel = true;
            if let Some(waker) = cancel_waker.take() {
                waker.wake();
            }

            if async_ {
                ReturnCode::Blocked
            } else {
                // Synchronous cancel: wait for the read event before returning.
                let handle = store
                    .concurrent_state_mut()
                    .get_mut(transmit_id)?
                    .read_handle;
                self.wait_for_read(store, handle)?
            }
        } else {
            // No pending event and no host-ready writer.
            ReturnCode::Cancelled(ItemCount::ZERO)
        };

        if !matches!(code, ReturnCode::Blocked) {
            let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;

            // The cancellation has resolved, so a guest read that was parked
            // in `GuestReady` goes back to `Open`.
            match &transmit.read {
                ReadState::GuestReady { .. } => {
                    transmit.read = ReadState::Open;
                }
                ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
                    bail_bug!("support host read cancellation")
                }
                ReadState::Open | ReadState::Dropped => {}
            }
        }

        log::trace!("cancelled read {transmit_id:?}: {code:?}");

        Ok(code)
    }
4138
4139    /// Cancel a pending write for the specified stream or future from the guest.
4140    fn guest_cancel_write(
4141        self,
4142        store: &mut StoreOpaque,
4143        ty: TransmitIndex,
4144        async_: bool,
4145        writer: u32,
4146    ) -> Result<ReturnCode> {
4147        if !async_ {
4148            // The caller may only sync call `{stream,future}.cancel-write` from
4149            // an async task (i.e. a task created via a call to an async
4150            // export).  Otherwise, we'll trap.
4151            store.check_blocking()?;
4152        }
4153
4154        let (rep, state) =
4155            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4156        let id = TableId::<TransmitHandle>::new(rep);
4157        log::trace!("guest cancel write {id:?} (handle {writer})");
4158        match state {
4159            TransmitLocalState::Write { .. } => {
4160                bail!("stream or future write cancelled when no write is pending")
4161            }
4162            TransmitLocalState::Read { .. } => {
4163                bail!("passed read end to `{{stream|future}}.cancel-write`")
4164            }
4165            TransmitLocalState::Busy => {}
4166        }
4167        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4168        let code = self.cancel_write(store, transmit_id, async_)?;
4169        if !matches!(code, ReturnCode::Blocked) {
4170            let state =
4171                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4172                    .1;
4173            if let TransmitLocalState::Busy = state {
4174                *state = TransmitLocalState::Write { done: false };
4175            }
4176        }
4177        Ok(code)
4178    }
4179
4180    /// Cancel a pending read for the specified stream or future from the guest.
4181    fn guest_cancel_read(
4182        self,
4183        store: &mut StoreOpaque,
4184        ty: TransmitIndex,
4185        async_: bool,
4186        reader: u32,
4187    ) -> Result<ReturnCode> {
4188        if !async_ {
4189            // The caller may only sync call `{stream,future}.cancel-read` from
4190            // an async task (i.e. a task created via a call to an async
4191            // export).  Otherwise, we'll trap.
4192            store.check_blocking()?;
4193        }
4194
4195        let (rep, state) =
4196            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4197        let id = TableId::<TransmitHandle>::new(rep);
4198        log::trace!("guest cancel read {id:?} (handle {reader})");
4199        match state {
4200            TransmitLocalState::Read { .. } => {
4201                bail!("stream or future read cancelled when no read is pending")
4202            }
4203            TransmitLocalState::Write { .. } => {
4204                bail!("passed write end to `{{stream|future}}.cancel-read`")
4205            }
4206            TransmitLocalState::Busy => {}
4207        }
4208        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4209        let code = self.cancel_read(store, transmit_id, async_)?;
4210        if !matches!(code, ReturnCode::Blocked) {
4211            let state =
4212                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4213                    .1;
4214            if let TransmitLocalState::Busy = state {
4215                *state = TransmitLocalState::Read { done: false };
4216            }
4217        }
4218        Ok(code)
4219    }
4220
4221    /// Drop the readable end of the specified stream or future from the guest.
4222    fn guest_drop_readable(
4223        self,
4224        store: &mut StoreOpaque,
4225        ty: TransmitIndex,
4226        reader: u32,
4227    ) -> Result<()> {
4228        let table = self.id().get_mut(store).table_for_transmit(ty);
4229        let (rep, _is_done) = match ty {
4230            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4231            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4232        };
4233        let kind = match ty {
4234            TransmitIndex::Stream(_) => TransmitKind::Stream,
4235            TransmitIndex::Future(_) => TransmitKind::Future,
4236        };
4237        let id = TableId::<TransmitHandle>::new(rep);
4238        log::trace!("guest_drop_readable: drop reader {id:?}");
4239        store.host_drop_reader(id, kind)
4240    }
4241
4242    /// Create a new error context for the given component.
4243    pub(crate) fn error_context_new(
4244        self,
4245        store: &mut StoreOpaque,
4246        ty: TypeComponentLocalErrorContextTableIndex,
4247        options: OptionsIndex,
4248        debug_msg_address: u32,
4249        debug_msg_len: u32,
4250    ) -> Result<u32> {
4251        let lift_ctx = &mut LiftContext::new(store, options, self);
4252        let debug_msg = String::linear_lift_from_flat(
4253            lift_ctx,
4254            InterfaceType::String,
4255            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4256        )?;
4257
4258        // Create a new ErrorContext that is tracked along with other concurrent state
4259        let err_ctx = ErrorContextState { debug_msg };
4260        let state = store.concurrent_state_mut();
4261        let table_id = state.push(err_ctx)?;
4262        let global_ref_count_idx =
4263            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4264
4265        // Add to the global error context ref counts
4266        let _ = state
4267            .global_error_context_ref_counts
4268            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4269
4270        // Error context are tracked both locally (to a single component instance) and globally
4271        // the counts for both must stay in sync.
4272        //
4273        // Here we reflect the newly created global concurrent error context state into the
4274        // component instance's locally tracked count, along with the appropriate key into the global
4275        // ref tracking data structures to enable later lookup
4276        let local_idx = self
4277            .id()
4278            .get_mut(store)
4279            .table_for_error_context(ty)
4280            .error_context_insert(table_id.rep())?;
4281
4282        Ok(local_idx)
4283    }
4284
4285    /// Retrieve the debug message from the specified error context.
4286    pub(super) fn error_context_debug_message<T>(
4287        self,
4288        store: StoreContextMut<T>,
4289        ty: TypeComponentLocalErrorContextTableIndex,
4290        options: OptionsIndex,
4291        err_ctx_handle: u32,
4292        debug_msg_address: u32,
4293    ) -> Result<()> {
4294        // Retrieve the error context and internal debug message
4295        let handle_table_id_rep = self
4296            .id()
4297            .get_mut(store.0)
4298            .table_for_error_context(ty)
4299            .error_context_rep(err_ctx_handle)?;
4300
4301        let state = store.0.concurrent_state_mut();
4302        // Get the state associated with the error context
4303        let ErrorContextState { debug_msg } =
4304            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4305        let debug_msg = debug_msg.clone();
4306
4307        let lower_cx = &mut LowerContext::new(store, options, self);
4308        let debug_msg_address = usize::try_from(debug_msg_address)?;
4309        // Lower the string into the component's memory.
4310        //
4311        // Note that the "8" here is the size of a WIT `string` in linear
4312        // memory, the ptr+length. This'll need to be updated when `memory64`
4313        // comes along. (FIXME(#4311))
4314        let offset = lower_cx
4315            .as_slice_mut()
4316            .get(debug_msg_address..)
4317            .and_then(|b| b.get(..8))
4318            .map(|_| debug_msg_address)
4319            .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4320        debug_msg
4321            .as_str()
4322            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4323
4324        Ok(())
4325    }
4326
4327    /// Implements the `future.cancel-read` intrinsic.
4328    pub(crate) fn future_cancel_read(
4329        self,
4330        store: &mut StoreOpaque,
4331        ty: TypeFutureTableIndex,
4332        async_: bool,
4333        reader: u32,
4334    ) -> Result<u32> {
4335        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4336            .map(|v| v.encode())
4337    }
4338
4339    /// Implements the `future.cancel-write` intrinsic.
4340    pub(crate) fn future_cancel_write(
4341        self,
4342        store: &mut StoreOpaque,
4343        ty: TypeFutureTableIndex,
4344        async_: bool,
4345        writer: u32,
4346    ) -> Result<u32> {
4347        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4348            .map(|v| v.encode())
4349    }
4350
4351    /// Implements the `stream.cancel-read` intrinsic.
4352    pub(crate) fn stream_cancel_read(
4353        self,
4354        store: &mut StoreOpaque,
4355        ty: TypeStreamTableIndex,
4356        async_: bool,
4357        reader: u32,
4358    ) -> Result<u32> {
4359        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4360            .map(|v| v.encode())
4361    }
4362
4363    /// Implements the `stream.cancel-write` intrinsic.
4364    pub(crate) fn stream_cancel_write(
4365        self,
4366        store: &mut StoreOpaque,
4367        ty: TypeStreamTableIndex,
4368        async_: bool,
4369        writer: u32,
4370    ) -> Result<u32> {
4371        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4372            .map(|v| v.encode())
4373    }
4374
4375    /// Implements the `future.drop-readable` intrinsic.
4376    pub(crate) fn future_drop_readable(
4377        self,
4378        store: &mut StoreOpaque,
4379        ty: TypeFutureTableIndex,
4380        reader: u32,
4381    ) -> Result<()> {
4382        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4383    }
4384
4385    /// Implements the `stream.drop-readable` intrinsic.
4386    pub(crate) fn stream_drop_readable(
4387        self,
4388        store: &mut StoreOpaque,
4389        ty: TypeStreamTableIndex,
4390        reader: u32,
4391    ) -> Result<()> {
4392        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4393    }
4394
4395    /// Allocate a new future or stream and grant ownership of both the read and
4396    /// write ends to the (sub-)component instance to which the specified
4397    /// `TransmitIndex` belongs.
4398    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4399        let (write, read) = store
4400            .concurrent_state_mut()
4401            .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4402
4403        let table = self.id().get_mut(store).table_for_transmit(ty);
4404        let (read_handle, write_handle) = match ty {
4405            TransmitIndex::Future(ty) => (
4406                table.future_insert_read(ty, read.rep())?,
4407                table.future_insert_write(ty, write.rep())?,
4408            ),
4409            TransmitIndex::Stream(ty) => (
4410                table.stream_insert_read(ty, read.rep())?,
4411                table.stream_insert_write(ty, write.rep())?,
4412            ),
4413        };
4414
4415        let state = store.concurrent_state_mut();
4416        state.get_mut(read)?.common.handle = Some(read_handle);
4417        state.get_mut(write)?.common.handle = Some(write_handle);
4418
4419        Ok(ResourcePair {
4420            write: write_handle,
4421            read: read_handle,
4422        })
4423    }
4424
4425    /// Drop the specified error context.
4426    pub(crate) fn error_context_drop(
4427        self,
4428        store: &mut StoreOpaque,
4429        ty: TypeComponentLocalErrorContextTableIndex,
4430        error_context: u32,
4431    ) -> Result<()> {
4432        let instance = self.id().get_mut(store);
4433
4434        let local_handle_table = instance.table_for_error_context(ty);
4435
4436        let rep = local_handle_table.error_context_drop(error_context)?;
4437
4438        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4439
4440        let state = store.concurrent_state_mut();
4441        let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4442            .global_error_context_ref_counts
4443            .get_mut(&global_ref_count_idx)
4444        else {
4445            bail_bug!("retrieve concurrent state for error context during drop")
4446        };
4447
4448        // Reduce the component-global ref count, removing tracking if necessary
4449        if *global_ref_count < 1 {
4450            bail_bug!("ref count unexpectedly zero");
4451        }
4452        *global_ref_count -= 1;
4453        if *global_ref_count == 0 {
4454            state
4455                .global_error_context_ref_counts
4456                .remove(&global_ref_count_idx);
4457
4458            state
4459                .delete(TableId::<ErrorContextState>::new(rep))
4460                .context("deleting component-global error context data")?;
4461        }
4462
4463        Ok(())
4464    }
4465
4466    /// Transfer ownership of the specified stream or future read end from one
4467    /// guest to another.
4468    fn guest_transfer(
4469        self,
4470        store: &mut StoreOpaque,
4471        src_idx: u32,
4472        src: TransmitIndex,
4473        dst: TransmitIndex,
4474    ) -> Result<u32> {
4475        let id = self.lift_index_to_transmit(store, src, src_idx)?;
4476        self.lower_transmit_to_index(store, dst, id)
4477    }
4478
4479    fn lift_index_to_transmit(
4480        self,
4481        store: &mut StoreOpaque,
4482        ty: TransmitIndex,
4483        src_idx: u32,
4484    ) -> Result<TableId<TransmitHandle>> {
4485        let (state, _, _, instance) = store.lift_context_parts(self);
4486        lift_index_to_transmit(instance, state.concurrent_state_mut(), ty, src_idx)
4487    }
4488
4489    fn lower_transmit_to_index(
4490        self,
4491        store: &mut StoreOpaque,
4492        ty: TransmitIndex,
4493        id: TableId<TransmitHandle>,
4494    ) -> Result<u32> {
4495        let (state, _, _, instance) = store.lift_context_parts(self);
4496        lower_transmit_to_index(instance, state.concurrent_state_mut(), ty, id)
4497    }
4498
4499    /// Implements the `future.new` intrinsic.
4500    pub(crate) fn future_new(
4501        self,
4502        store: &mut StoreOpaque,
4503        ty: TypeFutureTableIndex,
4504    ) -> Result<ResourcePair> {
4505        self.guest_new(store, TransmitIndex::Future(ty))
4506    }
4507
4508    /// Implements the `stream.new` intrinsic.
4509    pub(crate) fn stream_new(
4510        self,
4511        store: &mut StoreOpaque,
4512        ty: TypeStreamTableIndex,
4513    ) -> Result<ResourcePair> {
4514        self.guest_new(store, TransmitIndex::Stream(ty))
4515    }
4516
4517    /// Transfer ownership of the specified future read end from one guest to
4518    /// another.
4519    pub(crate) fn future_transfer(
4520        self,
4521        store: &mut StoreOpaque,
4522        src_idx: u32,
4523        src: TypeFutureTableIndex,
4524        dst: TypeFutureTableIndex,
4525    ) -> Result<u32> {
4526        self.guest_transfer(
4527            store,
4528            src_idx,
4529            TransmitIndex::Future(src),
4530            TransmitIndex::Future(dst),
4531        )
4532    }
4533
4534    /// Transfer ownership of the specified stream read end from one guest to
4535    /// another.
4536    pub(crate) fn stream_transfer(
4537        self,
4538        store: &mut StoreOpaque,
4539        src_idx: u32,
4540        src: TypeStreamTableIndex,
4541        dst: TypeStreamTableIndex,
4542    ) -> Result<u32> {
4543        self.guest_transfer(
4544            store,
4545            src_idx,
4546            TransmitIndex::Stream(src),
4547            TransmitIndex::Stream(dst),
4548        )
4549    }
4550
4551    /// Copy the specified error context from one component to another.
4552    pub(crate) fn error_context_transfer(
4553        self,
4554        store: &mut StoreOpaque,
4555        src_idx: u32,
4556        src: TypeComponentLocalErrorContextTableIndex,
4557        dst: TypeComponentLocalErrorContextTableIndex,
4558    ) -> Result<u32> {
4559        let mut instance = self.id().get_mut(store);
4560        let rep = instance
4561            .as_mut()
4562            .table_for_error_context(src)
4563            .error_context_rep(src_idx)?;
4564        let dst_idx = instance
4565            .table_for_error_context(dst)
4566            .error_context_insert(rep)?;
4567
4568        // Update the global (cross-subcomponent) count for error contexts
4569        // as the new component has essentially created a new reference that will
4570        // be dropped/handled independently
4571        let global_ref_count = store
4572            .concurrent_state_mut()
4573            .global_error_context_ref_counts
4574            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4575            .context("global ref count present for existing (sub)component error context")?;
4576
4577        global_ref_count.0 = global_ref_count
4578            .0
4579            .checked_add(1)
4580            .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
4581
4582        Ok(dst_idx)
4583    }
4584}
4585
4586/// Performs the opertion of lifting a future or stream from and instance into
4587/// the `TransmitHandle` for it.
4588///
4589/// The `src_idx` is the guest-specified index within `instance` and `ty` is the
4590/// expected type of future/stream.
4591fn lift_index_to_transmit(
4592    instance: Pin<&mut ComponentInstance>,
4593    concurrent_state: &mut ConcurrentState,
4594    ty: TransmitIndex,
4595    src_idx: u32,
4596) -> Result<TableId<TransmitHandle>> {
4597    let handle_table = instance.table_for_transmit(ty);
4598    let (rep, is_done) = match ty {
4599        TransmitIndex::Future(idx) => handle_table.future_remove_readable(idx, src_idx)?,
4600        TransmitIndex::Stream(idx) => handle_table.stream_remove_readable(idx, src_idx)?,
4601    };
4602    let desc = match ty {
4603        TransmitIndex::Future(_) => "future",
4604        TransmitIndex::Stream(_) => "stream",
4605    };
4606    if is_done {
4607        bail!("cannot lift {desc} after being notified that the writable end dropped");
4608    }
4609    let id = TableId::<TransmitHandle>::new(rep);
4610    let future = concurrent_state.get_mut(id)?;
4611    if future.common.set.is_some() {
4612        bail!("cannot lift {desc} while it's in a waitable set");
4613    }
4614    future.common.handle = None;
4615
4616    let state = future.state;
4617    if concurrent_state.get_mut(state)?.done {
4618        bail!("cannot lift {desc} after previous read succeeded");
4619    }
4620
4621    Ok(id)
4622}
4623
4624/// Performs the opertion of lowering a future or stream `TransmitHandle` into
4625/// an instance.
4626fn lower_transmit_to_index(
4627    instance: Pin<&mut ComponentInstance>,
4628    concurrent_state: &mut ConcurrentState,
4629    ty: TransmitIndex,
4630    id: TableId<TransmitHandle>,
4631) -> Result<u32> {
4632    let state = concurrent_state.get_mut(id)?.state;
4633    debug_assert_eq!(concurrent_state.get_mut(state)?.read_handle, id);
4634    let handle_table = instance.table_for_transmit(ty);
4635    let handle = match ty {
4636        TransmitIndex::Future(idx) => handle_table.future_insert_read(idx, id.rep()),
4637        TransmitIndex::Stream(idx) => handle_table.stream_insert_read(idx, id.rep()),
4638    }?;
4639    concurrent_state.get_mut(id)?.common.handle = Some(handle);
4640    Ok(handle)
4641}
4642
4643impl ComponentInstance {
4644    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4645        let (states, types) = self.instance_states();
4646        let runtime_instance = match ty {
4647            TransmitIndex::Stream(ty) => types[ty].instance,
4648            TransmitIndex::Future(ty) => types[ty].instance,
4649        };
4650        states[runtime_instance].handle_table()
4651    }
4652
4653    fn table_for_error_context(
4654        self: Pin<&mut Self>,
4655        ty: TypeComponentLocalErrorContextTableIndex,
4656    ) -> &mut HandleTable {
4657        let (states, types) = self.instance_states();
4658        let runtime_instance = types[ty].instance;
4659        states[runtime_instance].handle_table()
4660    }
4661
4662    fn get_mut_by_index(
4663        self: Pin<&mut Self>,
4664        ty: TransmitIndex,
4665        index: u32,
4666    ) -> Result<(u32, &mut TransmitLocalState)> {
4667        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4668    }
4669}
4670
4671impl ConcurrentState {
4672    fn send_write_result(
4673        &mut self,
4674        ty: TransmitIndex,
4675        id: TableId<TransmitState>,
4676        handle: u32,
4677        code: ReturnCode,
4678    ) -> Result<()> {
4679        let write_handle = self.get_mut(id)?.write_handle.rep();
4680        self.set_event(
4681            write_handle,
4682            match ty {
4683                TransmitIndex::Future(ty) => Event::FutureWrite {
4684                    code,
4685                    pending: Some((ty, handle)),
4686                },
4687                TransmitIndex::Stream(ty) => Event::StreamWrite {
4688                    code,
4689                    pending: Some((ty, handle)),
4690                },
4691            },
4692        )
4693    }
4694
4695    fn send_read_result(
4696        &mut self,
4697        ty: TransmitIndex,
4698        id: TableId<TransmitState>,
4699        handle: u32,
4700        code: ReturnCode,
4701    ) -> Result<()> {
4702        let read_handle = self.get_mut(id)?.read_handle.rep();
4703        self.set_event(
4704            read_handle,
4705            match ty {
4706                TransmitIndex::Future(ty) => Event::FutureRead {
4707                    code,
4708                    pending: Some((ty, handle)),
4709                },
4710                TransmitIndex::Stream(ty) => Event::StreamRead {
4711                    code,
4712                    pending: Some((ty, handle)),
4713                },
4714            },
4715        )
4716    }
4717
4718    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4719        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4720    }
4721
4722    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4723        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4724    }
4725
    /// Set or update the event for the specified waitable.
    ///
    /// If there is already an event set for this waitable, we assert that it is
    /// of the same variant as the new one and reuse the `ReturnCode` count and
    /// the `pending` field if applicable.
    ///
    /// For future events an existing event is kept unchanged. For stream
    /// events the new code must be `Dropped` or `Cancelled` with a zero count;
    /// the merged code takes the new variant with the old count. Any other
    /// combination is reported as an internal bug.
    // TODO: This is a bit awkward due to how
    // `Event::{Stream,Future}{Write,Read}` and
    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
    // Consider updating those representations in a way that allows this
    // function to be simplified.
    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));

        // Merge two return codes: keep the item count of `old`, and take the
        // variant of `new`, which must itself carry a zero count.
        fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
            let (ReturnCode::Completed(count)
            | ReturnCode::Dropped(count)
            | ReturnCode::Cancelled(count)) = old
            else {
                bail_bug!("unexpected old return code")
            };

            Ok(match new {
                ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
                ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
                _ => bail_bug!("unexpected new return code"),
            })
        }

        // Take any existing event off the waitable and combine it with the
        // new one; the result is stored back below.
        let event = match (waitable.take_event(self)?, event) {
            // Nothing was pending: the new event is used as-is.
            (None, _) => event,
            // Future events: the already-pending event wins.
            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
            // Stream events: merge the codes and prefer the old `pending`
            // value, falling back to the new one.
            (
                Some(Event::StreamWrite {
                    code: old_code,
                    pending: old_pending,
                }),
                Event::StreamWrite { code, pending },
            ) => Event::StreamWrite {
                code: update_code(old_code, code)?,
                pending: old_pending.or(pending),
            },
            (
                Some(Event::StreamRead {
                    code: old_code,
                    pending: old_pending,
                }),
                Event::StreamRead { code, pending },
            ) => Event::StreamRead {
                code: update_code(old_code, code)?,
                pending: old_pending.or(pending),
            },
            // Old and new events of different variants are never expected.
            _ => bail_bug!("unexpected event combination"),
        };

        waitable.set_event(self, Some(event))
    }
4783
4784    /// Allocate a new future or stream, including the `TransmitState` and the
4785    /// `TransmitHandle`s corresponding to the read and write ends.
4786    fn new_transmit(
4787        &mut self,
4788        origin: TransmitOrigin,
4789    ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4790        let state_id = self.push(TransmitState::new(origin))?;
4791
4792        let write = self.push(TransmitHandle::new(state_id))?;
4793        let read = self.push(TransmitHandle::new(state_id))?;
4794
4795        let state = self.get_mut(state_id)?;
4796        state.write_handle = write;
4797        state.read_handle = read;
4798
4799        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4800
4801        Ok((write, read))
4802    }
4803
4804    /// Delete the specified future or stream, including the read and write ends.
4805    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4806        let state = self.delete(state_id)?;
4807        self.delete(state.write_handle)?;
4808        self.delete(state.read_handle)?;
4809
4810        log::trace!(
4811            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4812            state.write_handle,
4813            state.read_handle,
4814        );
4815
4816        Ok(())
4817    }
4818}
4819
/// A pair of `u32` values labelled `write` and `read`.
///
/// NOTE(review): presumably these identify the write and read ends of a
/// newly created future or stream — confirm against the callers that
/// construct this type.
pub(crate) struct ResourcePair {
    /// Value associated with the write side.
    pub(crate) write: u32,
    /// Value associated with the read side.
    pub(crate) read: u32,
}
4824
impl Waitable {
    /// Handle the imminent delivery of the specified event, e.g. by updating
    /// the state of the stream or future.
    ///
    /// Only future/stream read and write events that carry a
    /// `pending: Some((ty, handle))` payload require any work; every other
    /// event returns `Ok(())` immediately.
    ///
    /// For the handled events this:
    ///
    /// 1. looks up the handle's `rep` and local state in the handle table of
    ///    the runtime instance associated with `ty`,
    /// 2. checks that `rep` matches this waitable and that the local state is
    ///    `TransmitLocalState::Busy`,
    /// 3. moves the local state to `Read { done }` or `Write { done }`, where
    ///    `done` is true when the return code is `ReturnCode::Dropped`, and
    /// 4. for stream events, resets the corresponding side of the transmit
    ///    state to `Open`.
    ///
    /// # Errors
    ///
    /// Returns an error if the handle lookup fails, if either check in step 2
    /// fails (reported as a bug), or if the transmit table lookups fail.
    pub(super) fn on_delivery(
        &self,
        store: &mut StoreOpaque,
        instance: Instance,
        event: Event,
    ) -> Result<()> {
        let instance = instance.id().get_mut(store);
        // Resolve the guest handle recorded in the event to its `rep` and a
        // mutable reference to its per-handle local state.
        let (rep, state, code) = match event {
            Event::FutureRead {
                pending: Some((ty, handle)),
                code,
            }
            | Event::FutureWrite {
                pending: Some((ty, handle)),
                code,
            } => {
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.instance_states().0[runtime_instance]
                    .handle_table()
                    .future_rep(ty, handle)?;
                (rep, state, code)
            }
            Event::StreamRead {
                pending: Some((ty, handle)),
                code,
            }
            | Event::StreamWrite {
                pending: Some((ty, handle)),
                code,
            } => {
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.instance_states().0[runtime_instance]
                    .handle_table()
                    .stream_rep(ty, handle)?;
                (rep, state, code)
            }
            // Anything else (including read/write events without a `pending`
            // payload) needs no bookkeeping.
            _ => return Ok(()),
        };
        if rep != self.rep() {
            bail_bug!("unexpected rep mismatch");
        }
        if *state != TransmitLocalState::Busy {
            bail_bug!("expected state to be busy");
        }
        // A `Dropped` return code marks the handle as done.
        let done = matches!(code, ReturnCode::Dropped(_));
        *state = match event {
            Event::FutureRead { .. } | Event::StreamRead { .. } => {
                TransmitLocalState::Read { done }
            }
            Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
                TransmitLocalState::Write { done }
            }
            _ => bail_bug!("unexpected event for stream"),
        };

        // The borrows of the component instance end above; from here on the
        // store's concurrent state is updated instead.
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let state = store.concurrent_state_mut();
        let transmit_id = state.get_mut(transmit_handle)?.state;
        let transmit = state.get_mut(transmit_id)?;

        // Only stream events reset the transmit's read/write side; future
        // events leave the transmit state untouched.
        match event {
            Event::StreamRead { .. } => {
                transmit.read = ReadState::Open;
            }
            Event::StreamWrite { .. } => transmit.write = WriteState::Open,
            _ => {}
        }
        Ok(())
    }
}
4898
4899/// Determine whether an intra-component read/write is allowed for the specified
4900/// `stream` or `future` payload type according to the component model
4901/// specification.
4902fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
4903    matches!(
4904        ty,
4905        None | Some(
4906            InterfaceType::S8
4907                | InterfaceType::U8
4908                | InterfaceType::S16
4909                | InterfaceType::U16
4910                | InterfaceType::S32
4911                | InterfaceType::U32
4912                | InterfaceType::S64
4913                | InterfaceType::U64
4914                | InterfaceType::Float32
4915                | InterfaceType::Float64
4916        )
4917    )
4918}
4919
/// Helper structure to manage moving a `T` in/out of an interior `TryMutex`
/// which contains an `Option<T>`.
struct LockedState<T> {
    /// The guarded slot. It holds `Some` after `new`; `take` leaves `None`
    /// behind until the returned guard is dropped and puts the value back.
    inner: TryMutex<Option<T>>,
}
4926
4927impl<T> LockedState<T> {
4928    /// Creates a new initial state with `value` stored.
4929    fn new(value: T) -> Self {
4930        Self {
4931            inner: TryMutex::new(Some(value)),
4932        }
4933    }
4934
4935    /// Attempts to lock the inner mutex and return its guard.
4936    ///
4937    /// # Errors
4938    ///
4939    /// Fails if this lock is either poisoned or if it's currently locked.
4940    /// As-used in this file there should never actually be contention on this
4941    /// lock nor recursive access so failing to acquire the lock is a fatal
4942    /// error that gets propagated upwards.
4943    fn try_lock(&self) -> Result<TryMutexGuard<'_, Option<T>>> {
4944        match self.inner.try_lock() {
4945            Some(lock) => Ok(lock),
4946            None => bail_bug!("should not have contention on state lock"),
4947        }
4948    }
4949
4950    /// Takes the inner `T` out of this state, returning it as a guard which
4951    /// will put it back when finished.
4952    ///
4953    /// # Errors
4954    ///
4955    /// Returns an error if the state `T` isn't present.
4956    fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4957        let result = self.try_lock()?.take();
4958        match result {
4959            Some(result) => Ok(LockedStateGuard {
4960                value: ManuallyDrop::new(result),
4961                state: self,
4962            }),
4963            None => bail_bug!("lock value unexpectedly missing"),
4964        }
4965    }
4966
4967    /// Performs the operation `f` on the inner state `&mut T`.
4968    ///
4969    /// This will acquire the internal lock and invoke `f`, so `f` should not
4970    /// expect to be able to recursively acquire this lock.
4971    ///
4972    /// # Errors
4973    ///
4974    /// Returns an error if the state `T` isn't present.
4975    fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4976        let mut inner = self.try_lock()?;
4977        match &mut *inner {
4978            Some(state) => Ok(f(state)),
4979            None => bail_bug!("lock value unexpectedly missing"),
4980        }
4981    }
4982}
4983
/// Helper structure returned from [`LockedState::take`] which will put the
/// state specified by `value` back into the original lock once this is dropped.
struct LockedStateGuard<'a, T> {
    /// The value taken out of the lock. It is wrapped in `ManuallyDrop` so
    /// that the `Drop` impl can move it back out by value.
    value: ManuallyDrop<T>,
    /// The state the value was taken from and is returned to on drop.
    state: &'a LockedState<T>,
}
4990
4991impl<T> Deref for LockedStateGuard<'_, T> {
4992    type Target = T;
4993
4994    fn deref(&self) -> &T {
4995        &self.value
4996    }
4997}
4998
4999impl<T> DerefMut for LockedStateGuard<'_, T> {
5000    fn deref_mut(&mut self) -> &mut T {
5001        &mut self.value
5002    }
5003}
5004
5005impl<T> Drop for LockedStateGuard<'_, T> {
5006    fn drop(&mut self) {
5007        // SAFETY: `ManuallyDrop::take` requires that after invoked the
5008        // original value is not read. This is the `Drop` for this type which
5009        // means we have exclusive ownership and it is not read further in the
5010        // destructor, satisfying this requirement.
5011        let value = unsafe { ManuallyDrop::take(&mut self.value) };
5012
5013        // If this fails due to contention that's a bug, but we're not in a
5014        // position to panic due to this being a destructor nor return an error,
5015        // so defer the bug to showing up later.
5016        if let Ok(mut lock) = self.state.try_lock() {
5017            *lock = Some(value);
5018        }
5019    }
5020}
5021
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    // Engine shared by every store created in these tests.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `rx` once through `FutureProducer::poll_produce`, using a no-op
    /// waker and a freshly created store, forwarding the `finish` flag.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        rx.poll_produce(
            &mut Context::from_waker(Waker::noop()),
            Store::new(&ENGINE, ()).as_context_mut(),
            finish,
        )
    }

    /// Exercises `poll_produce` for plain futures and for oneshot receivers,
    /// with and without the `finish` flag.
    #[test]
    fn future_producer() {
        // An immediately-ready future yields its value when `finish` is false...
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert!(matches!(
            poll_future_producer(fut.as_mut(), false),
            Poll::Ready(Ok(Some(()))),
        ));

        // ...and also when `finish` is true.
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert!(matches!(
            poll_future_producer(fut.as_mut(), true),
            Poll::Ready(Ok(Some(()))),
        ));

        // A future that never resolves is `Pending` without `finish`, and
        // resolves to `Ok(None)` once `finish` is requested.
        let mut fut = pin!(pending::<Result<()>>());
        assert!(matches!(
            poll_future_producer(fut.as_mut(), false),
            Poll::Pending,
        ));
        assert!(matches!(
            poll_future_producer(fut.as_mut(), true),
            Poll::Ready(Ok(None)),
        ));

        // A oneshot receiver with nothing sent behaves like the pending
        // future above; after a later send, polling again yields the value.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Pending,
        ));
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Ok(None)),
        ));
        tx.send(()).unwrap();
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Ok(Some(()))),
        ));

        // A value sent before the first poll is produced immediately.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        tx.send(()).unwrap();
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Ready(Ok(Some(()))),
        ));

        // Dropping the sender without sending surfaces as an error when
        // `finish` is false...
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), false),
            Poll::Ready(Err(..)),
        ));

        // ...and also when `finish` is true.
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert!(matches!(
            poll_future_producer(rx.as_mut(), true),
            Poll::Ready(Err(..)),
        ));
    }
}