Skip to main content

wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9    AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10    Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17    Error, Result, Trap, bail, bail_bug, ensure,
18    error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, ManuallyDrop, MaybeUninit};
26use core::ops::{Deref, DerefMut};
27use core::pin::Pin;
28use core::task::{Context, Poll, Waker, ready};
29use futures::channel::oneshot;
30use futures::{FutureExt as _, stream};
31use std::any::{Any, TypeId};
32use std::boxed::Box;
33use std::io::Cursor;
34use std::string::String;
35use std::sync::{Arc, Mutex, MutexGuard};
36use std::vec::Vec;
37use wasmtime_environ::component::{
38    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
39    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
40    TypeFutureTableIndex, TypeStreamTableIndex,
41};
42
43pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
44
45mod buffers;
46
47/// Enum for distinguishing between a stream or future in functions that handle
48/// both.
49#[derive(Copy, Clone, Debug)]
50pub enum TransmitKind {
51    Stream,
52    Future,
53}
54
55/// Represents `{stream,future}.{read,write}` results.
56#[derive(Copy, Clone, Debug, PartialEq)]
57pub enum ReturnCode {
58    Blocked,
59    Completed(ItemCount),
60    Dropped(ItemCount),
61    Cancelled(ItemCount),
62}
63
64impl ReturnCode {
65    /// Pack `self` into a single 32-bit integer that may be returned to the
66    /// guest.
67    ///
68    /// This corresponds to `pack_copy_result` in the Component Model spec.
69    pub fn encode(&self) -> u32 {
70        const BLOCKED: u32 = 0xffff_ffff;
71        const COMPLETED: u32 = 0x0;
72        const DROPPED: u32 = 0x1;
73        const CANCELLED: u32 = 0x2;
74        match self {
75            ReturnCode::Blocked => BLOCKED,
76            ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED,
77            ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED,
78            ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED,
79        }
80    }
81
82    /// Returns `Self::Completed` with the specified count (or zero if
83    /// `matches!(kind, TransmitKind::Future)`)
84    fn completed(kind: TransmitKind, count: ItemCount) -> Self {
85        Self::Completed(if let TransmitKind::Future = kind {
86            ItemCount::ZERO
87        } else {
88            count
89        })
90    }
91}
92
93/// Representation of how many items are being operated on in a stream read or
94/// write.
95///
96/// The component model requires that stream operations are limited to `1<<28`
97/// items in one go. This type is a newtype wrapper around `u32` with the
98/// invariant that the internal value is limited by this amount.
99#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
100#[repr(transparent)]
101pub struct ItemCount {
102    raw: u32,
103}
104
105impl ItemCount {
106    const MAX: u32 = 1 << 28;
107    const ZERO: ItemCount = ItemCount { raw: 0 };
108
109    /// Creates a new `ItemCount` with the specified count, or a trap if it's
110    /// too large.
111    fn new(count: u32) -> Result<Self, Trap> {
112        if count < Self::MAX {
113            Ok(Self { raw: count })
114        } else {
115            Err(Trap::StreamOpTooBig)
116        }
117    }
118
119    /// Same as `Self::new` but takes a `usize`.
120    fn new_usize(count: usize) -> Result<Self, Trap> {
121        let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
122        Self::new(count)
123    }
124
125    fn as_u32(&self) -> u32 {
126        self.raw
127    }
128
129    fn as_usize(&self) -> usize {
130        usize::try_from(self.raw).unwrap()
131    }
132
133    /// Increments `self` by `amt`, returning a trap if the amount would exceed
134    /// the maximum item count.
135    fn inc(&mut self, amt: usize) -> Result<(), Trap> {
136        let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
137        let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
138        if new_raw < Self::MAX {
139            self.raw = new_raw;
140            Ok(())
141        } else {
142            Err(Trap::StreamOpTooBig)
143        }
144    }
145
146    /// Helper to add two `ItemCount`s together, fallibly.
147    ///
148    /// It's considered a bug if this overflows, so this is only suitable in
149    /// situations where overflow and/or exceeding the total item count is known
150    /// that it may be possible.
151    fn add(&self, other: ItemCount) -> Result<ItemCount> {
152        match self.raw.checked_add(other.raw) {
153            Some(raw) => Ok(ItemCount::new(raw)?),
154            None => bail_bug!("overflow in `ItemCount::add`"),
155        }
156    }
157
158    /// Same as `add`, but for subtraction.
159    ///
160    /// Like with `add` this is only suitable for situations where the result is
161    /// known to not underflow.
162    fn sub(&self, other: ItemCount) -> Result<ItemCount> {
163        match self.raw.checked_sub(other.raw) {
164            Some(raw) => Ok(ItemCount { raw }),
165            None => bail_bug!("underflow in `ItemCount::sub`"),
166        }
167    }
168}
169
170impl fmt::Display for ItemCount {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        self.raw.fmt(f)
173    }
174}
175
176impl fmt::Debug for ItemCount {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        self.raw.fmt(f)
179    }
180}
181
182impl PartialEq<u32> for ItemCount {
183    fn eq(&self, other: &u32) -> bool {
184        self.raw == *other
185    }
186}
187
188impl PartialOrd<u32> for ItemCount {
189    fn partial_cmp(&self, other: &u32) -> Option<std::cmp::Ordering> {
190        self.raw.partial_cmp(other)
191    }
192}
193
194/// Represents a stream or future type index.
195///
196/// This is useful as a parameter type for functions which operate on either a
197/// future or a stream.
198#[derive(Copy, Clone, Debug)]
199pub enum TransmitIndex {
200    Stream(TypeStreamTableIndex),
201    Future(TypeFutureTableIndex),
202}
203
204impl TransmitIndex {
205    pub fn kind(&self) -> TransmitKind {
206        match self {
207            TransmitIndex::Stream(_) => TransmitKind::Stream,
208            TransmitIndex::Future(_) => TransmitKind::Future,
209        }
210    }
211
212    /// Retrieve the payload type of the specified stream or future, or `None`
213    /// if it has no payload type.
214    fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
215        match self {
216            TransmitIndex::Stream(i) => {
217                let ty = types[*i].ty;
218                types[ty].payload.as_ref()
219            }
220            TransmitIndex::Future(i) => {
221                let ty = types[*i].ty;
222                types[ty].payload.as_ref()
223            }
224        }
225    }
226}
227
228/// Retrieve the host rep and state for the specified guest-visible waitable
229/// handle.
230fn get_mut_by_index_from(
231    handle_table: &mut HandleTable,
232    ty: TransmitIndex,
233    index: u32,
234) -> Result<(u32, &mut TransmitLocalState)> {
235    match ty {
236        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
237        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
238    }
239}
240
241fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
242    mut store: StoreContextMut<U>,
243    instance: Instance,
244    caller_thread: QualifiedThreadId,
245    options: OptionsIndex,
246    ty: TransmitIndex,
247    address: usize,
248    count: usize,
249    buffer: &mut B,
250) -> Result<()> {
251    let count = buffer.remaining().len().min(count);
252
253    // If lowering may call realloc in the guest, then the guest may need
254    // to access its thread context, so we need to set the current thread before lowering
255    // and restore the old one afterward.
256    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
257        let old_thread = store.0.set_thread(caller_thread)?;
258        (
259            &mut LowerContext::new(store.as_context_mut(), options, instance),
260            Some(old_thread),
261        )
262    } else {
263        (
264            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
265            None,
266        )
267    };
268
269    if address % usize::try_from(T::ALIGN32)? != 0 {
270        bail!("read pointer not aligned");
271    }
272    lower
273        .as_slice_mut()
274        .get_mut(address..)
275        .and_then(|b| b.get_mut(..T::SIZE32 * count))
276        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
277
278    if let Some(ty) = ty.payload(lower.types) {
279        T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
280    }
281
282    if let Some(old_thread) = old_thread {
283        store.0.set_thread(old_thread)?;
284    }
285
286    buffer.skip(count);
287
288    Ok(())
289}
290
291fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
292    lift: &mut LiftContext<'_>,
293    ty: Option<InterfaceType>,
294    buffer: &mut B,
295    address: usize,
296    count: usize,
297) -> Result<()> {
298    let count = count.min(buffer.remaining_capacity());
299    if T::IS_RUST_UNIT_TYPE {
300        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
301        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
302        // is a valid way to populate the zero-sized buffer.
303        buffer.extend(
304            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
305        )
306    } else {
307        let ty = match ty {
308            Some(ty) => ty,
309            None => bail_bug!("type required for non-unit lift"),
310        };
311        if address % usize::try_from(T::ALIGN32)? != 0 {
312            bail!("write pointer not aligned");
313        }
314        lift.memory()
315            .get(address..)
316            .and_then(|b| b.get(..T::SIZE32 * count))
317            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
318
319        let list = &WasmList::new(address, count, lift, ty)?;
320        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
321    }
322    Ok(())
323}
324
325/// Represents the state associated with an error context
326#[derive(Debug, PartialEq, Eq, PartialOrd)]
327pub(super) struct ErrorContextState {
328    /// Debug message associated with the error context
329    pub(crate) debug_msg: String,
330}
331
332/// Represents the size and alignment for a "flat" Component Model type,
333/// i.e. one containing no pointers or handles.
334#[derive(Debug, Clone, Copy, PartialEq, Eq)]
335pub(super) struct FlatAbi {
336    pub(super) size: u32,
337    pub(super) align: u32,
338}
339
340/// Represents the buffer for a host- or guest-initiated stream read.
341pub struct Destination<'a, T, B> {
342    id: TableId<TransmitState>,
343    buffer: &'a mut B,
344    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
345    _phantom: PhantomData<fn() -> T>,
346}
347
348impl<'a, T, B> Destination<'a, T, B> {
349    /// Reborrow `self` so it can be used again later.
350    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
351        Destination {
352            id: self.id,
353            buffer: &mut *self.buffer,
354            host_buffer: self.host_buffer.as_deref_mut(),
355            _phantom: PhantomData,
356        }
357    }
358
359    /// Take the buffer out of `self`, leaving a default-initialized one in its
360    /// place.
361    ///
362    /// This can be useful for reusing the previously-stored buffer's capacity
363    /// instead of allocating a fresh one.
364    pub fn take_buffer(&mut self) -> B
365    where
366        B: Default,
367    {
368        mem::take(self.buffer)
369    }
370
371    /// Store the specified buffer in `self`.
372    ///
373    /// Any items contained in the buffer will be delivered to the reader after
374    /// the `StreamProducer::poll_produce` call to which this `Destination` was
375    /// passed returns (unless overwritten by another call to `set_buffer`).
376    ///
377    /// If items are stored via this buffer _and_ written via a
378    /// `DirectDestination` view of `self`, then the items in the buffer will be
379    /// delivered after the ones written using `DirectDestination`.
380    pub fn set_buffer(&mut self, buffer: B) {
381        *self.buffer = buffer;
382    }
383
384    /// Return the remaining number of items the current read has capacity to
385    /// accept, if known.
386    ///
387    /// This will return `Some(_)` if the reader is a guest; it will return
388    /// `None` if the reader is the host.
389    ///
390    /// Note that this can return `Some(0)`. This means that the guest is
391    /// attempting to perform a zero-length read which typically means that it's
392    /// trying to wait for this stream to be ready-to-read but is not actually
393    /// ready to receive the items yet. The host in this case is allowed to
394    /// either block waiting for readiness or immediately complete the
395    /// operation. The guest is expected to handle both cases. Some more
396    /// discussion about this case can be found in the discussion of ["Stream
397    /// Readiness" in the component-model repo][docs].
398    ///
399    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
400    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
401        // Note that this unwrap should only trigger for bugs in Wasmtime, and
402        // this is modeled here to centralize the `.unwrap()` for this method in
403        // one location.
404        self.remaining_(store.as_context_mut().0).unwrap()
405    }
406
407    fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
408        let transmit = store.concurrent_state_mut().get_mut(self.id)?;
409
410        if let &ReadState::GuestReady { count, .. } = &transmit.read {
411            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
412                bail_bug!("expected WriteState::HostReady")
413            };
414
415            Ok(Some(count.as_usize() - guest_offset.as_usize()))
416        } else {
417            Ok(None)
418        }
419    }
420}
421
422impl<'a, B> Destination<'a, u8, B> {
423    /// Return a `DirectDestination` view of `self`.
424    ///
425    /// If the reader is a guest, this will provide direct access to the guest's
426    /// read buffer.  If the reader is a host, this will provide access to a
427    /// buffer which will be delivered to the host before any items stored using
428    /// `Destination::set_buffer`.
429    ///
430    /// `capacity` will only be used if the reader is a host, in which case it
431    /// will update the length of the buffer, possibly zero-initializing the new
432    /// elements if the new length is larger than the old length.
433    pub fn as_direct<D>(
434        mut self,
435        store: StoreContextMut<'a, D>,
436        capacity: usize,
437    ) -> DirectDestination<'a, D> {
438        if let Some(buffer) = self.host_buffer.as_deref_mut() {
439            buffer.set_position(0);
440            if buffer.get_mut().is_empty() {
441                buffer.get_mut().resize(capacity, 0);
442            }
443        }
444
445        DirectDestination {
446            id: self.id,
447            host_buffer: self.host_buffer,
448            store,
449        }
450    }
451}
452
453/// Represents a read from a `stream<u8>`, providing direct access to the
454/// writer's buffer.
455pub struct DirectDestination<'a, D: 'static> {
456    id: TableId<TransmitState>,
457    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
458    store: StoreContextMut<'a, D>,
459}
460
461impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
462    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
463        let rem = self.remaining();
464        let n = rem.len().min(buf.len());
465        rem[..n].copy_from_slice(&buf[..n]);
466        self.mark_written(n);
467        Ok(n)
468    }
469
470    fn flush(&mut self) -> std::io::Result<()> {
471        Ok(())
472    }
473}
474
475impl<D: 'static> DirectDestination<'_, D> {
476    /// Provide direct access to the writer's buffer.
477    pub fn remaining(&mut self) -> &mut [u8] {
478        // Note that this unwrap should only trigger for bugs in Wasmtime, and
479        // this is modeled here to centralize the `.unwrap()` for this method in
480        // one location.
481        self.remaining_().unwrap()
482    }
483
484    fn remaining_(&mut self) -> Result<&mut [u8]> {
485        if let Some(buffer) = self.host_buffer.as_deref_mut() {
486            return Ok(buffer.get_mut());
487        }
488        let transmit = self
489            .store
490            .as_context_mut()
491            .0
492            .concurrent_state_mut()
493            .get_mut(self.id)?;
494
495        let &ReadState::GuestReady {
496            address,
497            count,
498            options,
499            instance,
500            ..
501        } = &transmit.read
502        else {
503            bail_bug!("expected ReadState::GuestReady")
504        };
505
506        let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
507            bail_bug!("expected WriteState::HostReady")
508        };
509
510        let memory = instance
511            .options_memory_mut(self.store.0, options)
512            .get_mut((address + guest_offset.as_usize())..)
513            .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
514        match memory {
515            Some(memory) => Ok(memory),
516            None => bail_bug!("guest buffer unexpectedly out of bounds"),
517        }
518    }
519
520    /// Mark the specified number of bytes as written to the writer's buffer.
521    ///
522    /// # Panics
523    ///
524    /// This will panic if the count is larger than the size of the
525    /// buffer returned by `Self::remaining`.
526    pub fn mark_written(&mut self, count: usize) {
527        // Note that this unwrap should only trigger for bugs in Wasmtime, and
528        // this is modeled here to centralize the `.unwrap()` for this method in
529        // one location.
530        self.mark_written_(count).unwrap()
531    }
532
533    fn mark_written_(&mut self, count: usize) -> Result<()> {
534        if let Some(buffer) = self.host_buffer.as_deref_mut() {
535            buffer.set_position(
536                // Note that these `.unwrap`s are documented panic conditions of
537                // `mark_written`.
538                buffer
539                    .position()
540                    .checked_add(u64::try_from(count).unwrap())
541                    .unwrap(),
542            );
543        } else {
544            let transmit = self
545                .store
546                .as_context_mut()
547                .0
548                .concurrent_state_mut()
549                .get_mut(self.id)?;
550
551            let ReadState::GuestReady {
552                count: read_count, ..
553            } = &transmit.read
554            else {
555                bail_bug!("expected ReadState::GuestReady")
556            };
557
558            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
559                bail_bug!("expected WriteState::HostReady");
560            };
561
562            if guest_offset.as_usize() + count > read_count.as_usize() {
563                // Note that this `panic` is a documented panic condition of
564                // `mark_written`.
565                panic!(
566                    "write count ({count}) must be less than or equal to read count ({read_count})"
567                )
568            } else {
569                guest_offset.inc(count)?;
570            }
571        }
572        Ok(())
573    }
574}
575
576/// Represents the state of a `Stream{Producer,Consumer}`.
577#[derive(Copy, Clone, Debug)]
578pub enum StreamResult {
579    /// The operation completed normally, and the producer or consumer may be
580    /// able to produce or consume more items, respectively.
581    Completed,
582    /// The operation was interrupted (i.e. it wrapped up early after receiving
583    /// a `finish` parameter value of true in a call to `poll_produce` or
584    /// `poll_consume`), and the producer or consumer may be able to produce or
585    /// consume more items, respectively.
586    Cancelled,
587    /// The operation completed normally, but the producer or consumer will
588    /// _not_ able to produce or consume more items, respectively.
589    Dropped,
590}
591
592/// Represents the host-owned write end of a stream.
593pub trait StreamProducer<D>: Send + 'static {
594    /// The payload type of this stream.
595    type Item;
596
597    /// The `WriteBuffer` type to use when delivering items.
598    type Buffer: WriteBuffer<Self::Item> + Default;
599
600    /// Handle a host- or guest-initiated read by delivering zero or more items
601    /// to the specified destination.
602    ///
603    /// This will be called whenever the reader starts a read.
604    ///
605    /// # Arguments
606    ///
607    /// * `self` - a `Pin`'d version of self to perform Rust-level
608    ///   future-related operations on.
609    /// * `cx` - a Rust-related [`Context`] which is passed to other
610    ///   future-related operations or used to acquire a waker.
611    /// * `store` - the Wasmtime store that this operation is happening within.
612    ///   Used, for example, to consult the state `D` associated with the store.
613    /// * `destination` - the location that items are to be written to.
614    /// * `finish` - a flag indicating whether the host should strive to
615    ///   immediately complete/cancel any pending operation. See below for more
616    ///   details.
617    ///
618    /// # Behavior
619    ///
620    /// If the implementation is able to produce one or more items immediately,
621    /// it should write them to `destination` and return either
622    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
623    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
624    /// any more items.
625    ///
626    /// If the implementation is unable to produce any items immediately, but
627    /// expects to do so later, and `finish` is _false_, it should store the
628    /// waker from `cx` for later and return `Poll::Pending` without writing
629    /// anything to `destination`.  Later, it should alert the waker when either
630    /// the items arrive, the stream has ended, or an error occurs.
631    ///
632    /// If more items are written to `destination` than the reader has immediate
633    /// capacity to accept, they will be retained in memory by the caller and
634    /// used to satisfy future reads, in which case `poll_produce` will only be
635    /// called again once all those items have been delivered.
636    ///
637    /// # Zero-length reads
638    ///
639    /// This function may be called with a zero-length capacity buffer
640    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
641    /// the guest wants to wait to see if an item is ready without actually
642    /// reading the item. For example think of a UNIX `poll` function run on a
643    /// TCP stream, seeing if it's readable without actually reading it.
644    ///
645    /// In this situation the host is allowed to either return immediately or
646    /// wait for readiness. Note that waiting for readiness is not always
647    /// possible. For example it's impossible to test if a Rust-native `Future`
648    /// is ready without actually reading the item. Stream-specific
649    /// optimizations, such as testing if a TCP stream is readable, may be
650    /// possible however.
651    ///
652    /// For a zero-length read, the host is allowed to:
653    ///
654    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
655    ///   anything if it expects to be able to produce items immediately (i.e.
656    ///   without first returning `Poll::Pending`) the next time `poll_produce`
657    ///   is called with non-zero capacity. This is the best-case scenario of
658    ///   fulfilling the guest's desire -- items aren't read/buffered but the
659    ///   host is saying it's ready when the guest is.
660    ///
661    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
662    ///   testing for readiness. The guest doesn't know this yet, but the guest
663    ///   will realize that zero-length reads won't work on this stream when a
664    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
665    ///   here.
666    ///
667    /// - Return `Poll::Pending` if the host has performed necessary async work
668    ///   to wait for this stream to be readable without actually reading
669    ///   anything. This is also a best-case scenario where the host is letting
670    ///   the guest know that nothing is ready yet. Later the zero-length read
671    ///   will complete and then the guest will attempt a nonzero-length read to
672    ///   actually read some bytes.
673    ///
674    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
675    ///   `Destination::set_buffer` with one more more items. Note, however,
676    ///   that this creates the hazard that the items will never be received by
677    ///   the guest if it decides not to do another non-zero-length read before
678    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
679    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
680    ///   recommended to do this and it's better to return
681    ///   `StreamResult::Completed` without buffering anything instead.
682    ///
683    /// For more discussion on zero-length reads see the [documentation in the
684    /// component-model repo itself][docs].
685    ///
686    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
687    ///
688    /// # Return
689    ///
690    /// This function can return a number of possible cases from this function:
691    ///
692    /// * `Poll::Pending` - this operation cannot complete at this time. The
693    ///   Rust-level `Future::poll` contract applies here where a waker should
694    ///   be stored from the `cx` argument and be arranged to receive a
695    ///   notification when this implementation can make progress. For example
696    ///   if you call `Future::poll` on a sub-future, that's enough. If items
697    ///   were written to `destination` then a trap in the guest will be raised.
698    ///
699    ///   Note that implementations should strive to avoid this return value
700    ///   when `finish` is `true`. In such a situation the guest is attempting
701    ///   to, for example, cancel a previous operation. By returning
702    ///   `Poll::Pending` the guest will be blocked during the cancellation
703    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
704    ///   favored to indicate that no items were read. If a short read happened,
705    ///   however, it's ok to return `StreamResult::Completed` indicating some
706    ///   items were read.
707    ///
708    /// * `Poll::Ok(StreamResult::Completed)` - items, if applicable, were
709    ///   written to the `destination`.
710    ///
711    /// * `Poll::Ok(StreamResult::Cancelled)` - used when `finish` is `true` and
712    ///   the implementation was able to successfully cancel any async work that
713    ///   a previous read kicked off, if any. The host should not buffer values
714    ///   received after returning `Cancelled` because the guest will not be
715    ///   aware of these values and the guest could close the stream after
716    ///   cancelling a read. Hosts should only return `Cancelled` when there are
717    ///   no more async operations in flight for a previous read.
718    ///
719    ///   If items were written to `destination` then a trap in the guest will
720    ///   be raised. If `finish` is `false` then this return value will raise a
721    ///   trap in the guest.
722    ///
723    /// * `Poll::Ok(StreamResult::Dropped)` - end-of-stream marker, indicating
724    ///   that this producer should not be polled again. Note that items may
725    ///   still be written to `destination`.
726    ///
727    /// # Errors
728    ///
729    /// The implementation may alternatively choose to return `Err(_)` to
730    /// indicate an unrecoverable error. This will cause the guest (if any) to
731    /// trap and render the component instance (if any) unusable. The
732    /// implementation should report errors that _are_ recoverable by other
733    /// means (e.g. by writing to a `future`) and return
734    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
735    fn poll_produce<'a>(
736        self: Pin<&mut Self>,
737        cx: &mut Context<'_>,
738        store: StoreContextMut<'a, D>,
739        destination: Destination<'a, Self::Item, Self::Buffer>,
740        finish: bool,
741    ) -> Poll<Result<StreamResult>>;
742
743    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
744    /// be downcast to the specified type.
745    ///
746    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
747    /// to the specified type is guaranteed to succeed.
748    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
749        Err(me)
750    }
751}
752
753impl<T, D> StreamProducer<D> for iter::Empty<T>
754where
755    T: Send + Sync + 'static,
756{
757    type Item = T;
758    type Buffer = Option<Self::Item>;
759
760    fn poll_produce<'a>(
761        self: Pin<&mut Self>,
762        _: &mut Context<'_>,
763        _: StoreContextMut<'a, D>,
764        _: Destination<'a, Self::Item, Self::Buffer>,
765        _: bool,
766    ) -> Poll<Result<StreamResult>> {
767        Poll::Ready(Ok(StreamResult::Dropped))
768    }
769}
770
771impl<T, D> StreamProducer<D> for stream::Empty<T>
772where
773    T: Send + Sync + 'static,
774{
775    type Item = T;
776    type Buffer = Option<Self::Item>;
777
778    fn poll_produce<'a>(
779        self: Pin<&mut Self>,
780        _: &mut Context<'_>,
781        _: StoreContextMut<'a, D>,
782        _: Destination<'a, Self::Item, Self::Buffer>,
783        _: bool,
784    ) -> Poll<Result<StreamResult>> {
785        Poll::Ready(Ok(StreamResult::Dropped))
786    }
787}
788
789impl<T, D> StreamProducer<D> for Vec<T>
790where
791    T: Unpin + Send + Sync + 'static,
792{
793    type Item = T;
794    type Buffer = VecBuffer<T>;
795
796    fn poll_produce<'a>(
797        self: Pin<&mut Self>,
798        _: &mut Context<'_>,
799        _: StoreContextMut<'a, D>,
800        mut dst: Destination<'a, Self::Item, Self::Buffer>,
801        _: bool,
802    ) -> Poll<Result<StreamResult>> {
803        dst.set_buffer(mem::take(self.get_mut()).into());
804        Poll::Ready(Ok(StreamResult::Dropped))
805    }
806}
807
808impl<T, D> StreamProducer<D> for Box<[T]>
809where
810    T: Unpin + Send + Sync + 'static,
811{
812    type Item = T;
813    type Buffer = VecBuffer<T>;
814
815    fn poll_produce<'a>(
816        self: Pin<&mut Self>,
817        _: &mut Context<'_>,
818        _: StoreContextMut<'a, D>,
819        mut dst: Destination<'a, Self::Item, Self::Buffer>,
820        _: bool,
821    ) -> Poll<Result<StreamResult>> {
822        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
823        Poll::Ready(Ok(StreamResult::Dropped))
824    }
825}
826
827#[cfg(feature = "component-model-async-bytes")]
828impl<D> StreamProducer<D> for bytes::Bytes {
829    type Item = u8;
830    type Buffer = Cursor<Self>;
831
832    fn poll_produce<'a>(
833        self: Pin<&mut Self>,
834        _: &mut Context<'_>,
835        _store: StoreContextMut<'a, D>,
836        mut dst: Destination<'a, Self::Item, Self::Buffer>,
837        _: bool,
838    ) -> Poll<Result<StreamResult>> {
839        dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
840        Poll::Ready(Ok(StreamResult::Dropped))
841    }
842}
843
844#[cfg(feature = "component-model-async-bytes")]
845impl<D> StreamProducer<D> for bytes::BytesMut {
846    type Item = u8;
847    type Buffer = Cursor<Self>;
848
849    fn poll_produce<'a>(
850        self: Pin<&mut Self>,
851        _: &mut Context<'_>,
852        _store: StoreContextMut<'a, D>,
853        mut dst: Destination<'a, Self::Item, Self::Buffer>,
854        _: bool,
855    ) -> Poll<Result<StreamResult>> {
856        dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
857        Poll::Ready(Ok(StreamResult::Dropped))
858    }
859}
860
861/// Represents the buffer for a host- or guest-initiated stream write.
862pub struct Source<'a, T> {
863    id: TableId<TransmitState>,
864    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
865}
866
867impl<'a, T> Source<'a, T> {
868    /// Reborrow `self` so it can be used again later.
869    pub fn reborrow(&mut self) -> Source<'_, T> {
870        Source {
871            id: self.id,
872            host_buffer: self.host_buffer.as_deref_mut(),
873        }
874    }
875
876    /// Accept zero or more items from the writer.
877    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
878    where
879        T: func::Lift + 'static,
880        B: ReadBuffer<T>,
881    {
882        if let Some(input) = &mut self.host_buffer {
883            let count = input.remaining().len().min(buffer.remaining_capacity());
884            buffer.move_from(*input, count);
885        } else {
886            let store = store.as_context_mut();
887            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
888
889            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
890                bail_bug!("expected ReadState::HostReady");
891            };
892
893            let &WriteState::GuestReady {
894                ty,
895                address,
896                count,
897                options,
898                instance,
899                ..
900            } = &transmit.write
901            else {
902                bail_bug!("expected WriteState::GuestReady");
903            };
904
905            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
906            let ty = ty.payload(cx.types);
907            let old_remaining = buffer.remaining_capacity();
908            lift::<T, B>(
909                cx,
910                ty.copied(),
911                buffer,
912                address + (T::SIZE32 * guest_offset.as_usize()),
913                count.as_usize() - guest_offset.as_usize(),
914            )?;
915
916            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
917
918            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
919                bail_bug!("expected ReadState::HostReady");
920            };
921
922            guest_offset.inc(old_remaining - buffer.remaining_capacity())?;
923        }
924
925        Ok(())
926    }
927
928    /// Return the number of items remaining to be read from the current write
929    /// operation.
930    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
931    where
932        T: 'static,
933    {
934        // Note that this unwrap should only trigger for bugs in Wasmtime, and
935        // this is modeled here to centralize the `.unwrap()` for this method in
936        // one location.
937        self.remaining_(store.as_context_mut().0).unwrap()
938    }
939
940    fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
941    where
942        T: 'static,
943    {
944        let transmit = store.concurrent_state_mut().get_mut(self.id)?;
945
946        if let &WriteState::GuestReady { count, .. } = &transmit.write {
947            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
948                bail_bug!("expected ReadState::HostReady")
949            };
950
951            Ok(count.as_usize() - guest_offset.as_usize())
952        } else if let Some(host_buffer) = &self.host_buffer {
953            Ok(host_buffer.remaining().len())
954        } else {
955            bail_bug!("expected either WriteState::GuestReady or host buffer")
956        }
957    }
958}
959
960impl<'a> Source<'a, u8> {
961    /// Return a `DirectSource` view of `self`.
962    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
963        DirectSource {
964            id: self.id,
965            host_buffer: self.host_buffer,
966            store,
967        }
968    }
969}
970
971/// Represents a write to a `stream<u8>`, providing direct access to the
972/// writer's buffer.
973pub struct DirectSource<'a, D: 'static> {
974    id: TableId<TransmitState>,
975    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
976    store: StoreContextMut<'a, D>,
977}
978
979impl<D: 'static> std::io::Read for DirectSource<'_, D> {
980    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
981        let rem = self.remaining();
982        let n = rem.len().min(buf.len());
983        buf[..n].copy_from_slice(&rem[..n]);
984        self.mark_read(n);
985        Ok(n)
986    }
987}
988
989impl<D: 'static> DirectSource<'_, D> {
990    /// Provide direct access to the writer's buffer.
991    pub fn remaining(&mut self) -> &[u8] {
992        // Note that this unwrap should only trigger for bugs in Wasmtime, and
993        // this is modeled here to centralize the `.unwrap()` for this method in
994        // one location.
995        self.remaining_().unwrap()
996    }
997
998    fn remaining_(&mut self) -> Result<&[u8]> {
999        if let Some(buffer) = self.host_buffer.as_deref_mut() {
1000            return Ok(buffer.remaining());
1001        }
1002        let transmit = self
1003            .store
1004            .as_context_mut()
1005            .0
1006            .concurrent_state_mut()
1007            .get_mut(self.id)?;
1008
1009        let &WriteState::GuestReady {
1010            address,
1011            count,
1012            options,
1013            instance,
1014            ..
1015        } = &transmit.write
1016        else {
1017            bail_bug!("expected WriteState::GuestReady")
1018        };
1019
1020        let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
1021            bail_bug!("expected ReadState::HostReady")
1022        };
1023
1024        let memory = instance
1025            .options_memory(self.store.0, options)
1026            .get((address + guest_offset.as_usize())..)
1027            .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
1028        match memory {
1029            Some(memory) => Ok(memory),
1030            None => bail_bug!("guest buffer unexpectedly out of bounds"),
1031        }
1032    }
1033
1034    /// Mark the specified number of bytes as read from the writer's buffer.
1035    ///
1036    /// # Panics
1037    ///
1038    /// This will panic if the count is larger than the size of the buffer
1039    /// returned by `Self::remaining`.
1040    pub fn mark_read(&mut self, count: usize) {
1041        // Note that this unwrap should only trigger for bugs in Wasmtime, and
1042        // this is modeled here to centralize the `.unwrap()` for this method in
1043        // one location.
1044        self.mark_read_(count).unwrap()
1045    }
1046
1047    fn mark_read_(&mut self, count: usize) -> Result<()> {
1048        if let Some(buffer) = self.host_buffer.as_deref_mut() {
1049            buffer.skip(count);
1050            return Ok(());
1051        }
1052
1053        let transmit = self
1054            .store
1055            .as_context_mut()
1056            .0
1057            .concurrent_state_mut()
1058            .get_mut(self.id)?;
1059
1060        let WriteState::GuestReady {
1061            count: write_count, ..
1062        } = &transmit.write
1063        else {
1064            bail_bug!("expected WriteState::GuestReady");
1065        };
1066
1067        let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
1068            bail_bug!("expected ReadState::HostReady");
1069        };
1070
1071        if guest_offset.as_usize() + count > write_count.as_usize() {
1072            // Note that this is a documented panic condition of `mark_read`.
1073            panic!("read count ({count}) must be less than or equal to write count ({write_count})")
1074        } else {
1075            guest_offset.inc(count)?;
1076        }
1077        Ok(())
1078    }
1079}
1080
1081/// Represents the host-owned read end of a stream.
1082pub trait StreamConsumer<D>: Send + 'static {
1083    /// The payload type of this stream.
1084    type Item;
1085
1086    /// Handle a host- or guest-initiated write by accepting zero or more items
1087    /// from the specified source.
1088    ///
1089    /// This will be called whenever the writer starts a write.
1090    ///
1091    /// If the implementation is able to consume one or more items immediately,
1092    /// it should take them from `source` and return either
1093    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
1094    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
1095    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
1096    /// indicate that the caller should delay sending a `COMPLETED` event to the
1097    /// writer until a later call to this function returns `Poll::Ready(_)`.
1098    /// For more about that, see the `Backpressure` section below.
1099    ///
1100    /// If the implementation cannot consume any items immediately and `finish`
1101    /// is _false_, it should store the waker from `cx` for later and return
1102    /// `Poll::Pending` without writing anything to `destination`.  Later, it
1103    /// should alert the waker when either (1) the items arrive, (2) the stream
1104    /// has ended, or (3) an error occurs.
1105    ///
1106    /// If the implementation cannot consume any items immediately and `finish`
1107    /// is _true_, it should, if possible, return
1108    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
1109    /// anything from `source`.  However, that might not be possible if an
1110    /// earlier call to `poll_consume` kicked off an asynchronous operation
1111    /// which needs to be completed (and possibly interrupted) gracefully, in
1112    /// which case the implementation may return `Poll::Pending` and later alert
1113    /// the waker as described above.  In other words, when `finish` is true,
1114    /// the implementation should prioritize returning a result to the reader
1115    /// (even if no items can be consumed) rather than wait indefinitely for at
1116    /// capacity to free up.
1117    ///
1118    /// In all of the above cases, the implementation may alternatively choose
1119    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
1120    /// the guest (if any) to trap and render the component instance (if any)
1121    /// unusable.  The implementation should report errors that _are_
1122    /// recoverable by other means (e.g. by writing to a `future`) and return
1123    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
1124    ///
1125    /// Note that the implementation should only return
1126    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
1127    /// items from `source` if called with `finish` set to true.  If it does so
1128    /// when `finish` is false, the caller will trap.  Additionally, it should
1129    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
1130    /// least one item from `source` if there is an item available; otherwise,
1131    /// the caller will trap.  If `poll_consume` is called with no items in
1132    /// `source`, it should only return `Poll::Ready(_)` once it is able to
1133    /// accept at least one item during the next call to `poll_consume`.
1134    ///
1135    /// Note that any items which the implementation of this trait takes from
1136    /// `source` become the responsibility of that implementation.  For that
1137    /// reason, an implementation which forwards items to an upstream sink
1138    /// should reserve capacity in that sink before taking items out of
1139    /// `source`, if possible.  Alternatively, it might buffer items which can't
1140    /// be forwarded immediately and send them once capacity is freed up.
1141    ///
1142    /// ## Backpressure
1143    ///
1144    /// As mentioned above, an implementation might choose to return
1145    /// `Poll::Pending` after taking items from `source`, which tells the caller
1146    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
1147    /// a form of backpressure when the items are forwarded to an upstream sink
1148    /// asynchronously.  Note, however, that it's not possible to "put back"
1149    /// items into `source` once they've been taken out, so if the upstream sink
1150    /// is unable to accept all the items, that cannot be communicated to the
1151    /// writer at this level of abstraction.  Just as with application-specific,
1152    /// recoverable errors, information about which items could be forwarded and
1153    /// which could not must be communicated out-of-band, e.g. by writing to an
1154    /// application-specific `future`.
1155    ///
1156    /// Similarly, if the writer cancels the write after items have been taken
1157    /// from `source` but before the items have all been forwarded to an
1158    /// upstream sink, `poll_consume` will be called with `finish` set to true,
1159    /// and the implementation may either:
1160    ///
1161    /// - Interrupt the forwarding process gracefully.  This may be preferable
1162    /// if there is an out-of-band channel for communicating to the writer how
1163    /// many items were forwarded before being interrupted.
1164    ///
1165    /// - Allow the forwarding to complete without interrupting it.  This is
1166    /// usually preferable if there's no out-of-band channel for reporting back
1167    /// to the writer how many items were forwarded.
1168    fn poll_consume(
1169        self: Pin<&mut Self>,
1170        cx: &mut Context<'_>,
1171        store: StoreContextMut<D>,
1172        source: Source<'_, Self::Item>,
1173        finish: bool,
1174    ) -> Poll<Result<StreamResult>>;
1175}
1176
1177/// Represents a host-owned write end of a future.
1178pub trait FutureProducer<D>: Send + 'static {
1179    /// The payload type of this future.
1180    type Item;
1181
1182    /// Handle a host- or guest-initiated read by producing a value.
1183    ///
1184    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1185    /// simplified interface for futures.
1186    ///
1187    /// If `finish` is true, the implementation may return
1188    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
1189    /// could produce a value.  Otherwise, it must either return
1190    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
1191    fn poll_produce(
1192        self: Pin<&mut Self>,
1193        cx: &mut Context<'_>,
1194        store: StoreContextMut<D>,
1195        finish: bool,
1196    ) -> Poll<Result<Option<Self::Item>>>;
1197}
1198
1199impl<T, E, D, Fut> FutureProducer<D> for Fut
1200where
1201    E: Into<Error>,
1202    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1203{
1204    type Item = T;
1205
1206    fn poll_produce<'a>(
1207        self: Pin<&mut Self>,
1208        cx: &mut Context<'_>,
1209        _: StoreContextMut<'a, D>,
1210        finish: bool,
1211    ) -> Poll<Result<Option<T>>> {
1212        match self.poll(cx) {
1213            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1214            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1215            Poll::Pending if finish => Poll::Ready(Ok(None)),
1216            Poll::Pending => Poll::Pending,
1217        }
1218    }
1219}
1220
1221/// Represents a host-owned read end of a future.
1222pub trait FutureConsumer<D>: Send + 'static {
1223    /// The payload type of this future.
1224    type Item;
1225
1226    /// Handle a host- or guest-initiated write by consuming a value.
1227    ///
1228    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1229    /// simplified interface for futures.
1230    ///
1231    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
1232    /// without taking the item from `source`, which indicates the operation was
1233    /// canceled before it could consume the value.  Otherwise, it must either
1234    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
1235    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
1236    /// the item).
1237    fn poll_consume(
1238        self: Pin<&mut Self>,
1239        cx: &mut Context<'_>,
1240        store: StoreContextMut<D>,
1241        source: Source<'_, Self::Item>,
1242        finish: bool,
1243    ) -> Poll<Result<()>>;
1244}
1245
1246/// Represents the readable end of a Component Model `future`.
1247///
1248/// Note that `FutureReader` instances must be disposed of using either `pipe`
1249/// or `close`; otherwise the in-store representation will leak and the writer
1250/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
1251/// ensure that disposal happens automatically.
1252pub struct FutureReader<T> {
1253    id: TableId<TransmitHandle>,
1254    _phantom: PhantomData<T>,
1255}
1256
1257impl<T> FutureReader<T> {
1258    /// Create a new future with the specified producer.
1259    ///
1260    /// # Errors
1261    ///
1262    /// Returns an error if the resource table for this store is full or if
1263    /// [`Config::concurrency_support`] is not enabled.
1264    ///
1265    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1266    pub fn new<S: AsContextMut>(
1267        mut store: S,
1268        producer: impl FutureProducer<S::Data, Item = T>,
1269    ) -> Result<Self>
1270    where
1271        T: func::Lower + func::Lift + Send + Sync + 'static,
1272    {
1273        ensure!(
1274            store.as_context().0.concurrency_support(),
1275            "concurrency support is not enabled"
1276        );
1277
1278        struct Producer<P>(P);
1279
1280        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1281            for Producer<P>
1282        {
1283            type Item = P::Item;
1284            type Buffer = Option<P::Item>;
1285
1286            fn poll_produce<'a>(
1287                self: Pin<&mut Self>,
1288                cx: &mut Context<'_>,
1289                store: StoreContextMut<D>,
1290                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1291                finish: bool,
1292            ) -> Poll<Result<StreamResult>> {
1293                // SAFETY: This is a standard pin-projection, and we never move
1294                // out of `self`.
1295                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1296
1297                Poll::Ready(Ok(
1298                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1299                        destination.set_buffer(Some(value));
1300
1301                        // Here we return `StreamResult::Completed` even though
1302                        // we've produced the last item we'll ever produce.
1303                        // That's because the ABI expects
1304                        // `ReturnCode::Completed(1)` rather than
1305                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1306                        // called again since the future will have resolved.
1307                        StreamResult::Completed
1308                    } else {
1309                        StreamResult::Cancelled
1310                    },
1311                ))
1312            }
1313        }
1314
1315        Ok(Self::new_(
1316            store
1317                .as_context_mut()
1318                .new_transmit(TransmitKind::Future, Producer(producer))?,
1319        ))
1320    }
1321
1322    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1323        Self {
1324            id,
1325            _phantom: PhantomData,
1326        }
1327    }
1328
1329    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1330        self.id
1331    }
1332
1333    /// Set the consumer that accepts the result of this future.
1334    ///
1335    /// # Errors
1336    ///
1337    /// Returns an error if this future has already been closed.
1338    ///
1339    /// # Panics
1340    ///
1341    /// Panics if this future does not belong to `store`.
1342    pub fn pipe<S: AsContextMut>(
1343        self,
1344        mut store: S,
1345        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1346    ) -> Result<()>
1347    where
1348        T: func::Lift + 'static,
1349    {
1350        struct Consumer<C>(C);
1351
1352        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1353            for Consumer<C>
1354        {
1355            type Item = T;
1356
1357            fn poll_consume(
1358                self: Pin<&mut Self>,
1359                cx: &mut Context<'_>,
1360                mut store: StoreContextMut<D>,
1361                mut source: Source<Self::Item>,
1362                finish: bool,
1363            ) -> Poll<Result<StreamResult>> {
1364                // SAFETY: This is a standard pin-projection, and we never move
1365                // out of `self`.
1366                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1367
1368                ready!(consumer.poll_consume(
1369                    cx,
1370                    store.as_context_mut(),
1371                    source.reborrow(),
1372                    finish
1373                ))?;
1374
1375                Poll::Ready(Ok(if source.remaining(store) == 0 {
1376                    // Here we return `StreamResult::Completed` even though
1377                    // we've consumed the last item we'll ever consume.  That's
1378                    // because the ABI expects `ReturnCode::Completed(1)` rather
1379                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1380                    // called again since the future will have resolved.
1381                    StreamResult::Completed
1382                } else {
1383                    StreamResult::Cancelled
1384                }))
1385            }
1386        }
1387
1388        store
1389            .as_context_mut()
1390            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
1391    }
1392
1393    /// Transfer ownership of the read end of a future from a guest to the host.
1394    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1395        let id = lift_index_to_future(cx, ty, index)?;
1396        Ok(Self::new_(id))
1397    }
1398
1399    /// Close this `FutureReader`.
1400    ///
1401    /// This will close this half of the future which will signal to a pending
1402    /// write, if any, that the reader side is dropped. If the writer half has
1403    /// not yet written a value then when it attempts to write a value it will
1404    /// see that this end is closed.
1405    ///
1406    /// # Errors
1407    ///
1408    /// Returns an error if this future has already been closed.
1409    ///
1410    /// # Panics
1411    ///
1412    /// Panics if the store that the [`Accessor`] is derived from does not own
1413    /// this future.
1414    ///
1415    /// [`Accessor`]: crate::component::Accessor
1416    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1417        future_close(store.as_context_mut().0, &mut self.id)
1418    }
1419
1420    /// Convenience method around [`Self::close`].
1421    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1422        accessor.as_accessor().with(|access| self.close(access))
1423    }
1424
1425    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1426    /// drop and clean it up from the store.
1427    ///
1428    /// Note that the `accessor` provided must own this future and is
1429    /// additionally transferred to the `GuardedFutureReader` return value.
1430    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1431    where
1432        A: AsAccessor,
1433    {
1434        GuardedFutureReader::new(accessor, self)
1435    }
1436
1437    /// Attempts to convert this [`FutureReader<T>`] to a [`FutureAny`].
1438    ///
1439    /// # Errors
1440    ///
1441    /// This function will return an error if `self` does not belong to
1442    /// `store`.
1443    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1444    where
1445        T: ComponentType + 'static,
1446    {
1447        FutureAny::try_from_future_reader(store, self)
1448    }
1449
1450    /// Attempts to convert a [`FutureAny`] into a [`FutureReader<T>`].
1451    ///
1452    /// # Errors
1453    ///
1454    /// This function will fail if `T` doesn't match the type of the value that
1455    /// `future` is sending.
1456    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1457    where
1458        T: ComponentType + 'static,
1459    {
1460        future.try_into_future_reader()
1461    }
1462}
1463
1464impl<T> fmt::Debug for FutureReader<T> {
1465    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1466        f.debug_struct("FutureReader")
1467            .field("id", &self.id)
1468            .finish()
1469    }
1470}
1471
1472pub(super) fn future_close(
1473    store: &mut StoreOpaque,
1474    id: &mut TableId<TransmitHandle>,
1475) -> Result<()> {
1476    let id = mem::replace(id, TableId::new(u32::MAX));
1477    store.host_drop_reader(id, TransmitKind::Future)
1478}
1479
1480/// Transfer ownership of the read end of a future from the host to a guest.
1481pub(super) fn lift_index_to_future(
1482    cx: &mut LiftContext<'_>,
1483    ty: InterfaceType,
1484    index: u32,
1485) -> Result<TableId<TransmitHandle>> {
1486    match ty {
1487        InterfaceType::Future(src) => {
1488            let handle_table = cx
1489                .instance_mut()
1490                .table_for_transmit(TransmitIndex::Future(src));
1491            let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1492            if is_done {
1493                bail!("cannot lift future after being notified that the writable end dropped");
1494            }
1495            let id = TableId::<TransmitHandle>::new(rep);
1496            let concurrent_state = cx.concurrent_state_mut();
1497            let future = concurrent_state.get_mut(id)?;
1498            future.common.handle = None;
1499            let state = future.state;
1500
1501            if concurrent_state.get_mut(state)?.done {
1502                bail!("cannot lift future after previous read succeeded");
1503            }
1504
1505            Ok(id)
1506        }
1507        _ => func::bad_type_info(),
1508    }
1509}
1510
1511/// Transfer ownership of the read end of a future from the host to a guest.
1512pub(super) fn lower_future_to_index<U>(
1513    id: TableId<TransmitHandle>,
1514    cx: &mut LowerContext<'_, U>,
1515    ty: InterfaceType,
1516) -> Result<u32> {
1517    match ty {
1518        InterfaceType::Future(dst) => {
1519            let concurrent_state = cx.store.0.concurrent_state_mut();
1520            let state = concurrent_state.get_mut(id)?.state;
1521            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1522
1523            let handle = cx
1524                .instance_mut()
1525                .table_for_transmit(TransmitIndex::Future(dst))
1526                .future_insert_read(dst, rep)?;
1527
1528            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1529
1530            Ok(handle)
1531        }
1532        _ => func::bad_type_info(),
1533    }
1534}
1535
1536// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1537// safe and correct since we lift and lower future handles as `u32`s.
1538unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1539    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1540
1541    type Lower = <u32 as func::ComponentType>::Lower;
1542
1543    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1544        match ty {
1545            InterfaceType::Future(ty) => {
1546                let ty = types.types[*ty].ty;
1547                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1548            }
1549            other => bail!("expected `future`, found `{}`", func::desc(other)),
1550        }
1551    }
1552}
1553
1554// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1555unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1556    fn linear_lower_to_flat<U>(
1557        &self,
1558        cx: &mut LowerContext<'_, U>,
1559        ty: InterfaceType,
1560        dst: &mut MaybeUninit<Self::Lower>,
1561    ) -> Result<()> {
1562        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1563    }
1564
1565    fn linear_lower_to_memory<U>(
1566        &self,
1567        cx: &mut LowerContext<'_, U>,
1568        ty: InterfaceType,
1569        offset: usize,
1570    ) -> Result<()> {
1571        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1572            cx,
1573            InterfaceType::U32,
1574            offset,
1575        )
1576    }
1577}
1578
1579// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1580unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1581    fn linear_lift_from_flat(
1582        cx: &mut LiftContext<'_>,
1583        ty: InterfaceType,
1584        src: &Self::Lower,
1585    ) -> Result<Self> {
1586        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1587        Self::lift_from_index(cx, ty, index)
1588    }
1589
1590    fn linear_lift_from_memory(
1591        cx: &mut LiftContext<'_>,
1592        ty: InterfaceType,
1593        bytes: &[u8],
1594    ) -> Result<Self> {
1595        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1596        Self::lift_from_index(cx, ty, index)
1597    }
1598}
1599
1600/// A [`FutureReader`] paired with an [`Accessor`].
1601///
1602/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
1603/// when dropped. This can be created through [`GuardedFutureReader::new`] or
1604/// [`FutureReader::guard`].
1605///
1606/// [`Accessor`]: crate::component::Accessor
1607pub struct GuardedFutureReader<T, A>
1608where
1609    A: AsAccessor,
1610{
1611    // This field is `None` to implement the conversion from this guard back to
1612    // `FutureReader`. When `None` is seen in the destructor it will cause the
1613    // destructor to do nothing.
1614    reader: Option<FutureReader<T>>,
1615    accessor: A,
1616}
1617
1618impl<T, A> GuardedFutureReader<T, A>
1619where
1620    A: AsAccessor,
1621{
1622    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1623    ///
1624    /// # Panics
1625    ///
1626    /// Panics if [`Config::concurrency_support`] is not enabled.
1627    ///
1628    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1629    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1630        assert!(
1631            accessor
1632                .as_accessor()
1633                .with(|a| a.as_context().0.concurrency_support())
1634        );
1635        Self {
1636            reader: Some(reader),
1637            accessor,
1638        }
1639    }
1640
1641    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1642    /// back.
1643    pub fn into_future(self) -> FutureReader<T> {
1644        self.into()
1645    }
1646}
1647
1648impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1649where
1650    A: AsAccessor,
1651{
1652    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1653        guard.reader.take().unwrap()
1654    }
1655}
1656
1657impl<T, A> Drop for GuardedFutureReader<T, A>
1658where
1659    A: AsAccessor,
1660{
1661    fn drop(&mut self) {
1662        if let Some(reader) = &mut self.reader {
1663            // Currently this can only fail if the future is closed twice, which
1664            // this guard prevents, so this error shouldn't happen.
1665            let result = reader.close_with(&self.accessor);
1666            debug_assert!(result.is_ok());
1667        }
1668    }
1669}
1670
1671/// Represents the readable end of a Component Model `stream`.
1672///
1673/// Note that `StreamReader` instances must be disposed of using `close`;
1674/// otherwise the in-store representation will leak and the writer end will hang
1675/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1676/// disposal happens automatically.
1677pub struct StreamReader<T> {
1678    id: TableId<TransmitHandle>,
1679    _phantom: PhantomData<T>,
1680}
1681
1682impl<T> StreamReader<T> {
1683    /// Create a new stream with the specified producer.
1684    ///
1685    /// # Errors
1686    ///
1687    /// Returns an error if the resource table for this store is full or if
1688    /// [`Config::concurrency_support`] is not enabled.
1689    ///
1690    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1691    pub fn new<S: AsContextMut>(
1692        mut store: S,
1693        producer: impl StreamProducer<S::Data, Item = T>,
1694    ) -> Result<Self>
1695    where
1696        T: func::Lower + func::Lift + Send + Sync + 'static,
1697    {
1698        ensure!(
1699            store.as_context().0.concurrency_support(),
1700            "concurrency support is not enabled",
1701        );
1702        Ok(Self::new_(
1703            store
1704                .as_context_mut()
1705                .new_transmit(TransmitKind::Stream, producer)?,
1706        ))
1707    }
1708
1709    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1710        Self {
1711            id,
1712            _phantom: PhantomData,
1713        }
1714    }
1715
1716    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1717        self.id
1718    }
1719
1720    /// Attempt to consume this object by converting it into the specified type.
1721    ///
1722    /// This can be useful for "short-circuiting" host-to-host streams,
1723    /// bypassing the guest entirely.  For example, if a guest task returns a
1724    /// host-created stream and then exits, this function may be used to
1725    /// retrieve the write end, after which the guest instance and store may be
1726    /// disposed of if no longer needed.
1727    ///
1728    /// This will return `Ok(_)` if and only if the following conditions are
1729    /// met:
1730    ///
1731    /// - The stream was created by the host (i.e. not by the guest).
1732    ///
1733    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1734    /// producer provided to `StreamReader::new` when the stream was created,
1735    /// along with `TypeId::of::<V>()`.
1736    ///
1737    /// # Panics
1738    ///
1739    /// Panics if this stream has already been closed, or if this stream doesn't
1740    /// belong to the specified `store`.
1741    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1742        let store = store.as_context_mut();
1743        let state = store.0.concurrent_state_mut();
1744        let id = state.get_mut(self.id).unwrap().state;
1745        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1746            match try_into(TypeId::of::<V>()) {
1747                Some(result) => {
1748                    self.close(store).unwrap();
1749                    Ok(*result.downcast::<V>().unwrap())
1750                }
1751                None => Err(self),
1752            }
1753        } else {
1754            Err(self)
1755        }
1756    }
1757
1758    /// Set the consumer that accepts the items delivered to this stream.
1759    ///
1760    /// # Errors
1761    ///
1762    /// Returns an error if this stream has already been closed.
1763    ///
1764    /// # Panics
1765    ///
1766    /// Panics if this stream does not belong to `store`.
1767    pub fn pipe<S: AsContextMut>(
1768        self,
1769        mut store: S,
1770        consumer: impl StreamConsumer<S::Data, Item = T>,
1771    ) -> Result<()>
1772    where
1773        T: 'static,
1774    {
1775        store
1776            .as_context_mut()
1777            .set_consumer(self.id, TransmitKind::Stream, consumer)
1778    }
1779
1780    /// Transfer ownership of the read end of a stream from a guest to the host.
1781    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1782        let id = lift_index_to_stream(cx, ty, index)?;
1783        Ok(Self::new_(id))
1784    }
1785
1786    /// Close this `StreamReader`.
1787    ///
1788    /// This will signal that this portion of the stream is closed causing all
1789    /// future writes to return immediately with "DROPPED".
1790    ///
1791    /// # Errors
1792    ///
1793    /// Returns an error if this stream has already been closed.
1794    ///
1795    /// # Panics
1796    ///
1797    /// Panics if the store that the [`Accessor`] is derived from does not own
1798    /// this stream.
1799    ///
1800    /// [`Accessor`]: crate::component::Accessor
1801    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1802        stream_close(store.as_context_mut().0, &mut self.id)
1803    }
1804
1805    /// Convenience method around [`Self::close`].
1806    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1807        accessor.as_accessor().with(|access| self.close(access))
1808    }
1809
1810    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1811    /// drop and clean it up from the store.
1812    ///
1813    /// Note that the `accessor` provided must own this future and is
1814    /// additionally transferred to the `GuardedStreamReader` return value.
1815    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1816    where
1817        A: AsAccessor,
1818    {
1819        GuardedStreamReader::new(accessor, self)
1820    }
1821
1822    /// Attempts to convert this [`StreamReader<T>`] to a [`StreamAny`].
1823    ///
1824    /// # Errors
1825    ///
1826    /// This function will return an error if `self` does not belong to
1827    /// `store`.
1828    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1829    where
1830        T: ComponentType + 'static,
1831    {
1832        StreamAny::try_from_stream_reader(store, self)
1833    }
1834
1835    /// Attempts to convert a [`StreamAny`] into a [`StreamReader<T>`].
1836    ///
1837    /// # Errors
1838    ///
1839    /// This function will fail if `T` doesn't match the type of the value that
1840    /// `stream` is sending.
1841    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1842    where
1843        T: ComponentType + 'static,
1844    {
1845        stream.try_into_stream_reader()
1846    }
1847}
1848
1849impl<T> fmt::Debug for StreamReader<T> {
1850    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1851        f.debug_struct("StreamReader")
1852            .field("id", &self.id)
1853            .finish()
1854    }
1855}
1856
1857pub(super) fn stream_close(
1858    store: &mut StoreOpaque,
1859    id: &mut TableId<TransmitHandle>,
1860) -> Result<()> {
1861    let id = mem::replace(id, TableId::new(u32::MAX));
1862    store.host_drop_reader(id, TransmitKind::Stream)
1863}
1864
1865/// Transfer ownership of the read end of a stream from a guest to the host.
1866pub(super) fn lift_index_to_stream(
1867    cx: &mut LiftContext<'_>,
1868    ty: InterfaceType,
1869    index: u32,
1870) -> Result<TableId<TransmitHandle>> {
1871    match ty {
1872        InterfaceType::Stream(src) => {
1873            let handle_table = cx
1874                .instance_mut()
1875                .table_for_transmit(TransmitIndex::Stream(src));
1876            let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1877            if is_done {
1878                bail!("cannot lift stream after being notified that the writable end dropped");
1879            }
1880            let id = TableId::<TransmitHandle>::new(rep);
1881            cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1882            Ok(id)
1883        }
1884        _ => func::bad_type_info(),
1885    }
1886}
1887
1888/// Transfer ownership of the read end of a stream from the host to a guest.
1889pub(super) fn lower_stream_to_index<U>(
1890    id: TableId<TransmitHandle>,
1891    cx: &mut LowerContext<'_, U>,
1892    ty: InterfaceType,
1893) -> Result<u32> {
1894    match ty {
1895        InterfaceType::Stream(dst) => {
1896            let concurrent_state = cx.store.0.concurrent_state_mut();
1897            let state = concurrent_state.get_mut(id)?.state;
1898            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1899
1900            let handle = cx
1901                .instance_mut()
1902                .table_for_transmit(TransmitIndex::Stream(dst))
1903                .stream_insert_read(dst, rep)?;
1904
1905            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1906
1907            Ok(handle)
1908        }
1909        _ => func::bad_type_info(),
1910    }
1911}
1912
1913// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1914// safe and correct since we lift and lower stream handles as `u32`s.
1915unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1916    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1917
1918    type Lower = <u32 as func::ComponentType>::Lower;
1919
1920    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1921        match ty {
1922            InterfaceType::Stream(ty) => {
1923                let ty = types.types[*ty].ty;
1924                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1925            }
1926            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1927        }
1928    }
1929}
1930
1931// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1932unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1933    fn linear_lower_to_flat<U>(
1934        &self,
1935        cx: &mut LowerContext<'_, U>,
1936        ty: InterfaceType,
1937        dst: &mut MaybeUninit<Self::Lower>,
1938    ) -> Result<()> {
1939        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1940    }
1941
1942    fn linear_lower_to_memory<U>(
1943        &self,
1944        cx: &mut LowerContext<'_, U>,
1945        ty: InterfaceType,
1946        offset: usize,
1947    ) -> Result<()> {
1948        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1949            cx,
1950            InterfaceType::U32,
1951            offset,
1952        )
1953    }
1954}
1955
1956// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1957unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1958    fn linear_lift_from_flat(
1959        cx: &mut LiftContext<'_>,
1960        ty: InterfaceType,
1961        src: &Self::Lower,
1962    ) -> Result<Self> {
1963        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1964        Self::lift_from_index(cx, ty, index)
1965    }
1966
1967    fn linear_lift_from_memory(
1968        cx: &mut LiftContext<'_>,
1969        ty: InterfaceType,
1970        bytes: &[u8],
1971    ) -> Result<Self> {
1972        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1973        Self::lift_from_index(cx, ty, index)
1974    }
1975}
1976
1977/// A [`StreamReader`] paired with an [`Accessor`].
1978///
1979/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1980/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1981/// [`StreamReader::guard`].
1982///
1983/// [`Accessor`]: crate::component::Accessor
1984pub struct GuardedStreamReader<T, A>
1985where
1986    A: AsAccessor,
1987{
1988    // This field is `None` to implement the conversion from this guard back to
1989    // `StreamReader`. When `None` is seen in the destructor it will cause the
1990    // destructor to do nothing.
1991    reader: Option<StreamReader<T>>,
1992    accessor: A,
1993}
1994
1995impl<T, A> GuardedStreamReader<T, A>
1996where
1997    A: AsAccessor,
1998{
1999    /// Create a new `GuardedStreamReader` with the specified `accessor` and
2000    /// `reader`.
2001    ///
2002    /// # Panics
2003    ///
2004    /// Panics if [`Config::concurrency_support`] is not enabled.
2005    ///
2006    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
2007    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
2008        assert!(
2009            accessor
2010                .as_accessor()
2011                .with(|a| a.as_context().0.concurrency_support())
2012        );
2013        Self {
2014            reader: Some(reader),
2015            accessor,
2016        }
2017    }
2018
2019    /// Extracts the underlying [`StreamReader`] from this guard, returning it
2020    /// back.
2021    pub fn into_stream(self) -> StreamReader<T> {
2022        self.into()
2023    }
2024}
2025
2026impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
2027where
2028    A: AsAccessor,
2029{
2030    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
2031        guard.reader.take().unwrap()
2032    }
2033}
2034
2035impl<T, A> Drop for GuardedStreamReader<T, A>
2036where
2037    A: AsAccessor,
2038{
2039    fn drop(&mut self) {
2040        if let Some(reader) = &mut self.reader {
2041            // Currently this can only fail if the future is closed twice, which
2042            // this guard prevents, so this error shouldn't happen.
2043            let result = reader.close_with(&self.accessor);
2044            debug_assert!(result.is_ok());
2045        }
2046    }
2047}
2048
2049/// Represents a Component Model `error-context`.
2050pub struct ErrorContext {
2051    rep: u32,
2052}
2053
2054impl ErrorContext {
2055    pub(crate) fn new(rep: u32) -> Self {
2056        Self { rep }
2057    }
2058
2059    /// Convert this `ErrorContext` into a [`Val`].
2060    pub fn into_val(self) -> Val {
2061        Val::ErrorContext(ErrorContextAny(self.rep))
2062    }
2063
2064    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
2065    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
2066        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
2067            bail!("expected `error-context`; got `{}`", value.desc());
2068        };
2069        Ok(Self::new(*rep))
2070    }
2071
2072    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
2073        match ty {
2074            InterfaceType::ErrorContext(src) => {
2075                let rep = cx
2076                    .instance_mut()
2077                    .table_for_error_context(src)
2078                    .error_context_rep(index)?;
2079
2080                Ok(Self { rep })
2081            }
2082            _ => func::bad_type_info(),
2083        }
2084    }
2085}
2086
2087pub(crate) fn lower_error_context_to_index<U>(
2088    rep: u32,
2089    cx: &mut LowerContext<'_, U>,
2090    ty: InterfaceType,
2091) -> Result<u32> {
2092    match ty {
2093        InterfaceType::ErrorContext(dst) => {
2094            let tbl = cx.instance_mut().table_for_error_context(dst);
2095            tbl.error_context_insert(rep)
2096        }
2097        _ => func::bad_type_info(),
2098    }
2099}
2100// SAFETY: This relies on the `ComponentType` implementation for `u32` being
2101// safe and correct since we lift and lower future handles as `u32`s.
2102unsafe impl func::ComponentType for ErrorContext {
2103    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2104
2105    type Lower = <u32 as func::ComponentType>::Lower;
2106
2107    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2108        match ty {
2109            InterfaceType::ErrorContext(_) => Ok(()),
2110            other => bail!("expected `error`, found `{}`", func::desc(other)),
2111        }
2112    }
2113}
2114
2115// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2116unsafe impl func::Lower for ErrorContext {
2117    fn linear_lower_to_flat<T>(
2118        &self,
2119        cx: &mut LowerContext<'_, T>,
2120        ty: InterfaceType,
2121        dst: &mut MaybeUninit<Self::Lower>,
2122    ) -> Result<()> {
2123        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2124            cx,
2125            InterfaceType::U32,
2126            dst,
2127        )
2128    }
2129
2130    fn linear_lower_to_memory<T>(
2131        &self,
2132        cx: &mut LowerContext<'_, T>,
2133        ty: InterfaceType,
2134        offset: usize,
2135    ) -> Result<()> {
2136        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2137            cx,
2138            InterfaceType::U32,
2139            offset,
2140        )
2141    }
2142}
2143
2144// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2145unsafe impl func::Lift for ErrorContext {
2146    fn linear_lift_from_flat(
2147        cx: &mut LiftContext<'_>,
2148        ty: InterfaceType,
2149        src: &Self::Lower,
2150    ) -> Result<Self> {
2151        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2152        Self::lift_from_index(cx, ty, index)
2153    }
2154
2155    fn linear_lift_from_memory(
2156        cx: &mut LiftContext<'_>,
2157        ty: InterfaceType,
2158        bytes: &[u8],
2159    ) -> Result<Self> {
2160        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2161        Self::lift_from_index(cx, ty, index)
2162    }
2163}
2164
2165/// Represents the read or write end of a stream or future.
2166pub(super) struct TransmitHandle {
2167    pub(super) common: WaitableCommon,
2168    /// See `TransmitState`
2169    state: TableId<TransmitState>,
2170}
2171
2172impl TransmitHandle {
2173    fn new(state: TableId<TransmitState>) -> Self {
2174        Self {
2175            common: WaitableCommon::default(),
2176            state,
2177        }
2178    }
2179}
2180
2181impl TableDebug for TransmitHandle {
2182    fn type_name() -> &'static str {
2183        "TransmitHandle"
2184    }
2185}
2186
2187/// Represents the state of a stream or future.
2188struct TransmitState {
2189    /// The write end of the stream or future.
2190    write_handle: TableId<TransmitHandle>,
2191    /// The read end of the stream or future.
2192    read_handle: TableId<TransmitHandle>,
2193    /// See `WriteState`
2194    write: WriteState,
2195    /// See `ReadState`
2196    read: ReadState,
2197    /// Whether further values may be transmitted via this stream or future.
2198    done: bool,
2199    /// The original creator of this stream, used for type-checking with
2200    /// `{Future,Stream}Any`.
2201    pub(super) origin: TransmitOrigin,
2202}
2203
2204#[derive(Copy, Clone)]
2205pub(super) enum TransmitOrigin {
2206    Host,
2207    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2208    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2209}
2210
2211impl TransmitState {
2212    fn new(origin: TransmitOrigin) -> Self {
2213        Self {
2214            write_handle: TableId::new(u32::MAX),
2215            read_handle: TableId::new(u32::MAX),
2216            read: ReadState::Open,
2217            write: WriteState::Open,
2218            done: false,
2219            origin,
2220        }
2221    }
2222}
2223
2224impl TableDebug for TransmitState {
2225    fn type_name() -> &'static str {
2226        "TransmitState"
2227    }
2228}
2229
2230impl TransmitOrigin {
2231    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2232        match index {
2233            TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2234            TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2235        }
2236    }
2237}
2238
2239type PollStream = Box<
2240    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2241>;
2242
2243type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2244
2245/// Represents the state of the write end of a stream or future.
2246enum WriteState {
2247    /// The write end is open, but no write is pending.
2248    Open,
2249    /// The write end is owned by a guest task and a write is pending.
2250    GuestReady {
2251        instance: Instance,
2252        caller: RuntimeComponentInstanceIndex,
2253        ty: TransmitIndex,
2254        flat_abi: Option<FlatAbi>,
2255        options: OptionsIndex,
2256        address: usize,
2257        count: ItemCount,
2258        handle: u32,
2259    },
2260    /// The write end is owned by the host, which is ready to produce items.
2261    HostReady {
2262        produce: PollStream,
2263        try_into: TryInto,
2264        guest_offset: ItemCount,
2265        cancel: bool,
2266        cancel_waker: Option<Waker>,
2267    },
2268    /// The write end has been dropped.
2269    Dropped,
2270}
2271
2272impl fmt::Debug for WriteState {
2273    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2274        match self {
2275            Self::Open => f.debug_tuple("Open").finish(),
2276            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2277            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2278            Self::Dropped => f.debug_tuple("Dropped").finish(),
2279        }
2280    }
2281}
2282
2283/// Represents the state of the read end of a stream or future.
2284enum ReadState {
2285    /// The read end is open, but no read is pending.
2286    Open,
2287    /// The read end is owned by a guest task and a read is pending.
2288    GuestReady {
2289        ty: TransmitIndex,
2290        caller_instance: RuntimeComponentInstanceIndex,
2291        caller_thread: QualifiedThreadId,
2292        flat_abi: Option<FlatAbi>,
2293        instance: Instance,
2294        options: OptionsIndex,
2295        address: usize,
2296        count: ItemCount,
2297        handle: u32,
2298    },
2299    /// The read end is owned by a host task, and it is ready to consume items.
2300    HostReady {
2301        consume: PollStream,
2302        guest_offset: ItemCount,
2303        cancel: bool,
2304        cancel_waker: Option<Waker>,
2305    },
2306    /// Both the read and write ends are owned by the host.
2307    HostToHost {
2308        accept: Box<
2309            dyn for<'a> Fn(
2310                    &'a mut UntypedWriteBuffer<'a>,
2311                )
2312                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2313                + Send
2314                + Sync,
2315        >,
2316        buffer: Vec<u8>,
2317        limit: usize,
2318    },
2319    /// The read end has been dropped.
2320    Dropped,
2321}
2322
2323impl fmt::Debug for ReadState {
2324    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2325        match self {
2326            Self::Open => f.debug_tuple("Open").finish(),
2327            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2328            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2329            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2330            Self::Dropped => f.debug_tuple("Dropped").finish(),
2331        }
2332    }
2333}
2334
2335fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
2336    Ok(match state {
2337        StreamResult::Dropped => ReturnCode::Dropped(count),
2338        StreamResult::Completed => ReturnCode::completed(kind, count),
2339        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2340    })
2341}
2342
2343impl StoreOpaque {
2344    fn pipe_from_guest(
2345        &mut self,
2346        kind: TransmitKind,
2347        id: TableId<TransmitState>,
2348        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2349    ) {
2350        let future = async move {
2351            let stream_state = future.await?;
2352            tls::get(|store| {
2353                let state = store.concurrent_state_mut();
2354                let transmit = state.get_mut(id)?;
2355                let ReadState::HostReady {
2356                    consume,
2357                    guest_offset,
2358                    ..
2359                } = mem::replace(&mut transmit.read, ReadState::Open)
2360                else {
2361                    bail_bug!("expected ReadState::HostReady")
2362                };
2363                let code = return_code(kind, stream_state, guest_offset)?;
2364                transmit.read = match stream_state {
2365                    StreamResult::Dropped => ReadState::Dropped,
2366                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2367                        consume,
2368                        guest_offset: ItemCount::ZERO,
2369                        cancel: false,
2370                        cancel_waker: None,
2371                    },
2372                };
2373                let WriteState::GuestReady { ty, handle, .. } =
2374                    mem::replace(&mut transmit.write, WriteState::Open)
2375                else {
2376                    bail_bug!("expected WriteState::GuestReady")
2377                };
2378                state.send_write_result(ty, id, handle, code)?;
2379                Ok(())
2380            })
2381        };
2382
2383        self.concurrent_state_mut().push_future(future.boxed());
2384    }
2385
2386    fn pipe_to_guest(
2387        &mut self,
2388        kind: TransmitKind,
2389        id: TableId<TransmitState>,
2390        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2391    ) {
2392        let future = async move {
2393            let stream_state = future.await?;
2394            tls::get(|store| {
2395                let state = store.concurrent_state_mut();
2396                let transmit = state.get_mut(id)?;
2397                let WriteState::HostReady {
2398                    produce,
2399                    try_into,
2400                    guest_offset,
2401                    ..
2402                } = mem::replace(&mut transmit.write, WriteState::Open)
2403                else {
2404                    bail_bug!("expected WriteState::HostReady")
2405                };
2406                let code = return_code(kind, stream_state, guest_offset)?;
2407                transmit.write = match stream_state {
2408                    StreamResult::Dropped => WriteState::Dropped,
2409                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2410                        produce,
2411                        try_into,
2412                        guest_offset: ItemCount::ZERO,
2413                        cancel: false,
2414                        cancel_waker: None,
2415                    },
2416                };
2417                let ReadState::GuestReady { ty, handle, .. } =
2418                    mem::replace(&mut transmit.read, ReadState::Open)
2419                else {
2420                    bail_bug!("expected ReadState::GuestReady")
2421                };
2422                state.send_read_result(ty, id, handle, code)?;
2423                Ok(())
2424            })
2425        };
2426
2427        self.concurrent_state_mut().push_future(future.boxed());
2428    }
2429
2430    /// Drop the read end of a stream or future read from the host.
2431    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2432        let state = self.concurrent_state_mut();
2433        let transmit_id = state.get_mut(id)?.state;
2434        let transmit = state
2435            .get_mut(transmit_id)
2436            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2437        log::trace!(
2438            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2439            transmit.read,
2440            transmit.write
2441        );
2442
2443        transmit.read = ReadState::Dropped;
2444
2445        // If the write end is already dropped, it should stay dropped,
2446        // otherwise, it should be opened.
2447        let new_state = if let WriteState::Dropped = &transmit.write {
2448            WriteState::Dropped
2449        } else {
2450            WriteState::Open
2451        };
2452
2453        let write_handle = transmit.write_handle;
2454
2455        match mem::replace(&mut transmit.write, new_state) {
2456            // If a guest is waiting to write, notify it that the read end has
2457            // been dropped.
2458            WriteState::GuestReady { ty, handle, .. } => {
2459                state.update_event(
2460                    write_handle.rep(),
2461                    match ty {
2462                        TransmitIndex::Future(ty) => Event::FutureWrite {
2463                            code: ReturnCode::Dropped(ItemCount::ZERO),
2464                            pending: Some((ty, handle)),
2465                        },
2466                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2467                            code: ReturnCode::Dropped(ItemCount::ZERO),
2468                            pending: Some((ty, handle)),
2469                        },
2470                    },
2471                )?;
2472            }
2473
2474            WriteState::HostReady { .. } => {}
2475
2476            WriteState::Open => {
2477                state.update_event(
2478                    write_handle.rep(),
2479                    match kind {
2480                        TransmitKind::Future => Event::FutureWrite {
2481                            code: ReturnCode::Dropped(ItemCount::ZERO),
2482                            pending: None,
2483                        },
2484                        TransmitKind::Stream => Event::StreamWrite {
2485                            code: ReturnCode::Dropped(ItemCount::ZERO),
2486                            pending: None,
2487                        },
2488                    },
2489                )?;
2490            }
2491
2492            WriteState::Dropped => {
2493                log::trace!("host_drop_reader delete {transmit_id:?}");
2494                state.delete_transmit(transmit_id)?;
2495            }
2496        }
2497        Ok(())
2498    }
2499
2500    /// Drop the write end of a stream or future read from the host.
2501    fn host_drop_writer(
2502        &mut self,
2503        id: TableId<TransmitHandle>,
2504        on_drop_open: Option<fn() -> Result<()>>,
2505    ) -> Result<()> {
2506        let state = self.concurrent_state_mut();
2507        let transmit_id = state.get_mut(id)?.state;
2508        let transmit = state
2509            .get_mut(transmit_id)
2510            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2511        log::trace!(
2512            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2513            transmit.read,
2514            transmit.write
2515        );
2516
2517        // Existing queued transmits must be updated with information for the impending writer closure
2518        match &mut transmit.write {
2519            WriteState::GuestReady { .. } => {
2520                bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2521            }
2522            WriteState::HostReady { .. } => {}
2523            v @ WriteState::Open => {
2524                if let (Some(on_drop_open), false) = (
2525                    on_drop_open,
2526                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2527                ) {
2528                    on_drop_open()?;
2529                } else {
2530                    *v = WriteState::Dropped;
2531                }
2532            }
2533            WriteState::Dropped => bail_bug!("write state is already dropped"),
2534        }
2535
2536        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2537
2538        // If the existing read state is dropped, then there's nothing to read
2539        // and we can keep it that way.
2540        //
2541        // If the read state was any other state, then we must set the new state to open
2542        // to indicate that there *is* data to be read
2543        let new_state = if let ReadState::Dropped = &transmit.read {
2544            ReadState::Dropped
2545        } else {
2546            ReadState::Open
2547        };
2548
2549        let read_handle = transmit.read_handle;
2550
2551        // Swap in the new read state
2552        match mem::replace(&mut transmit.read, new_state) {
2553            // If the guest was ready to read, then we cannot drop the reader (or writer);
2554            // we must deliver the event, and update the state associated with the handle to
2555            // represent that a read must be performed
2556            ReadState::GuestReady { ty, handle, .. } => {
2557                // Ensure the final read of the guest is queued, with appropriate closure indicator
2558                self.concurrent_state_mut().update_event(
2559                    read_handle.rep(),
2560                    match ty {
2561                        TransmitIndex::Future(ty) => Event::FutureRead {
2562                            code: ReturnCode::Dropped(ItemCount::ZERO),
2563                            pending: Some((ty, handle)),
2564                        },
2565                        TransmitIndex::Stream(ty) => Event::StreamRead {
2566                            code: ReturnCode::Dropped(ItemCount::ZERO),
2567                            pending: Some((ty, handle)),
2568                        },
2569                    },
2570                )?;
2571            }
2572
2573            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2574
2575            // If the read state is open, then there are no registered readers of the stream/future
2576            ReadState::Open => {
2577                self.concurrent_state_mut().update_event(
2578                    read_handle.rep(),
2579                    match on_drop_open {
2580                        Some(_) => Event::FutureRead {
2581                            code: ReturnCode::Dropped(ItemCount::ZERO),
2582                            pending: None,
2583                        },
2584                        None => Event::StreamRead {
2585                            code: ReturnCode::Dropped(ItemCount::ZERO),
2586                            pending: None,
2587                        },
2588                    },
2589                )?;
2590            }
2591
2592            // If the read state was already dropped, then we can remove the transmit state completely
2593            // (both writer and reader have been dropped)
2594            ReadState::Dropped => {
2595                log::trace!("host_drop_writer delete {transmit_id:?}");
2596                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2597            }
2598        }
2599        Ok(())
2600    }
2601
2602    pub(super) fn transmit_origin(
2603        &mut self,
2604        id: TableId<TransmitHandle>,
2605    ) -> Result<TransmitOrigin> {
2606        let state = self.concurrent_state_mut();
2607        let state_id = state.get_mut(id)?.state;
2608        Ok(state.get_mut(state_id)?.origin)
2609    }
2610}
2611
2612impl<T> StoreContextMut<'_, T> {
2613    fn new_transmit<P: StreamProducer<T>>(
2614        mut self,
2615        kind: TransmitKind,
2616        producer: P,
2617    ) -> Result<TableId<TransmitHandle>>
2618    where
2619        P::Item: func::Lower,
2620    {
2621        let token = StoreToken::new(self.as_context_mut());
2622        let state = self.0.concurrent_state_mut();
2623        let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
2624        let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
2625        let id = state.get_mut(read)?.state;
2626        let mut dropped = false;
2627        let produce = Box::new({
2628            let producer = producer.clone();
2629            move || {
2630                let producer = producer.clone();
2631                async move {
2632                    let mut state = producer.take()?;
2633                    let (mine, buffer) = &mut *state;
2634
2635                    let (result, cancelled) = if buffer.remaining().is_empty() {
2636                        future::poll_fn(|cx| {
2637                            tls::get(|store| {
2638                                let transmit = store.concurrent_state_mut().get_mut(id)?;
2639
2640                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2641                                    bail_bug!("expected WriteState::HostReady")
2642                                };
2643
2644                                let mut host_buffer =
2645                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2646                                        Some(Cursor::new(mem::take(buffer)))
2647                                    } else {
2648                                        None
2649                                    };
2650
2651                                let poll = mine.as_mut().poll_produce(
2652                                    cx,
2653                                    token.as_context_mut(store),
2654                                    Destination {
2655                                        id,
2656                                        buffer,
2657                                        host_buffer: host_buffer.as_mut(),
2658                                        _phantom: PhantomData,
2659                                    },
2660                                    cancel,
2661                                );
2662
2663                                let transmit = store.concurrent_state_mut().get_mut(id)?;
2664
2665                                let host_offset = if let (
2666                                    Some(host_buffer),
2667                                    ReadState::HostToHost { buffer, limit, .. },
2668                                ) = (host_buffer, &mut transmit.read)
2669                                {
2670                                    *limit = usize::try_from(host_buffer.position())?;
2671                                    *buffer = host_buffer.into_inner();
2672                                    *limit
2673                                } else {
2674                                    0
2675                                };
2676
2677                                {
2678                                    let WriteState::HostReady {
2679                                        guest_offset,
2680                                        cancel,
2681                                        cancel_waker,
2682                                        ..
2683                                    } = &mut transmit.write
2684                                    else {
2685                                        bail_bug!("expected WriteState::HostReady")
2686                                    };
2687
2688                                    if poll.is_pending() {
2689                                        if !buffer.remaining().is_empty()
2690                                            || *guest_offset > 0
2691                                            || host_offset > 0
2692                                        {
2693                                            bail!(
2694                                                "StreamProducer::poll_produce returned Poll::Pending \
2695                                                 after producing at least one item"
2696                                            )
2697                                        }
2698                                        *cancel_waker = Some(cx.waker().clone());
2699                                    } else {
2700                                        *cancel_waker = None;
2701                                        *cancel = false;
2702                                    }
2703                                }
2704
2705                                Ok(poll.map(|v| v.map(|result| (result, cancel))))
2706                            })?
2707                        })
2708                            .await?
2709                    } else {
2710                        (StreamResult::Completed, false)
2711                    };
2712
2713                    let (guest_offset, host_offset, count) = tls::get(|store| {
2714                        let transmit = store.concurrent_state_mut().get_mut(id)?;
2715                        let (count, host_offset) = match &transmit.read {
2716                            &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
2717                            &ReadState::HostToHost { limit, .. } => (1, limit),
2718                            _ => bail_bug!("invalid read state"),
2719                        };
2720                        let guest_offset = match &transmit.write {
2721                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
2722                            _ => bail_bug!("invalid write state"),
2723                        };
2724                        Ok((guest_offset, host_offset, count))
2725                    })?;
2726
2727                    match result {
2728                        StreamResult::Completed => {
2729                            if count > 1
2730                                && buffer.remaining().is_empty()
2731                                && guest_offset == 0
2732                                && host_offset == 0
2733                            {
2734                                bail!(
2735                                    "StreamProducer::poll_produce returned StreamResult::Completed \
2736                                     without producing any items"
2737                                );
2738                            }
2739                        }
2740                        StreamResult::Cancelled => {
2741                            if !cancelled {
2742                                bail!(
2743                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
2744                                     without being given a `finish` parameter value of true"
2745                                );
2746                            }
2747                        }
2748                        StreamResult::Dropped => {
2749                            dropped = true;
2750                        }
2751                    }
2752
2753                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2754
2755                    drop(state);
2756
2757                    if write_buffer {
2758                        write(token, id, producer.clone(), kind).await?;
2759                    }
2760
2761                    Ok(if dropped {
2762                        if producer.with(|p| p.1.remaining().is_empty())?  {
2763                            StreamResult::Dropped
2764                        } else {
2765                            StreamResult::Completed
2766                        }
2767                    } else {
2768                        result
2769                    })
2770                }
2771                .boxed()
2772            }
2773        });
2774        let try_into = Box::new(move |ty| {
2775            let (mine, buffer) = producer.try_lock().ok()?.take()?;
2776            match P::try_into(mine, ty) {
2777                Ok(value) => Some(value),
2778                Err(mine) => {
2779                    *producer.try_lock().ok()? = Some((mine, buffer));
2780                    None
2781                }
2782            }
2783        });
2784        state.get_mut(id)?.write = WriteState::HostReady {
2785            produce,
2786            try_into,
2787            guest_offset: ItemCount::ZERO,
2788            cancel: false,
2789            cancel_waker: None,
2790        };
2791        Ok(read)
2792    }
2793
2794    fn set_consumer<C: StreamConsumer<T>>(
2795        mut self,
2796        id: TableId<TransmitHandle>,
2797        kind: TransmitKind,
2798        consumer: C,
2799    ) -> Result<()> {
2800        let token = StoreToken::new(self.as_context_mut());
2801        let state = self.0.concurrent_state_mut();
2802        let id = state.get_mut(id)?.state;
2803        let transmit = state.get_mut(id)?;
2804        let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
2805        let consume_with_buffer = {
2806            let consumer = consumer.clone();
2807            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2808                let mut mine = consumer.take()?;
2809
2810                let host_buffer_remaining_before =
2811                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2812
2813                let (result, cancelled) = future::poll_fn(|cx| {
2814                    tls::get(|store| {
2815                        let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
2816                            &ReadState::HostReady { cancel, .. } => cancel,
2817                            ReadState::Open => false,
2818                            _ => bail_bug!("unexpected read state"),
2819                        };
2820
2821                        let poll = mine.as_mut().poll_consume(
2822                            cx,
2823                            token.as_context_mut(store),
2824                            Source {
2825                                id,
2826                                host_buffer: host_buffer.as_deref_mut(),
2827                            },
2828                            cancel,
2829                        );
2830
2831                        if let ReadState::HostReady {
2832                            cancel_waker,
2833                            cancel,
2834                            ..
2835                        } = &mut store.concurrent_state_mut().get_mut(id)?.read
2836                        {
2837                            if poll.is_pending() {
2838                                *cancel_waker = Some(cx.waker().clone());
2839                            } else {
2840                                *cancel_waker = None;
2841                                *cancel = false;
2842                            }
2843                        }
2844
2845                        Ok(poll.map(|v| v.map(|result| (result, cancel))))
2846                    })?
2847                })
2848                .await?;
2849
2850                let (guest_offset, count) = tls::get(|store| {
2851                    let transmit = store.concurrent_state_mut().get_mut(id)?;
2852                    Ok((
2853                        match &transmit.read {
2854                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2855                            ReadState::Open => ItemCount::ZERO,
2856                            _ => bail_bug!("invalid read state"),
2857                        },
2858                        match &transmit.write {
2859                            WriteState::GuestReady { count, .. } => count.as_usize(),
2860                            WriteState::HostReady { .. } => match host_buffer_remaining_before {
2861                                Some(n) => n,
2862                                None => bail_bug!("host_buffer_remaining_before should be set"),
2863                            },
2864                            _ => bail_bug!("invalid write state"),
2865                        },
2866                    ))
2867                })?;
2868
2869                match result {
2870                    StreamResult::Completed => {
2871                        if count > 0
2872                            && guest_offset == 0
2873                            && host_buffer_remaining_before
2874                                .zip(host_buffer.map(|v| v.remaining().len()))
2875                                .map(|(before, after)| before == after)
2876                                .unwrap_or(false)
2877                        {
2878                            bail!(
2879                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2880                                 without consuming any items"
2881                            );
2882                        }
2883
2884                        if let TransmitKind::Future = kind {
2885                            tls::get(|store| {
2886                                store.concurrent_state_mut().get_mut(id)?.done = true;
2887                                crate::error::Ok(())
2888                            })?;
2889                        }
2890                    }
2891                    StreamResult::Cancelled => {
2892                        if !cancelled {
2893                            bail!(
2894                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2895                                 without being given a `finish` parameter value of true"
2896                            );
2897                        }
2898                    }
2899                    StreamResult::Dropped => {}
2900                }
2901
2902                Ok(result)
2903            }
2904        };
2905        let consume = {
2906            let consume = consume_with_buffer.clone();
2907            Box::new(move || {
2908                let consume = consume.clone();
2909                async move { consume(None).await }.boxed()
2910            })
2911        };
2912
2913        match &transmit.write {
2914            WriteState::Open => {
2915                transmit.read = ReadState::HostReady {
2916                    consume,
2917                    guest_offset: ItemCount::ZERO,
2918                    cancel: false,
2919                    cancel_waker: None,
2920                };
2921            }
2922            &WriteState::GuestReady { .. } => {
2923                let future = consume();
2924                transmit.read = ReadState::HostReady {
2925                    consume,
2926                    guest_offset: ItemCount::ZERO,
2927                    cancel: false,
2928                    cancel_waker: None,
2929                };
2930                self.0.pipe_from_guest(kind, id, future);
2931            }
2932            WriteState::HostReady { .. } => {
2933                let WriteState::HostReady { produce, .. } = mem::replace(
2934                    &mut transmit.write,
2935                    WriteState::HostReady {
2936                        produce: Box::new(|| {
2937                            Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
2938                        }),
2939                        try_into: Box::new(|_| None),
2940                        guest_offset: ItemCount::ZERO,
2941                        cancel: false,
2942                        cancel_waker: None,
2943                    },
2944                ) else {
2945                    bail_bug!("expected WriteState::HostReady")
2946                };
2947
2948                transmit.read = ReadState::HostToHost {
2949                    accept: Box::new(move |input| {
2950                        let consume = consume_with_buffer.clone();
2951                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2952                    }),
2953                    buffer: Vec::new(),
2954                    limit: 0,
2955                };
2956
2957                let future = async move {
2958                    loop {
2959                        if tls::get(|store| {
2960                            crate::error::Ok(matches!(
2961                                store.concurrent_state_mut().get_mut(id)?.read,
2962                                ReadState::Dropped
2963                            ))
2964                        })? {
2965                            break Ok(());
2966                        }
2967
2968                        match produce().await? {
2969                            StreamResult::Completed | StreamResult::Cancelled => {}
2970                            StreamResult::Dropped => break Ok(()),
2971                        }
2972
2973                        if let TransmitKind::Future = kind {
2974                            break Ok(());
2975                        }
2976                    }
2977                }
2978                .map(move |result| {
2979                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2980                    result
2981                });
2982
2983                state.push_future(Box::pin(future));
2984            }
2985            WriteState::Dropped => {
2986                let reader = transmit.read_handle;
2987                self.0.host_drop_reader(reader, kind)?;
2988            }
2989        }
2990        Ok(())
2991    }
2992}
2993
2994async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2995    token: StoreToken<D>,
2996    id: TableId<TransmitState>,
2997    pair: Arc<LockedState<(P, B)>>,
2998    kind: TransmitKind,
2999) -> Result<()> {
3000    let (read, guest_offset) = tls::get(|store| {
3001        let transmit = store.concurrent_state_mut().get_mut(id)?;
3002
3003        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
3004            Some(guest_offset)
3005        } else {
3006            None
3007        };
3008
3009        crate::error::Ok((
3010            mem::replace(&mut transmit.read, ReadState::Open),
3011            guest_offset,
3012        ))
3013    })?;
3014
3015    match read {
3016        ReadState::GuestReady {
3017            ty,
3018            flat_abi,
3019            options,
3020            address,
3021            count,
3022            handle,
3023            instance,
3024            caller_instance,
3025            caller_thread,
3026        } => {
3027            let guest_offset = match guest_offset {
3028                Some(i) => i,
3029                None => bail_bug!("guest_offset should be present if ready"),
3030            };
3031
3032            if let TransmitKind::Future = kind {
3033                tls::get(|store| {
3034                    store.concurrent_state_mut().get_mut(id)?.done = true;
3035                    crate::error::Ok(())
3036                })?;
3037            }
3038
3039            let old_remaining = pair.with(|p| p.1.remaining().len())?;
3040            let accept = {
3041                let pair = pair.clone();
3042                move |mut store: StoreContextMut<D>| {
3043                    let mut state = pair.take()?;
3044                    lower::<T, B, D>(
3045                        store.as_context_mut(),
3046                        instance,
3047                        caller_thread,
3048                        options,
3049                        ty,
3050                        address + (T::SIZE32 * guest_offset.as_usize()),
3051                        count.as_usize() - guest_offset.as_usize(),
3052                        &mut state.1,
3053                    )?;
3054                    crate::error::Ok(())
3055                }
3056            };
3057
3058            if guest_offset < count {
3059                if T::MAY_REQUIRE_REALLOC {
3060                    // For payloads which may require a realloc call, use a
3061                    // oneshot::channel and background task.  This is
3062                    // necessary because calling the guest while there are
3063                    // host embedder frames on the stack is unsound.
3064                    let (tx, rx) = oneshot::channel();
3065                    tls::get(move |store| {
3066                        store
3067                            .concurrent_state_mut()
3068                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3069                                move |store| {
3070                                    _ = tx.send(accept(token.as_context_mut(store))?);
3071                                    Ok(())
3072                                },
3073                            ))))
3074                    });
3075                    rx.await?
3076                } else {
3077                    // Optimize flat payloads (i.e. those which do not
3078                    // require calling the guest's realloc function) by
3079                    // lowering directly instead of using a oneshot::channel
3080                    // and background task.
3081                    tls::get(|store| accept(token.as_context_mut(store)))?
3082                };
3083            }
3084
3085            tls::get(|store| {
3086                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;
3087
3088                let transmit = store.concurrent_state_mut().get_mut(id)?;
3089
3090                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3091                    bail_bug!("expected WriteState::HostReady")
3092                };
3093
3094                guest_offset.inc(count)?;
3095
3096                transmit.read = ReadState::GuestReady {
3097                    ty,
3098                    flat_abi,
3099                    options,
3100                    address,
3101                    count: ItemCount::new_usize(count)?,
3102                    handle,
3103                    instance,
3104                    caller_instance,
3105                    caller_thread,
3106                };
3107
3108                crate::error::Ok(())
3109            })?;
3110
3111            Ok(())
3112        }
3113
3114        ReadState::HostToHost {
3115            accept,
3116            mut buffer,
3117            limit,
3118        } => {
3119            let mut state = StreamResult::Completed;
3120            let mut position = 0;
3121
3122            while !matches!(state, StreamResult::Dropped) && position < limit {
3123                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
3124                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
3125                (buffer, position, _) = slice_buffer.into_parts();
3126            }
3127
3128            {
3129                let mut pair = pair.take()?;
3130                let (_, buffer) = &mut *pair;
3131
3132                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
3133                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
3134                }
3135            }
3136
3137            tls::get(|store| {
3138                store.concurrent_state_mut().get_mut(id)?.read = match state {
3139                    StreamResult::Dropped => ReadState::Dropped,
3140                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
3141                        accept,
3142                        buffer,
3143                        limit: 0,
3144                    },
3145                };
3146
3147                crate::error::Ok(())
3148            })?;
3149            Ok(())
3150        }
3151
3152        _ => bail_bug!("unexpected read state"),
3153    }
3154}
3155
3156impl Instance {
3157    /// Handle a host- or guest-initiated write by delivering the item(s) to the
3158    /// `StreamConsumer` for the specified stream or future.
3159    fn consume(
3160        self,
3161        store: &mut dyn VMStore,
3162        kind: TransmitKind,
3163        transmit_id: TableId<TransmitState>,
3164        consume: PollStream,
3165        guest_offset: ItemCount,
3166        cancel: bool,
3167    ) -> Result<ReturnCode> {
3168        let mut future = consume();
3169        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
3170            consume,
3171            guest_offset,
3172            cancel,
3173            cancel_waker: None,
3174        };
3175        let poll = tls::set(store, || {
3176            future
3177                .as_mut()
3178                .poll(&mut Context::from_waker(&Waker::noop()))
3179        });
3180
3181        Ok(match poll {
3182            Poll::Ready(state) => {
3183                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3184                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3185                    bail_bug!("expected ReadState::HostReady")
3186                };
3187                let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3188                transmit.write = WriteState::Open;
3189                code
3190            }
3191            Poll::Pending => {
3192                store.pipe_from_guest(kind, transmit_id, future);
3193                ReturnCode::Blocked
3194            }
3195        })
3196    }
3197
3198    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
3199    /// for the specified stream or future for items.
3200    fn produce(
3201        self,
3202        store: &mut dyn VMStore,
3203        kind: TransmitKind,
3204        transmit_id: TableId<TransmitState>,
3205        produce: PollStream,
3206        try_into: TryInto,
3207        guest_offset: ItemCount,
3208        cancel: bool,
3209    ) -> Result<ReturnCode> {
3210        let mut future = produce();
3211        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3212            produce,
3213            try_into,
3214            guest_offset,
3215            cancel,
3216            cancel_waker: None,
3217        };
3218        let poll = tls::set(store, || {
3219            future
3220                .as_mut()
3221                .poll(&mut Context::from_waker(&Waker::noop()))
3222        });
3223
3224        Ok(match poll {
3225            Poll::Ready(state) => {
3226                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3227                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3228                    bail_bug!("expected WriteState::HostReady")
3229                };
3230                let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?;
3231                transmit.read = ReadState::Open;
3232                code
3233            }
3234            Poll::Pending => {
3235                store.pipe_to_guest(kind, transmit_id, future);
3236                ReturnCode::Blocked
3237            }
3238        })
3239    }
3240
3241    /// Drop the writable end of the specified stream or future from the guest.
3242    pub(super) fn guest_drop_writable(
3243        self,
3244        store: &mut StoreOpaque,
3245        ty: TransmitIndex,
3246        writer: u32,
3247    ) -> Result<()> {
3248        let table = self.id().get_mut(store).table_for_transmit(ty);
3249        let transmit_rep = match ty {
3250            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3251            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3252        };
3253
3254        let id = TableId::<TransmitHandle>::new(transmit_rep);
3255        log::trace!("guest_drop_writable: drop writer {id:?}");
3256        match ty {
3257            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3258            TransmitIndex::Future(_) => store.host_drop_writer(
3259                id,
3260                Some(|| {
3261                    Err(format_err!(
3262                        "cannot drop future write end without first writing a value"
3263                    ))
3264                }),
3265            ),
3266        }
3267    }
3268
3269    /// Copy `count` items from `read_address` to `write_address` for the
3270    /// specified stream or future.
3271    fn copy<T: 'static>(
3272        self,
3273        store: StoreContextMut<T>,
3274        flat_abi: Option<FlatAbi>,
3275        write_caller_instance: RuntimeComponentInstanceIndex,
3276        write_ty: TransmitIndex,
3277        write_options: OptionsIndex,
3278        write_address: usize,
3279        read_caller_instance: RuntimeComponentInstanceIndex,
3280        read_caller_thread: QualifiedThreadId,
3281        read_ty: TransmitIndex,
3282        read_options: OptionsIndex,
3283        read_address: usize,
3284        count: ItemCount,
3285        rep: u32,
3286    ) -> Result<()> {
3287        let (component, mut store) = self.component_and_store_mut(store.0);
3288        let types = component.types();
3289        let count = count.as_usize();
3290
3291        // Validate `write_ty` w.r.t. `write_address` to ensure it's properly
3292        // aligned and in-bounds.
3293        let write_payload_ty = write_ty.payload(types);
3294        let write_abi = match write_payload_ty {
3295            Some(ty) => types.canonical_abi(ty),
3296            None => &CanonicalAbiInfo::ZERO,
3297        };
3298        let write_length_in_bytes = match flat_abi {
3299            Some(abi) => usize::try_from(abi.size)? * count,
3300            None => usize::try_from(write_abi.size32)? * count,
3301        };
3302        if write_length_in_bytes > 0 {
3303            if write_address % usize::try_from(write_abi.align32)? != 0 {
3304                bail!("write pointer not aligned");
3305            }
3306            self.options_memory(store, write_options)
3307                .get(write_address..)
3308                .and_then(|b| b.get(..write_length_in_bytes))
3309                .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
3310        }
3311
3312        let read_payload_ty = read_ty.payload(types);
3313        let read_abi = match read_payload_ty {
3314            Some(ty) => types.canonical_abi(ty),
3315            None => &CanonicalAbiInfo::ZERO,
3316        };
3317        let read_length_in_bytes = match flat_abi {
3318            Some(abi) => usize::try_from(abi.size)? * count,
3319            None => usize::try_from(read_abi.size32)? * count,
3320        };
3321        if read_length_in_bytes > 0 {
3322            if read_address % usize::try_from(read_abi.align32)? != 0 {
3323                bail!("read pointer not aligned");
3324            }
3325            self.options_memory(store, read_options)
3326                .get(read_address..)
3327                .and_then(|b| b.get(..read_length_in_bytes))
3328                .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
3329        }
3330
3331        if write_caller_instance == read_caller_instance
3332            && !allow_intra_component_read_write(write_payload_ty)
3333        {
3334            bail!(
3335                "cannot read from and write to intra-component future/stream with non-numeric payload"
3336            )
3337        }
3338
3339        match (write_ty, read_ty) {
3340            (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
3341                if count != 1 {
3342                    bail_bug!("futures can only send 1 item");
3343                }
3344
3345                let val = write_payload_ty
3346                    .map(|ty| {
3347                        let lift = &mut LiftContext::new(store, write_options, self);
3348                        let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3349                        Val::load(lift, *ty, bytes)
3350                    })
3351                    .transpose()?;
3352
3353                if let Some(val) = val {
3354                    // Serializing the value may require calling the guest's realloc function, so we
3355                    // set the guest's thread context in case realloc requires it, and restore the original
3356                    // thread context after the copy is complete.
3357                    let old_thread = store.set_thread(read_caller_thread)?;
3358                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3359                    let ptr = func::validate_inbounds_dynamic(
3360                        read_abi,
3361                        lower.as_slice_mut(),
3362                        &ValRaw::u32(read_address.try_into()?),
3363                    )?;
3364                    let ty = match read_payload_ty {
3365                        Some(ty) => ty,
3366                        None => bail_bug!("expected read payload type to be present"),
3367                    };
3368                    val.store(lower, *ty, ptr)?;
3369                    store.set_thread(old_thread)?;
3370                }
3371            }
3372            (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
3373                if write_length_in_bytes == 0 {
3374                    return Ok(());
3375                }
3376                let write_payload_ty = match write_payload_ty {
3377                    Some(ty) => ty,
3378                    None => bail_bug!("expected write payload type to be present"),
3379                };
3380                let read_payload_ty = match read_payload_ty {
3381                    Some(ty) => ty,
3382                    None => bail_bug!("expected read payload type to be present"),
3383                };
3384                if flat_abi.is_some() {
3385                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
3386                    let store_opaque = store.store_opaque_mut();
3387
3388                    assert_eq!(read_length_in_bytes, write_length_in_bytes);
3389
3390                    if self.options_memory(store_opaque, read_options).as_ptr()
3391                        == self.options_memory(store_opaque, write_options).as_ptr()
3392                    {
3393                        let memory = self.options_memory_mut(store_opaque, read_options);
3394                        memory.copy_within(
3395                            write_address..write_address + write_length_in_bytes,
3396                            read_address,
3397                        );
3398                    } else {
3399                        let src = self.options_memory(store_opaque, write_options)[write_address..]
3400                            [..write_length_in_bytes]
3401                            .as_ptr();
3402                        let dst = self.options_memory_mut(store_opaque, read_options)
3403                            [read_address..][..read_length_in_bytes]
3404                            .as_mut_ptr();
3405
3406                        // SAFETY: Both `src` and `dst` have been validated
3407                        // above to be valid pointers as they're derived from
3408                        // slices that have the desired length with the desired
3409                        // read/write permission. The `unsafe` bit here is that
3410                        // the memories are disjoint (different base pointers)
3411                        // and there's no easy way to borrow both
3412                        // simultaneously from the store. Different memories
3413                        // are guaranteed to be disjoint, however, so the
3414                        // `unsafe` here should be ok.
3415                        unsafe {
3416                            src.copy_to_nonoverlapping(dst, write_length_in_bytes);
3417                        }
3418                    }
3419                } else {
3420                    let store_opaque = store.store_opaque_mut();
3421                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
3422                    let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3423                    lift.consume_fuel_array(count, size_of::<Val>())?;
3424
3425                    let values = (0..count)
3426                        .map(|index| {
3427                            let size = usize::try_from(write_abi.size32)?;
3428                            Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
3429                        })
3430                        .collect::<Result<Vec<_>>>()?;
3431
3432                    let id = TableId::<TransmitHandle>::new(rep);
3433                    log::trace!("copy values {values:?} for {id:?}");
3434
3435                    // Serializing the value may require calling the guest's realloc function, so we
3436                    // set the guest's thread context in case realloc requires it, and restore the original
3437                    // thread context after the copy is complete.
3438                    let old_thread = store.set_thread(read_caller_thread)?;
3439                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3440                    let mut ptr = read_address;
3441                    for value in values {
3442                        value.store(lower, *read_payload_ty, ptr)?;
3443                        ptr += usize::try_from(read_abi.size32)?;
3444                    }
3445                    store.set_thread(old_thread)?;
3446                }
3447            }
3448            _ => bail_bug!("mismatched transmit types in copy"),
3449        }
3450
3451        Ok(())
3452    }
3453
3454    fn check_bounds(
3455        self,
3456        store: &StoreOpaque,
3457        options: OptionsIndex,
3458        ty: TransmitIndex,
3459        address: usize,
3460        count: usize,
3461    ) -> Result<()> {
3462        let types = self.id().get(store).component().types();
3463        let size = usize::try_from(
3464            match ty {
3465                TransmitIndex::Future(ty) => types[types[ty].ty]
3466                    .payload
3467                    .map(|ty| types.canonical_abi(&ty).size32),
3468                TransmitIndex::Stream(ty) => types[types[ty].ty]
3469                    .payload
3470                    .map(|ty| types.canonical_abi(&ty).size32),
3471            }
3472            .unwrap_or(0),
3473        )?;
3474
3475        if count > 0 && size > 0 {
3476            self.options_memory(store, options)
3477                .get(address..)
3478                .and_then(|b| b.get(..(size * count)))
3479                .map(drop)
3480                .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3481        } else {
3482            Ok(())
3483        }
3484    }
3485
3486    /// Write to the specified stream or future from the guest.
3487    pub(super) fn guest_write<T: 'static>(
3488        self,
3489        mut store: StoreContextMut<T>,
3490        caller: RuntimeComponentInstanceIndex,
3491        ty: TransmitIndex,
3492        options: OptionsIndex,
3493        flat_abi: Option<FlatAbi>,
3494        handle: u32,
3495        address: u32,
3496        count: u32,
3497    ) -> Result<ReturnCode> {
3498        let count = ItemCount::new(count)?;
3499
3500        if !self.options(store.0, options).async_ {
3501            // The caller may only sync call `{stream,future}.write` from an
3502            // async task (i.e. a task created via a call to an async export).
3503            // Otherwise, we'll trap.
3504            store.0.check_blocking()?;
3505        }
3506
3507        let address = usize::try_from(address)?;
3508        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3509        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3510        let TransmitLocalState::Write { done } = *state else {
3511            bail!(Trap::ConcurrentFutureStreamOp);
3512        };
3513
3514        if done {
3515            bail!("cannot write to stream after being notified that the readable end dropped");
3516        }
3517
3518        *state = TransmitLocalState::Busy;
3519        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3520        let concurrent_state = store.0.concurrent_state_mut();
3521        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3522        let transmit = concurrent_state.get_mut(transmit_id)?;
3523        log::trace!(
3524            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3525            transmit.read
3526        );
3527
3528        if transmit.done {
3529            bail!("cannot write to future after previous write succeeded or readable end dropped");
3530        }
3531
3532        let new_state = if let ReadState::Dropped = &transmit.read {
3533            ReadState::Dropped
3534        } else {
3535            ReadState::Open
3536        };
3537
3538        let set_guest_ready = |me: &mut ConcurrentState| {
3539            let transmit = me.get_mut(transmit_id)?;
3540            if !matches!(&transmit.write, WriteState::Open) {
3541                bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
3542            }
3543            transmit.write = WriteState::GuestReady {
3544                instance: self,
3545                caller,
3546                ty,
3547                flat_abi,
3548                options,
3549                address,
3550                count,
3551                handle,
3552            };
3553            Ok::<_, crate::Error>(())
3554        };
3555
3556        let mut result = match mem::replace(&mut transmit.read, new_state) {
3557            ReadState::GuestReady {
3558                ty: read_ty,
3559                flat_abi: read_flat_abi,
3560                options: read_options,
3561                address: read_address,
3562                count: read_count,
3563                handle: read_handle,
3564                instance: read_instance,
3565                caller_instance: read_caller_instance,
3566                caller_thread: read_caller_thread,
3567            } => {
3568                if flat_abi != read_flat_abi {
3569                    bail_bug!("expected flat ABI calculations to be the same");
3570                }
3571
3572                if let TransmitIndex::Future(_) = ty {
3573                    transmit.done = true;
3574                }
3575
3576                // Note that zero-length reads and writes are handling specially
3577                // by the spec to allow each end to signal readiness to the
3578                // other.  Quoting the spec:
3579                //
3580                // ```
3581                // The meaning of a read or write when the length is 0 is that
3582                // the caller is querying the "readiness" of the other
3583                // side. When a 0-length read/write rendezvous with a
3584                // non-0-length read/write, only the 0-length read/write
3585                // completes; the non-0-length read/write is kept pending (and
3586                // ready for a subsequent rendezvous).
3587                //
3588                // In the corner case where a 0-length read and write
3589                // rendezvous, only the writer is notified of readiness. To
3590                // avoid livelock, the Canonical ABI requires that a writer must
3591                // (eventually) follow a completed 0-length write with a
3592                // non-0-length write that is allowed to block (allowing the
3593                // reader end to run and rendezvous with its own non-0-length
3594                // read).
3595                // ```
3596
3597                let write_complete = count == 0 || read_count > 0;
3598                let read_complete = count > 0;
3599                let read_buffer_remaining = count < read_count;
3600
3601                let read_handle_rep = transmit.read_handle.rep();
3602
3603                let count = count.min(read_count);
3604
3605                self.copy(
3606                    store.as_context_mut(),
3607                    flat_abi,
3608                    caller,
3609                    ty,
3610                    options,
3611                    address,
3612                    read_caller_instance,
3613                    read_caller_thread,
3614                    read_ty,
3615                    read_options,
3616                    read_address,
3617                    count,
3618                    rep,
3619                )?;
3620
3621                let instance = self.id().get_mut(store.0);
3622                let types = instance.component().types();
3623                let item_size = match ty.payload(types) {
3624                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3625                    None => 0,
3626                };
3627                let concurrent_state = store.0.concurrent_state_mut();
3628                if read_complete {
3629                    let total = if let Some(Event::StreamRead {
3630                        code: ReturnCode::Completed(old_total),
3631                        ..
3632                    }) = concurrent_state.take_event(read_handle_rep)?
3633                    {
3634                        count.add(old_total)?
3635                    } else {
3636                        count
3637                    };
3638
3639                    let code = ReturnCode::completed(ty.kind(), total);
3640
3641                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3642                }
3643
3644                // If the reader still has buffer remaining, or if this was a
3645                // zero-length rendezvous, then restore the state of the reader
3646                // back to what it was when we found it. Note that for the
3647                // zero-length rendezvous case this specifically won't execute
3648                // the `read_complete` logic above, which is intentional, as the
3649                // reader remains blocked.
3650                if read_buffer_remaining || (count == 0 && read_count == 0) {
3651                    let transmit = concurrent_state.get_mut(transmit_id)?;
3652                    transmit.read = ReadState::GuestReady {
3653                        ty: read_ty,
3654                        flat_abi: read_flat_abi,
3655                        options: read_options,
3656                        address: read_address + (count.as_usize() * item_size),
3657                        count: read_count.sub(count)?,
3658                        handle: read_handle,
3659                        instance: read_instance,
3660                        caller_instance: read_caller_instance,
3661                        caller_thread: read_caller_thread,
3662                    };
3663                }
3664
3665                if write_complete {
3666                    ReturnCode::completed(ty.kind(), count)
3667                } else {
3668                    set_guest_ready(concurrent_state)?;
3669                    ReturnCode::Blocked
3670                }
3671            }
3672
3673            ReadState::HostReady {
3674                consume,
3675                guest_offset,
3676                cancel,
3677                cancel_waker,
3678            } => {
3679                if cancel_waker.is_some() {
3680                    bail_bug!("expected cancel_waker to be none");
3681                }
3682                if cancel {
3683                    bail_bug!("expected cancel to be false");
3684                }
3685                if guest_offset != 0 {
3686                    bail_bug!("expected guest_offset to be 0");
3687                }
3688
3689                if let TransmitIndex::Future(_) = ty {
3690                    transmit.done = true;
3691                }
3692
3693                set_guest_ready(concurrent_state)?;
3694                self.consume(
3695                    store.0,
3696                    ty.kind(),
3697                    transmit_id,
3698                    consume,
3699                    ItemCount::ZERO,
3700                    false,
3701                )?
3702            }
3703
3704            ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),
3705
3706            ReadState::Open => {
3707                set_guest_ready(concurrent_state)?;
3708                ReturnCode::Blocked
3709            }
3710
3711            ReadState::Dropped => {
3712                if let TransmitIndex::Future(_) = ty {
3713                    transmit.done = true;
3714                }
3715
3716                ReturnCode::Dropped(ItemCount::ZERO)
3717            }
3718        };
3719
3720        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3721            result = self.wait_for_write(store.0, transmit_handle)?;
3722        }
3723
3724        if result != ReturnCode::Blocked {
3725            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3726                TransmitLocalState::Write {
3727                    done: matches!(
3728                        (result, ty),
3729                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3730                    ),
3731                };
3732        }
3733
3734        log::trace!(
3735            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3736        );
3737
3738        Ok(result)
3739    }
3740
3741    /// Read from the specified stream or future from the guest.
3742    pub(super) fn guest_read<T: 'static>(
3743        self,
3744        mut store: StoreContextMut<T>,
3745        caller_instance: RuntimeComponentInstanceIndex,
3746        ty: TransmitIndex,
3747        options: OptionsIndex,
3748        flat_abi: Option<FlatAbi>,
3749        handle: u32,
3750        address: u32,
3751        count: u32,
3752    ) -> Result<ReturnCode> {
3753        let count = ItemCount::new(count)?;
3754
3755        if !self.options(store.0, options).async_ {
3756            // The caller may only sync call `{stream,future}.read` from an
3757            // async task (i.e. a task created via a call to an async export).
3758            // Otherwise, we'll trap.
3759            store.0.check_blocking()?;
3760        }
3761
3762        let address = usize::try_from(address)?;
3763        self.check_bounds(store.0, options, ty, address, count.as_usize())?;
3764        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3765        let TransmitLocalState::Read { done } = *state else {
3766            bail!(Trap::ConcurrentFutureStreamOp);
3767        };
3768
3769        if done {
3770            bail!("cannot read from stream after being notified that the writable end dropped");
3771        }
3772
3773        *state = TransmitLocalState::Busy;
3774        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3775        let concurrent_state = store.0.concurrent_state_mut();
3776        let caller_thread = concurrent_state.current_guest_thread()?;
3777        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3778        let transmit = concurrent_state.get_mut(transmit_id)?;
3779        log::trace!(
3780            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3781            transmit.write
3782        );
3783
3784        if transmit.done {
3785            bail!("cannot read from future after previous read succeeded");
3786        }
3787
3788        let new_state = if let WriteState::Dropped = &transmit.write {
3789            WriteState::Dropped
3790        } else {
3791            WriteState::Open
3792        };
3793
3794        let set_guest_ready = |me: &mut ConcurrentState| {
3795            let transmit = me.get_mut(transmit_id)?;
3796            if !matches!(&transmit.read, ReadState::Open) {
3797                bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
3798            }
3799            transmit.read = ReadState::GuestReady {
3800                ty,
3801                flat_abi,
3802                options,
3803                address,
3804                count,
3805                handle,
3806                instance: self,
3807                caller_instance,
3808                caller_thread,
3809            };
3810            Ok::<_, crate::Error>(())
3811        };
3812
3813        let mut result = match mem::replace(&mut transmit.write, new_state) {
3814            WriteState::GuestReady {
3815                instance: _,
3816                ty: write_ty,
3817                flat_abi: write_flat_abi,
3818                options: write_options,
3819                address: write_address,
3820                count: write_count,
3821                handle: write_handle,
3822                caller: write_caller,
3823            } => {
3824                if flat_abi != write_flat_abi {
3825                    bail_bug!("expected flat ABI calculations to be the same");
3826                }
3827
3828                if let TransmitIndex::Future(_) = ty {
3829                    transmit.done = true;
3830                }
3831
3832                let write_handle_rep = transmit.write_handle.rep();
3833
3834                // See the comment in `guest_write` for the
3835                // `ReadState::GuestReady` case concerning zero-length reads and
3836                // writes.
3837
3838                let write_complete = write_count == 0 || count > 0;
3839                let read_complete = write_count > 0;
3840                let write_buffer_remaining = count < write_count;
3841
3842                let count = count.min(write_count);
3843
3844                self.copy(
3845                    store.as_context_mut(),
3846                    flat_abi,
3847                    write_caller,
3848                    write_ty,
3849                    write_options,
3850                    write_address,
3851                    caller_instance,
3852                    caller_thread,
3853                    ty,
3854                    options,
3855                    address,
3856                    count,
3857                    rep,
3858                )?;
3859
3860                let instance = self.id().get_mut(store.0);
3861                let types = instance.component().types();
3862                let item_size = match ty.payload(types) {
3863                    Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
3864                    None => 0,
3865                };
3866                let concurrent_state = store.0.concurrent_state_mut();
3867
3868                if write_complete {
3869                    let total = if let Some(Event::StreamWrite {
3870                        code: ReturnCode::Completed(old_total),
3871                        ..
3872                    }) = concurrent_state.take_event(write_handle_rep)?
3873                    {
3874                        count.add(old_total)?
3875                    } else {
3876                        count
3877                    };
3878
3879                    let code = ReturnCode::completed(ty.kind(), total);
3880
3881                    concurrent_state.send_write_result(
3882                        write_ty,
3883                        transmit_id,
3884                        write_handle,
3885                        code,
3886                    )?;
3887                }
3888
3889                if write_buffer_remaining {
3890                    let transmit = concurrent_state.get_mut(transmit_id)?;
3891                    transmit.write = WriteState::GuestReady {
3892                        instance: self,
3893                        caller: write_caller,
3894                        ty: write_ty,
3895                        flat_abi: write_flat_abi,
3896                        options: write_options,
3897                        address: write_address + (count.as_usize() * item_size),
3898                        count: write_count.sub(count)?,
3899                        handle: write_handle,
3900                    };
3901                }
3902
3903                if read_complete {
3904                    ReturnCode::completed(ty.kind(), count)
3905                } else {
3906                    set_guest_ready(concurrent_state)?;
3907                    ReturnCode::Blocked
3908                }
3909            }
3910
3911            WriteState::HostReady {
3912                produce,
3913                try_into,
3914                guest_offset,
3915                cancel,
3916                cancel_waker,
3917            } => {
3918                if cancel_waker.is_some() {
3919                    bail_bug!("expected cancel_waker to be none");
3920                }
3921                if cancel {
3922                    bail_bug!("expected cancel to be false");
3923                }
3924                if guest_offset != 0 {
3925                    bail_bug!("expected guest_offset to be 0");
3926                }
3927
3928                set_guest_ready(concurrent_state)?;
3929
3930                let code = self.produce(
3931                    store.0,
3932                    ty.kind(),
3933                    transmit_id,
3934                    produce,
3935                    try_into,
3936                    ItemCount::ZERO,
3937                    false,
3938                )?;
3939
3940                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3941                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3942                }
3943
3944                code
3945            }
3946
3947            WriteState::Open => {
3948                set_guest_ready(concurrent_state)?;
3949                ReturnCode::Blocked
3950            }
3951
3952            WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
3953        };
3954
3955        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3956            result = self.wait_for_read(store.0, transmit_handle)?;
3957        }
3958
3959        if result != ReturnCode::Blocked {
3960            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3961                TransmitLocalState::Read {
3962                    done: matches!(
3963                        (result, ty),
3964                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3965                    ),
3966                };
3967        }
3968
3969        log::trace!(
3970            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3971        );
3972
3973        Ok(result)
3974    }
3975
3976    fn wait_for_write(
3977        self,
3978        store: &mut StoreOpaque,
3979        handle: TableId<TransmitHandle>,
3980    ) -> Result<ReturnCode> {
3981        let waitable = Waitable::Transmit(handle);
3982        store.wait_for_event(waitable)?;
3983        let event = waitable.take_event(store.concurrent_state_mut())?;
3984        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3985            event
3986        {
3987            waitable.on_delivery(store, self, event)?;
3988            Ok(code)
3989        } else {
3990            bail_bug!("expected either a stream or future write event")
3991        }
3992    }
3993
3994    /// Cancel a pending stream or future write.
3995    fn cancel_write(
3996        self,
3997        store: &mut StoreOpaque,
3998        transmit_id: TableId<TransmitState>,
3999        async_: bool,
4000    ) -> Result<ReturnCode> {
4001        let state = store.concurrent_state_mut();
4002        let transmit = state.get_mut(transmit_id)?;
4003        log::trace!(
4004            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
4005            transmit.read,
4006            transmit.write
4007        );
4008        let waitable = Waitable::Transmit(transmit.write_handle);
4009
4010        let code = if let Some(event) = waitable.take_event(state)? {
4011            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
4012                bail_bug!("expected either a stream or future write event")
4013            };
4014            waitable.on_delivery(store, self, event)?;
4015            match (code, event) {
4016                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
4017                    ReturnCode::Cancelled(count)
4018                }
4019                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4020                _ => bail_bug!("unexpected code/event combo"),
4021            }
4022        } else if let ReadState::HostReady {
4023            cancel,
4024            cancel_waker,
4025            ..
4026        } = &mut state.get_mut(transmit_id)?.read
4027        {
4028            *cancel = true;
4029            if let Some(waker) = cancel_waker.take() {
4030                waker.wake();
4031            }
4032
4033            if async_ {
4034                ReturnCode::Blocked
4035            } else {
4036                let handle = store
4037                    .concurrent_state_mut()
4038                    .get_mut(transmit_id)?
4039                    .write_handle;
4040                self.wait_for_write(store, handle)?
4041            }
4042        } else {
4043            ReturnCode::Cancelled(ItemCount::ZERO)
4044        };
4045
4046        if !matches!(code, ReturnCode::Blocked) {
4047            let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4048
4049            match &transmit.write {
4050                WriteState::GuestReady { .. } => {
4051                    transmit.write = WriteState::Open;
4052                }
4053                WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
4054                WriteState::Open | WriteState::Dropped => {}
4055            }
4056        }
4057
4058        log::trace!("cancelled write {transmit_id:?}: {code:?}");
4059
4060        Ok(code)
4061    }
4062
4063    fn wait_for_read(
4064        self,
4065        store: &mut StoreOpaque,
4066        handle: TableId<TransmitHandle>,
4067    ) -> Result<ReturnCode> {
4068        let waitable = Waitable::Transmit(handle);
4069        store.wait_for_event(waitable)?;
4070        let event = waitable.take_event(store.concurrent_state_mut())?;
4071        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
4072            event
4073        {
4074            waitable.on_delivery(store, self, event)?;
4075            Ok(code)
4076        } else {
4077            bail_bug!("expected either a stream or future read event")
4078        }
4079    }
4080
4081    /// Cancel a pending stream or future read.
4082    fn cancel_read(
4083        self,
4084        store: &mut StoreOpaque,
4085        transmit_id: TableId<TransmitState>,
4086        async_: bool,
4087    ) -> Result<ReturnCode> {
4088        let state = store.concurrent_state_mut();
4089        let transmit = state.get_mut(transmit_id)?;
4090        log::trace!(
4091            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
4092            transmit.read,
4093            transmit.write
4094        );
4095
4096        let waitable = Waitable::Transmit(transmit.read_handle);
4097        let code = if let Some(event) = waitable.take_event(state)? {
4098            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
4099                bail_bug!("expected either a stream or future read event")
4100            };
4101            waitable.on_delivery(store, self, event)?;
4102            match (code, event) {
4103                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
4104                    ReturnCode::Cancelled(count)
4105                }
4106                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4107                _ => bail_bug!("unexpected code/event combo"),
4108            }
4109        } else if let WriteState::HostReady {
4110            cancel,
4111            cancel_waker,
4112            ..
4113        } = &mut state.get_mut(transmit_id)?.write
4114        {
4115            *cancel = true;
4116            if let Some(waker) = cancel_waker.take() {
4117                waker.wake();
4118            }
4119
4120            if async_ {
4121                ReturnCode::Blocked
4122            } else {
4123                let handle = store
4124                    .concurrent_state_mut()
4125                    .get_mut(transmit_id)?
4126                    .read_handle;
4127                self.wait_for_read(store, handle)?
4128            }
4129        } else {
4130            ReturnCode::Cancelled(ItemCount::ZERO)
4131        };
4132
4133        if !matches!(code, ReturnCode::Blocked) {
4134            let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4135
4136            match &transmit.read {
4137                ReadState::GuestReady { .. } => {
4138                    transmit.read = ReadState::Open;
4139                }
4140                ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4141                    bail_bug!("support host read cancellation")
4142                }
4143                ReadState::Open | ReadState::Dropped => {}
4144            }
4145        }
4146
4147        log::trace!("cancelled read {transmit_id:?}: {code:?}");
4148
4149        Ok(code)
4150    }
4151
4152    /// Cancel a pending write for the specified stream or future from the guest.
4153    fn guest_cancel_write(
4154        self,
4155        store: &mut StoreOpaque,
4156        ty: TransmitIndex,
4157        async_: bool,
4158        writer: u32,
4159    ) -> Result<ReturnCode> {
4160        if !async_ {
4161            // The caller may only sync call `{stream,future}.cancel-write` from
4162            // an async task (i.e. a task created via a call to an async
4163            // export).  Otherwise, we'll trap.
4164            store.check_blocking()?;
4165        }
4166
4167        let (rep, state) =
4168            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4169        let id = TableId::<TransmitHandle>::new(rep);
4170        log::trace!("guest cancel write {id:?} (handle {writer})");
4171        match state {
4172            TransmitLocalState::Write { .. } => {
4173                bail!("stream or future write cancelled when no write is pending")
4174            }
4175            TransmitLocalState::Read { .. } => {
4176                bail!("passed read end to `{{stream|future}}.cancel-write`")
4177            }
4178            TransmitLocalState::Busy => {}
4179        }
4180        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4181        let code = self.cancel_write(store, transmit_id, async_)?;
4182        if !matches!(code, ReturnCode::Blocked) {
4183            let state =
4184                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4185                    .1;
4186            if let TransmitLocalState::Busy = state {
4187                *state = TransmitLocalState::Write { done: false };
4188            }
4189        }
4190        Ok(code)
4191    }
4192
4193    /// Cancel a pending read for the specified stream or future from the guest.
4194    fn guest_cancel_read(
4195        self,
4196        store: &mut StoreOpaque,
4197        ty: TransmitIndex,
4198        async_: bool,
4199        reader: u32,
4200    ) -> Result<ReturnCode> {
4201        if !async_ {
4202            // The caller may only sync call `{stream,future}.cancel-read` from
4203            // an async task (i.e. a task created via a call to an async
4204            // export).  Otherwise, we'll trap.
4205            store.check_blocking()?;
4206        }
4207
4208        let (rep, state) =
4209            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4210        let id = TableId::<TransmitHandle>::new(rep);
4211        log::trace!("guest cancel read {id:?} (handle {reader})");
4212        match state {
4213            TransmitLocalState::Read { .. } => {
4214                bail!("stream or future read cancelled when no read is pending")
4215            }
4216            TransmitLocalState::Write { .. } => {
4217                bail!("passed write end to `{{stream|future}}.cancel-read`")
4218            }
4219            TransmitLocalState::Busy => {}
4220        }
4221        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4222        let code = self.cancel_read(store, transmit_id, async_)?;
4223        if !matches!(code, ReturnCode::Blocked) {
4224            let state =
4225                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4226                    .1;
4227            if let TransmitLocalState::Busy = state {
4228                *state = TransmitLocalState::Read { done: false };
4229            }
4230        }
4231        Ok(code)
4232    }
4233
4234    /// Drop the readable end of the specified stream or future from the guest.
4235    fn guest_drop_readable(
4236        self,
4237        store: &mut StoreOpaque,
4238        ty: TransmitIndex,
4239        reader: u32,
4240    ) -> Result<()> {
4241        let table = self.id().get_mut(store).table_for_transmit(ty);
4242        let (rep, _is_done) = match ty {
4243            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4244            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4245        };
4246        let kind = match ty {
4247            TransmitIndex::Stream(_) => TransmitKind::Stream,
4248            TransmitIndex::Future(_) => TransmitKind::Future,
4249        };
4250        let id = TableId::<TransmitHandle>::new(rep);
4251        log::trace!("guest_drop_readable: drop reader {id:?}");
4252        store.host_drop_reader(id, kind)
4253    }
4254
4255    /// Create a new error context for the given component.
4256    pub(crate) fn error_context_new(
4257        self,
4258        store: &mut StoreOpaque,
4259        ty: TypeComponentLocalErrorContextTableIndex,
4260        options: OptionsIndex,
4261        debug_msg_address: u32,
4262        debug_msg_len: u32,
4263    ) -> Result<u32> {
4264        let lift_ctx = &mut LiftContext::new(store, options, self);
4265        let debug_msg = String::linear_lift_from_flat(
4266            lift_ctx,
4267            InterfaceType::String,
4268            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4269        )?;
4270
4271        // Create a new ErrorContext that is tracked along with other concurrent state
4272        let err_ctx = ErrorContextState { debug_msg };
4273        let state = store.concurrent_state_mut();
4274        let table_id = state.push(err_ctx)?;
4275        let global_ref_count_idx =
4276            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4277
4278        // Add to the global error context ref counts
4279        let _ = state
4280            .global_error_context_ref_counts
4281            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4282
4283        // Error context are tracked both locally (to a single component instance) and globally
4284        // the counts for both must stay in sync.
4285        //
4286        // Here we reflect the newly created global concurrent error context state into the
4287        // component instance's locally tracked count, along with the appropriate key into the global
4288        // ref tracking data structures to enable later lookup
4289        let local_idx = self
4290            .id()
4291            .get_mut(store)
4292            .table_for_error_context(ty)
4293            .error_context_insert(table_id.rep())?;
4294
4295        Ok(local_idx)
4296    }
4297
4298    /// Retrieve the debug message from the specified error context.
4299    pub(super) fn error_context_debug_message<T>(
4300        self,
4301        store: StoreContextMut<T>,
4302        ty: TypeComponentLocalErrorContextTableIndex,
4303        options: OptionsIndex,
4304        err_ctx_handle: u32,
4305        debug_msg_address: u32,
4306    ) -> Result<()> {
4307        // Retrieve the error context and internal debug message
4308        let handle_table_id_rep = self
4309            .id()
4310            .get_mut(store.0)
4311            .table_for_error_context(ty)
4312            .error_context_rep(err_ctx_handle)?;
4313
4314        let state = store.0.concurrent_state_mut();
4315        // Get the state associated with the error context
4316        let ErrorContextState { debug_msg } =
4317            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4318        let debug_msg = debug_msg.clone();
4319
4320        let lower_cx = &mut LowerContext::new(store, options, self);
4321        let debug_msg_address = usize::try_from(debug_msg_address)?;
4322        // Lower the string into the component's memory.
4323        //
4324        // Note that the "8" here is the size of a WIT `string` in linear
4325        // memory, the ptr+length. This'll need to be updated when `memory64`
4326        // comes along. (FIXME(#4311))
4327        let offset = lower_cx
4328            .as_slice_mut()
4329            .get(debug_msg_address..)
4330            .and_then(|b| b.get(..8))
4331            .map(|_| debug_msg_address)
4332            .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4333        debug_msg
4334            .as_str()
4335            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4336
4337        Ok(())
4338    }
4339
4340    /// Implements the `future.cancel-read` intrinsic.
4341    pub(crate) fn future_cancel_read(
4342        self,
4343        store: &mut StoreOpaque,
4344        ty: TypeFutureTableIndex,
4345        async_: bool,
4346        reader: u32,
4347    ) -> Result<u32> {
4348        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4349            .map(|v| v.encode())
4350    }
4351
4352    /// Implements the `future.cancel-write` intrinsic.
4353    pub(crate) fn future_cancel_write(
4354        self,
4355        store: &mut StoreOpaque,
4356        ty: TypeFutureTableIndex,
4357        async_: bool,
4358        writer: u32,
4359    ) -> Result<u32> {
4360        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4361            .map(|v| v.encode())
4362    }
4363
4364    /// Implements the `stream.cancel-read` intrinsic.
4365    pub(crate) fn stream_cancel_read(
4366        self,
4367        store: &mut StoreOpaque,
4368        ty: TypeStreamTableIndex,
4369        async_: bool,
4370        reader: u32,
4371    ) -> Result<u32> {
4372        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4373            .map(|v| v.encode())
4374    }
4375
4376    /// Implements the `stream.cancel-write` intrinsic.
4377    pub(crate) fn stream_cancel_write(
4378        self,
4379        store: &mut StoreOpaque,
4380        ty: TypeStreamTableIndex,
4381        async_: bool,
4382        writer: u32,
4383    ) -> Result<u32> {
4384        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4385            .map(|v| v.encode())
4386    }
4387
4388    /// Implements the `future.drop-readable` intrinsic.
4389    pub(crate) fn future_drop_readable(
4390        self,
4391        store: &mut StoreOpaque,
4392        ty: TypeFutureTableIndex,
4393        reader: u32,
4394    ) -> Result<()> {
4395        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4396    }
4397
4398    /// Implements the `stream.drop-readable` intrinsic.
4399    pub(crate) fn stream_drop_readable(
4400        self,
4401        store: &mut StoreOpaque,
4402        ty: TypeStreamTableIndex,
4403        reader: u32,
4404    ) -> Result<()> {
4405        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4406    }
4407
4408    /// Allocate a new future or stream and grant ownership of both the read and
4409    /// write ends to the (sub-)component instance to which the specified
4410    /// `TransmitIndex` belongs.
4411    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4412        let (write, read) = store
4413            .concurrent_state_mut()
4414            .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4415
4416        let table = self.id().get_mut(store).table_for_transmit(ty);
4417        let (read_handle, write_handle) = match ty {
4418            TransmitIndex::Future(ty) => (
4419                table.future_insert_read(ty, read.rep())?,
4420                table.future_insert_write(ty, write.rep())?,
4421            ),
4422            TransmitIndex::Stream(ty) => (
4423                table.stream_insert_read(ty, read.rep())?,
4424                table.stream_insert_write(ty, write.rep())?,
4425            ),
4426        };
4427
4428        let state = store.concurrent_state_mut();
4429        state.get_mut(read)?.common.handle = Some(read_handle);
4430        state.get_mut(write)?.common.handle = Some(write_handle);
4431
4432        Ok(ResourcePair {
4433            write: write_handle,
4434            read: read_handle,
4435        })
4436    }
4437
4438    /// Drop the specified error context.
4439    pub(crate) fn error_context_drop(
4440        self,
4441        store: &mut StoreOpaque,
4442        ty: TypeComponentLocalErrorContextTableIndex,
4443        error_context: u32,
4444    ) -> Result<()> {
4445        let instance = self.id().get_mut(store);
4446
4447        let local_handle_table = instance.table_for_error_context(ty);
4448
4449        let rep = local_handle_table.error_context_drop(error_context)?;
4450
4451        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4452
4453        let state = store.concurrent_state_mut();
4454        let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4455            .global_error_context_ref_counts
4456            .get_mut(&global_ref_count_idx)
4457        else {
4458            bail_bug!("retrieve concurrent state for error context during drop")
4459        };
4460
4461        // Reduce the component-global ref count, removing tracking if necessary
4462        if *global_ref_count < 1 {
4463            bail_bug!("ref count unexpectedly zero");
4464        }
4465        *global_ref_count -= 1;
4466        if *global_ref_count == 0 {
4467            state
4468                .global_error_context_ref_counts
4469                .remove(&global_ref_count_idx);
4470
4471            state
4472                .delete(TableId::<ErrorContextState>::new(rep))
4473                .context("deleting component-global error context data")?;
4474        }
4475
4476        Ok(())
4477    }
4478
4479    /// Transfer ownership of the specified stream or future read end from one
4480    /// guest to another.
4481    fn guest_transfer(
4482        self,
4483        store: &mut StoreOpaque,
4484        src_idx: u32,
4485        src: TransmitIndex,
4486        dst: TransmitIndex,
4487    ) -> Result<u32> {
4488        let mut instance = self.id().get_mut(store);
4489        let src_table = instance.as_mut().table_for_transmit(src);
4490        let (rep, is_done) = match src {
4491            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4492            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4493        };
4494        if is_done {
4495            bail!("cannot lift after being notified that the writable end dropped");
4496        }
4497        let dst_table = instance.table_for_transmit(dst);
4498        let handle = match dst {
4499            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4500            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4501        }?;
4502        store
4503            .concurrent_state_mut()
4504            .get_mut(TableId::<TransmitHandle>::new(rep))?
4505            .common
4506            .handle = Some(handle);
4507        Ok(handle)
4508    }
4509
4510    /// Implements the `future.new` intrinsic.
4511    pub(crate) fn future_new(
4512        self,
4513        store: &mut StoreOpaque,
4514        ty: TypeFutureTableIndex,
4515    ) -> Result<ResourcePair> {
4516        self.guest_new(store, TransmitIndex::Future(ty))
4517    }
4518
4519    /// Implements the `stream.new` intrinsic.
4520    pub(crate) fn stream_new(
4521        self,
4522        store: &mut StoreOpaque,
4523        ty: TypeStreamTableIndex,
4524    ) -> Result<ResourcePair> {
4525        self.guest_new(store, TransmitIndex::Stream(ty))
4526    }
4527
4528    /// Transfer ownership of the specified future read end from one guest to
4529    /// another.
4530    pub(crate) fn future_transfer(
4531        self,
4532        store: &mut StoreOpaque,
4533        src_idx: u32,
4534        src: TypeFutureTableIndex,
4535        dst: TypeFutureTableIndex,
4536    ) -> Result<u32> {
4537        self.guest_transfer(
4538            store,
4539            src_idx,
4540            TransmitIndex::Future(src),
4541            TransmitIndex::Future(dst),
4542        )
4543    }
4544
4545    /// Transfer ownership of the specified stream read end from one guest to
4546    /// another.
4547    pub(crate) fn stream_transfer(
4548        self,
4549        store: &mut StoreOpaque,
4550        src_idx: u32,
4551        src: TypeStreamTableIndex,
4552        dst: TypeStreamTableIndex,
4553    ) -> Result<u32> {
4554        self.guest_transfer(
4555            store,
4556            src_idx,
4557            TransmitIndex::Stream(src),
4558            TransmitIndex::Stream(dst),
4559        )
4560    }
4561
4562    /// Copy the specified error context from one component to another.
4563    pub(crate) fn error_context_transfer(
4564        self,
4565        store: &mut StoreOpaque,
4566        src_idx: u32,
4567        src: TypeComponentLocalErrorContextTableIndex,
4568        dst: TypeComponentLocalErrorContextTableIndex,
4569    ) -> Result<u32> {
4570        let mut instance = self.id().get_mut(store);
4571        let rep = instance
4572            .as_mut()
4573            .table_for_error_context(src)
4574            .error_context_rep(src_idx)?;
4575        let dst_idx = instance
4576            .table_for_error_context(dst)
4577            .error_context_insert(rep)?;
4578
4579        // Update the global (cross-subcomponent) count for error contexts
4580        // as the new component has essentially created a new reference that will
4581        // be dropped/handled independently
4582        let global_ref_count = store
4583            .concurrent_state_mut()
4584            .global_error_context_ref_counts
4585            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4586            .context("global ref count present for existing (sub)component error context")?;
4587
4588        global_ref_count.0 = global_ref_count
4589            .0
4590            .checked_add(1)
4591            .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
4592
4593        Ok(dst_idx)
4594    }
4595}
4596
4597impl ComponentInstance {
4598    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4599        let (states, types) = self.instance_states();
4600        let runtime_instance = match ty {
4601            TransmitIndex::Stream(ty) => types[ty].instance,
4602            TransmitIndex::Future(ty) => types[ty].instance,
4603        };
4604        states[runtime_instance].handle_table()
4605    }
4606
4607    fn table_for_error_context(
4608        self: Pin<&mut Self>,
4609        ty: TypeComponentLocalErrorContextTableIndex,
4610    ) -> &mut HandleTable {
4611        let (states, types) = self.instance_states();
4612        let runtime_instance = types[ty].instance;
4613        states[runtime_instance].handle_table()
4614    }
4615
4616    fn get_mut_by_index(
4617        self: Pin<&mut Self>,
4618        ty: TransmitIndex,
4619        index: u32,
4620    ) -> Result<(u32, &mut TransmitLocalState)> {
4621        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4622    }
4623}
4624
4625impl ConcurrentState {
4626    fn send_write_result(
4627        &mut self,
4628        ty: TransmitIndex,
4629        id: TableId<TransmitState>,
4630        handle: u32,
4631        code: ReturnCode,
4632    ) -> Result<()> {
4633        let write_handle = self.get_mut(id)?.write_handle.rep();
4634        self.set_event(
4635            write_handle,
4636            match ty {
4637                TransmitIndex::Future(ty) => Event::FutureWrite {
4638                    code,
4639                    pending: Some((ty, handle)),
4640                },
4641                TransmitIndex::Stream(ty) => Event::StreamWrite {
4642                    code,
4643                    pending: Some((ty, handle)),
4644                },
4645            },
4646        )
4647    }
4648
4649    fn send_read_result(
4650        &mut self,
4651        ty: TransmitIndex,
4652        id: TableId<TransmitState>,
4653        handle: u32,
4654        code: ReturnCode,
4655    ) -> Result<()> {
4656        let read_handle = self.get_mut(id)?.read_handle.rep();
4657        self.set_event(
4658            read_handle,
4659            match ty {
4660                TransmitIndex::Future(ty) => Event::FutureRead {
4661                    code,
4662                    pending: Some((ty, handle)),
4663                },
4664                TransmitIndex::Stream(ty) => Event::StreamRead {
4665                    code,
4666                    pending: Some((ty, handle)),
4667                },
4668            },
4669        )
4670    }
4671
4672    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4673        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4674    }
4675
4676    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4677        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4678    }
4679
4680    /// Set or update the event for the specified waitable.
4681    ///
4682    /// If there is already an event set for this waitable, we assert that it is
4683    /// of the same variant as the new one and reuse the `ReturnCode` count and
4684    /// the `pending` field if applicable.
4685    // TODO: This is a bit awkward due to how
4686    // `Event::{Stream,Future}{Write,Read}` and
4687    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4688    // Consider updating those representations in a way that allows this
4689    // function to be simplified.
4690    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4691        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4692
4693        fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4694            let (ReturnCode::Completed(count)
4695            | ReturnCode::Dropped(count)
4696            | ReturnCode::Cancelled(count)) = old
4697            else {
4698                bail_bug!("unexpected old return code")
4699            };
4700
4701            Ok(match new {
4702                ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
4703                ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
4704                _ => bail_bug!("unexpected new return code"),
4705            })
4706        }
4707
4708        let event = match (waitable.take_event(self)?, event) {
4709            (None, _) => event,
4710            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4711            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4712            (
4713                Some(Event::StreamWrite {
4714                    code: old_code,
4715                    pending: old_pending,
4716                }),
4717                Event::StreamWrite { code, pending },
4718            ) => Event::StreamWrite {
4719                code: update_code(old_code, code)?,
4720                pending: old_pending.or(pending),
4721            },
4722            (
4723                Some(Event::StreamRead {
4724                    code: old_code,
4725                    pending: old_pending,
4726                }),
4727                Event::StreamRead { code, pending },
4728            ) => Event::StreamRead {
4729                code: update_code(old_code, code)?,
4730                pending: old_pending.or(pending),
4731            },
4732            _ => bail_bug!("unexpected event combination"),
4733        };
4734
4735        waitable.set_event(self, Some(event))
4736    }
4737
4738    /// Allocate a new future or stream, including the `TransmitState` and the
4739    /// `TransmitHandle`s corresponding to the read and write ends.
4740    fn new_transmit(
4741        &mut self,
4742        origin: TransmitOrigin,
4743    ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4744        let state_id = self.push(TransmitState::new(origin))?;
4745
4746        let write = self.push(TransmitHandle::new(state_id))?;
4747        let read = self.push(TransmitHandle::new(state_id))?;
4748
4749        let state = self.get_mut(state_id)?;
4750        state.write_handle = write;
4751        state.read_handle = read;
4752
4753        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4754
4755        Ok((write, read))
4756    }
4757
4758    /// Delete the specified future or stream, including the read and write ends.
4759    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4760        let state = self.delete(state_id)?;
4761        self.delete(state.write_handle)?;
4762        self.delete(state.read_handle)?;
4763
4764        log::trace!(
4765            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4766            state.write_handle,
4767            state.read_handle,
4768        );
4769
4770        Ok(())
4771    }
4772}
4773
4774pub(crate) struct ResourcePair {
4775    pub(crate) write: u32,
4776    pub(crate) read: u32,
4777}
4778
4779impl Waitable {
4780    /// Handle the imminent delivery of the specified event, e.g. by updating
4781    /// the state of the stream or future.
4782    pub(super) fn on_delivery(
4783        &self,
4784        store: &mut StoreOpaque,
4785        instance: Instance,
4786        event: Event,
4787    ) -> Result<()> {
4788        let instance = instance.id().get_mut(store);
4789        let (rep, state, code) = match event {
4790            Event::FutureRead {
4791                pending: Some((ty, handle)),
4792                code,
4793            }
4794            | Event::FutureWrite {
4795                pending: Some((ty, handle)),
4796                code,
4797            } => {
4798                let runtime_instance = instance.component().types()[ty].instance;
4799                let (rep, state) = instance.instance_states().0[runtime_instance]
4800                    .handle_table()
4801                    .future_rep(ty, handle)?;
4802                (rep, state, code)
4803            }
4804            Event::StreamRead {
4805                pending: Some((ty, handle)),
4806                code,
4807            }
4808            | Event::StreamWrite {
4809                pending: Some((ty, handle)),
4810                code,
4811            } => {
4812                let runtime_instance = instance.component().types()[ty].instance;
4813                let (rep, state) = instance.instance_states().0[runtime_instance]
4814                    .handle_table()
4815                    .stream_rep(ty, handle)?;
4816                (rep, state, code)
4817            }
4818            _ => return Ok(()),
4819        };
4820        if rep != self.rep() {
4821            bail_bug!("unexpected rep mismatch");
4822        }
4823        if *state != TransmitLocalState::Busy {
4824            bail_bug!("expected state to be busy");
4825        }
4826        let done = matches!(code, ReturnCode::Dropped(_));
4827        *state = match event {
4828            Event::FutureRead { .. } | Event::StreamRead { .. } => {
4829                TransmitLocalState::Read { done }
4830            }
4831            Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
4832                TransmitLocalState::Write { done }
4833            }
4834            _ => bail_bug!("unexpected event for stream"),
4835        };
4836
4837        let transmit_handle = TableId::<TransmitHandle>::new(rep);
4838        let state = store.concurrent_state_mut();
4839        let transmit_id = state.get_mut(transmit_handle)?.state;
4840        let transmit = state.get_mut(transmit_id)?;
4841
4842        match event {
4843            Event::StreamRead { .. } => {
4844                transmit.read = ReadState::Open;
4845            }
4846            Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4847            _ => {}
4848        }
4849        Ok(())
4850    }
4851}
4852
4853/// Determine whether an intra-component read/write is allowed for the specified
4854/// `stream` or `future` payload type according to the component model
4855/// specification.
4856fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
4857    matches!(
4858        ty,
4859        None | Some(
4860            InterfaceType::S8
4861                | InterfaceType::U8
4862                | InterfaceType::S16
4863                | InterfaceType::U16
4864                | InterfaceType::S32
4865                | InterfaceType::U32
4866                | InterfaceType::S64
4867                | InterfaceType::U64
4868                | InterfaceType::Float32
4869                | InterfaceType::Float64
4870        )
4871    )
4872}
4873
4874/// Helper structure to manage moving a `T` in/out of an interior `Mutex` which
4875/// contains an
4876/// `Option<T>`
4877struct LockedState<T> {
4878    inner: Mutex<Option<T>>,
4879}
4880
4881impl<T> LockedState<T> {
4882    /// Creates a new initial state with `value` stored.
4883    fn new(value: T) -> Self {
4884        Self {
4885            inner: Mutex::new(Some(value)),
4886        }
4887    }
4888
4889    /// Attempts to lock the inner mutex and return its guard.
4890    ///
4891    /// # Errors
4892    ///
4893    /// Fails if this lock is either poisoned or if it's currently locked.
4894    /// As-used in this file there should never actually be contention on this
4895    /// lock nor recursive access so failing to acquire the lock is a fatal
4896    /// error that gets propagated upwards.
4897    fn try_lock(&self) -> Result<MutexGuard<'_, Option<T>>> {
4898        match self.inner.try_lock() {
4899            Ok(lock) => Ok(lock),
4900            Err(_) => bail_bug!("should not have contention on state lock"),
4901        }
4902    }
4903
4904    /// Takes the inner `T` out of this state, returning it as a guard which
4905    /// will put it back when finished.
4906    ///
4907    /// # Errors
4908    ///
4909    /// Returns an error if the state `T` isn't present.
4910    fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4911        let result = self.try_lock()?.take();
4912        match result {
4913            Some(result) => Ok(LockedStateGuard {
4914                value: ManuallyDrop::new(result),
4915                state: self,
4916            }),
4917            None => bail_bug!("lock value unexpectedly missing"),
4918        }
4919    }
4920
4921    /// Performs the operation `f` on the inner state `&mut T`.
4922    ///
4923    /// This will acquire the internal lock and invoke `f`, so `f` should not
4924    /// expect to be able to recursively acquire this lock.
4925    ///
4926    /// # Errors
4927    ///
4928    /// Returns an error if the state `T` isn't present.
4929    fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4930        let mut inner = self.try_lock()?;
4931        match &mut *inner {
4932            Some(state) => Ok(f(state)),
4933            None => bail_bug!("lock value unexpectedly missing"),
4934        }
4935    }
4936}
4937
4938/// Helper structure returned from [`LockedState::take`] which will put the
4939/// state specified by `value` back into the original lock once this is dropped.
4940struct LockedStateGuard<'a, T> {
4941    value: ManuallyDrop<T>,
4942    state: &'a LockedState<T>,
4943}
4944
4945impl<T> Deref for LockedStateGuard<'_, T> {
4946    type Target = T;
4947
4948    fn deref(&self) -> &T {
4949        &self.value
4950    }
4951}
4952
4953impl<T> DerefMut for LockedStateGuard<'_, T> {
4954    fn deref_mut(&mut self) -> &mut T {
4955        &mut self.value
4956    }
4957}
4958
4959impl<T> Drop for LockedStateGuard<'_, T> {
4960    fn drop(&mut self) {
4961        // SAFETY: `ManuallyDrop::take` requires that after invoked the
4962        // original value is not read. This is the `Drop` for this type which
4963        // means we have exclusive ownership and it is not read further in the
4964        // destructor, satisfying this requirement.
4965        let value = unsafe { ManuallyDrop::take(&mut self.value) };
4966
4967        // If this fails due to contention that's a bug, but we're not in a
4968        // position to panic due to this being a destructor nor return an error,
4969        // so defer the bug to showing up later.
4970        if let Ok(mut lock) = self.state.try_lock() {
4971            *lock = Some(value);
4972        }
4973    }
4974}
4975
4976#[cfg(test)]
4977mod tests {
4978    use super::*;
4979    use crate::{Engine, Store};
4980    use core::future::pending;
4981    use core::pin::pin;
4982    use std::sync::LazyLock;
4983
4984    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4985
4986    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4987    where
4988        T: FutureProducer<()>,
4989    {
4990        rx.poll_produce(
4991            &mut Context::from_waker(Waker::noop()),
4992            Store::new(&ENGINE, ()).as_context_mut(),
4993            finish,
4994        )
4995    }
4996
4997    #[test]
4998    fn future_producer() {
4999        let mut fut = pin!(async { crate::error::Ok(()) });
5000        assert!(matches!(
5001            poll_future_producer(fut.as_mut(), false),
5002            Poll::Ready(Ok(Some(()))),
5003        ));
5004
5005        let mut fut = pin!(async { crate::error::Ok(()) });
5006        assert!(matches!(
5007            poll_future_producer(fut.as_mut(), true),
5008            Poll::Ready(Ok(Some(()))),
5009        ));
5010
5011        let mut fut = pin!(pending::<Result<()>>());
5012        assert!(matches!(
5013            poll_future_producer(fut.as_mut(), false),
5014            Poll::Pending,
5015        ));
5016        assert!(matches!(
5017            poll_future_producer(fut.as_mut(), true),
5018            Poll::Ready(Ok(None)),
5019        ));
5020
5021        let (tx, rx) = oneshot::channel();
5022        let mut rx = pin!(rx);
5023        assert!(matches!(
5024            poll_future_producer(rx.as_mut(), false),
5025            Poll::Pending,
5026        ));
5027        assert!(matches!(
5028            poll_future_producer(rx.as_mut(), true),
5029            Poll::Ready(Ok(None)),
5030        ));
5031        tx.send(()).unwrap();
5032        assert!(matches!(
5033            poll_future_producer(rx.as_mut(), true),
5034            Poll::Ready(Ok(Some(()))),
5035        ));
5036
5037        let (tx, rx) = oneshot::channel();
5038        let mut rx = pin!(rx);
5039        tx.send(()).unwrap();
5040        assert!(matches!(
5041            poll_future_producer(rx.as_mut(), false),
5042            Poll::Ready(Ok(Some(()))),
5043        ));
5044
5045        let (tx, rx) = oneshot::channel::<()>();
5046        let mut rx = pin!(rx);
5047        drop(tx);
5048        assert!(matches!(
5049            poll_future_producer(rx.as_mut(), false),
5050            Poll::Ready(Err(..)),
5051        ));
5052
5053        let (tx, rx) = oneshot::channel::<()>();
5054        let mut rx = pin!(rx);
5055        drop(tx);
5056        assert!(matches!(
5057            poll_future_producer(rx.as_mut(), true),
5058            Poll::Ready(Err(..)),
5059        ));
5060    }
5061}