Skip to main content

wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9    AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10    Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17    Error, Result, bail, bail_bug, ensure,
18    error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, ManuallyDrop, MaybeUninit};
26use core::ops::{Deref, DerefMut};
27use core::pin::Pin;
28use core::task::{Context, Poll, Waker, ready};
29use futures::channel::oneshot;
30use futures::{FutureExt as _, stream};
31use std::any::{Any, TypeId};
32use std::boxed::Box;
33use std::io::Cursor;
34use std::string::String;
35use std::sync::{Arc, Mutex, MutexGuard};
36use std::vec::Vec;
37use wasmtime_environ::component::{
38    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
39    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
40    TypeFutureTableIndex, TypeStreamTableIndex,
41};
42
43pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
44
45mod buffers;
46
/// Enum for distinguishing between a stream or future in functions that handle
/// both.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
    /// The operation concerns a stream.
    Stream,
    /// The operation concerns a future.
    Future,
}

/// Represents `{stream,future}.{read,write}` results.
///
/// See [`ReturnCode::encode`] for how each variant is packed into the 32-bit
/// value handed back to the guest.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
    /// Encoded as the sentinel `0xffff_ffff`; carries no count.
    Blocked,
    /// Encoded with status code `0x0` and the contained count.
    Completed(u32),
    /// Encoded with status code `0x1` and the contained count.
    Dropped(u32),
    /// Encoded with status code `0x2` and the contained count.
    Cancelled(u32),
}

impl ReturnCode {
    /// Pack `self` into a single 32-bit integer that may be returned to the
    /// guest.
    ///
    /// The low four bits hold the status code and the remaining upper bits
    /// hold the count, which must therefore be below `1 << 28` (checked in
    /// debug builds only). `Blocked` is encoded as `0xffff_ffff`.
    ///
    /// This corresponds to `pack_copy_result` in the Component Model spec.
    pub fn encode(&self) -> u32 {
        const BLOCKED: u32 = 0xffff_ffff;
        const COMPLETED: u32 = 0x0;
        const DROPPED: u32 = 0x1;
        const CANCELLED: u32 = 0x2;

        // Split the value into its count and status code; `Blocked` has
        // neither and is returned as-is.
        let (count, code) = match *self {
            ReturnCode::Blocked => return BLOCKED,
            ReturnCode::Completed(count) => (count, COMPLETED),
            ReturnCode::Dropped(count) => (count, DROPPED),
            ReturnCode::Cancelled(count) => (count, CANCELLED),
        };
        debug_assert!(count < (1 << 28));
        (count << 4) | code
    }

    /// Returns `Self::Completed` with the specified count (or zero if
    /// `matches!(kind, TransmitKind::Future)`)
    fn completed(kind: TransmitKind, count: u32) -> Self {
        match kind {
            TransmitKind::Future => Self::Completed(0),
            TransmitKind::Stream => Self::Completed(count),
        }
    }
}
101
102/// Represents a stream or future type index.
103///
104/// This is useful as a parameter type for functions which operate on either a
105/// future or a stream.
106#[derive(Copy, Clone, Debug)]
107pub enum TransmitIndex {
108    Stream(TypeStreamTableIndex),
109    Future(TypeFutureTableIndex),
110}
111
112impl TransmitIndex {
113    pub fn kind(&self) -> TransmitKind {
114        match self {
115            TransmitIndex::Stream(_) => TransmitKind::Stream,
116            TransmitIndex::Future(_) => TransmitKind::Future,
117        }
118    }
119}
120
121/// Retrieve the payload type of the specified stream or future, or `None` if it
122/// has no payload type.
123fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
124    match ty {
125        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
126        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
127    }
128}
129
130/// Retrieve the host rep and state for the specified guest-visible waitable
131/// handle.
132fn get_mut_by_index_from(
133    handle_table: &mut HandleTable,
134    ty: TransmitIndex,
135    index: u32,
136) -> Result<(u32, &mut TransmitLocalState)> {
137    match ty {
138        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
139        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
140    }
141}
142
143fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
144    mut store: StoreContextMut<U>,
145    instance: Instance,
146    caller_thread: QualifiedThreadId,
147    options: OptionsIndex,
148    ty: TransmitIndex,
149    address: usize,
150    count: usize,
151    buffer: &mut B,
152) -> Result<()> {
153    let count = buffer.remaining().len().min(count);
154
155    // If lowering may call realloc in the guest, then the guest may need
156    // to access its thread context, so we need to set the current thread before lowering
157    // and restore the old one afterward.
158    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
159        let old_thread = store.0.set_thread(caller_thread)?;
160        (
161            &mut LowerContext::new(store.as_context_mut(), options, instance),
162            Some(old_thread),
163        )
164    } else {
165        (
166            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
167            None,
168        )
169    };
170
171    if address % usize::try_from(T::ALIGN32)? != 0 {
172        bail!("read pointer not aligned");
173    }
174    lower
175        .as_slice_mut()
176        .get_mut(address..)
177        .and_then(|b| b.get_mut(..T::SIZE32 * count))
178        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
179
180    if let Some(ty) = payload(ty, lower.types) {
181        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
182    }
183
184    if let Some(old_thread) = old_thread {
185        store.0.set_thread(old_thread)?;
186    }
187
188    buffer.skip(count);
189
190    Ok(())
191}
192
193fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
194    lift: &mut LiftContext<'_>,
195    ty: Option<InterfaceType>,
196    buffer: &mut B,
197    address: usize,
198    count: usize,
199) -> Result<()> {
200    let count = count.min(buffer.remaining_capacity());
201    if T::IS_RUST_UNIT_TYPE {
202        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
203        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
204        // is a valid way to populate the zero-sized buffer.
205        buffer.extend(
206            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
207        )
208    } else {
209        let ty = match ty {
210            Some(ty) => ty,
211            None => bail_bug!("type required for non-unit lift"),
212        };
213        if address % usize::try_from(T::ALIGN32)? != 0 {
214            bail!("write pointer not aligned");
215        }
216        lift.memory()
217            .get(address..)
218            .and_then(|b| b.get(..T::SIZE32 * count))
219            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
220
221        let list = &WasmList::new(address, count, lift, ty)?;
222        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
223    }
224    Ok(())
225}
226
/// Represents the state associated with an error context.
///
/// Currently this is only the debug message attached to the error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context
    pub(crate) debug_msg: String,
}
233
/// Represents the size and alignment for a "flat" Component Model type,
/// i.e. one containing no pointers or handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the type (presumably in bytes -- TODO confirm against users).
    pub(super) size: u32,
    /// Alignment of the type (presumably in bytes -- TODO confirm against
    /// users).
    pub(super) align: u32,
}
241
242/// Represents the buffer for a host- or guest-initiated stream read.
243pub struct Destination<'a, T, B> {
244    id: TableId<TransmitState>,
245    buffer: &'a mut B,
246    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
247    _phantom: PhantomData<fn() -> T>,
248}
249
250impl<'a, T, B> Destination<'a, T, B> {
251    /// Reborrow `self` so it can be used again later.
252    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
253        Destination {
254            id: self.id,
255            buffer: &mut *self.buffer,
256            host_buffer: self.host_buffer.as_deref_mut(),
257            _phantom: PhantomData,
258        }
259    }
260
261    /// Take the buffer out of `self`, leaving a default-initialized one in its
262    /// place.
263    ///
264    /// This can be useful for reusing the previously-stored buffer's capacity
265    /// instead of allocating a fresh one.
266    pub fn take_buffer(&mut self) -> B
267    where
268        B: Default,
269    {
270        mem::take(self.buffer)
271    }
272
273    /// Store the specified buffer in `self`.
274    ///
275    /// Any items contained in the buffer will be delivered to the reader after
276    /// the `StreamProducer::poll_produce` call to which this `Destination` was
277    /// passed returns (unless overwritten by another call to `set_buffer`).
278    ///
279    /// If items are stored via this buffer _and_ written via a
280    /// `DirectDestination` view of `self`, then the items in the buffer will be
281    /// delivered after the ones written using `DirectDestination`.
282    pub fn set_buffer(&mut self, buffer: B) {
283        *self.buffer = buffer;
284    }
285
286    /// Return the remaining number of items the current read has capacity to
287    /// accept, if known.
288    ///
289    /// This will return `Some(_)` if the reader is a guest; it will return
290    /// `None` if the reader is the host.
291    ///
292    /// Note that this can return `Some(0)`. This means that the guest is
293    /// attempting to perform a zero-length read which typically means that it's
294    /// trying to wait for this stream to be ready-to-read but is not actually
295    /// ready to receive the items yet. The host in this case is allowed to
296    /// either block waiting for readiness or immediately complete the
297    /// operation. The guest is expected to handle both cases. Some more
298    /// discussion about this case can be found in the discussion of ["Stream
299    /// Readiness" in the component-model repo][docs].
300    ///
301    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
302    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
303        // Note that this unwrap should only trigger for bugs in Wasmtime, and
304        // this is modeled here to centralize the `.unwrap()` for this method in
305        // one location.
306        self.remaining_(store.as_context_mut().0).unwrap()
307    }
308
309    fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
310        let transmit = store.concurrent_state_mut().get_mut(self.id)?;
311
312        if let &ReadState::GuestReady { count, .. } = &transmit.read {
313            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
314                bail_bug!("expected WriteState::HostReady")
315            };
316
317            Ok(Some(count - guest_offset))
318        } else {
319            Ok(None)
320        }
321    }
322}
323
324impl<'a, B> Destination<'a, u8, B> {
325    /// Return a `DirectDestination` view of `self`.
326    ///
327    /// If the reader is a guest, this will provide direct access to the guest's
328    /// read buffer.  If the reader is a host, this will provide access to a
329    /// buffer which will be delivered to the host before any items stored using
330    /// `Destination::set_buffer`.
331    ///
332    /// `capacity` will only be used if the reader is a host, in which case it
333    /// will update the length of the buffer, possibly zero-initializing the new
334    /// elements if the new length is larger than the old length.
335    pub fn as_direct<D>(
336        mut self,
337        store: StoreContextMut<'a, D>,
338        capacity: usize,
339    ) -> DirectDestination<'a, D> {
340        if let Some(buffer) = self.host_buffer.as_deref_mut() {
341            buffer.set_position(0);
342            if buffer.get_mut().is_empty() {
343                buffer.get_mut().resize(capacity, 0);
344            }
345        }
346
347        DirectDestination {
348            id: self.id,
349            host_buffer: self.host_buffer,
350            store,
351        }
352    }
353}
354
/// Represents a read from a `stream<u8>`, providing direct access to the
/// writer's buffer.
pub struct DirectDestination<'a, D: 'static> {
    /// Table id of the transmit state this destination reads from.
    id: TableId<TransmitState>,
    /// When `Some`, bytes are written into this buffer and its cursor position
    /// is advanced by `mark_written`; when `None`, the guest's read buffer is
    /// looked up through `store` instead.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and the guest's memory.
    store: StoreContextMut<'a, D>,
}
362
363impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
364    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
365        let rem = self.remaining();
366        let n = rem.len().min(buf.len());
367        rem[..n].copy_from_slice(&buf[..n]);
368        self.mark_written(n);
369        Ok(n)
370    }
371
372    fn flush(&mut self) -> std::io::Result<()> {
373        Ok(())
374    }
375}
376
377impl<D: 'static> DirectDestination<'_, D> {
378    /// Provide direct access to the writer's buffer.
379    pub fn remaining(&mut self) -> &mut [u8] {
380        // Note that this unwrap should only trigger for bugs in Wasmtime, and
381        // this is modeled here to centralize the `.unwrap()` for this method in
382        // one location.
383        self.remaining_().unwrap()
384    }
385
386    fn remaining_(&mut self) -> Result<&mut [u8]> {
387        if let Some(buffer) = self.host_buffer.as_deref_mut() {
388            return Ok(buffer.get_mut());
389        }
390        let transmit = self
391            .store
392            .as_context_mut()
393            .0
394            .concurrent_state_mut()
395            .get_mut(self.id)?;
396
397        let &ReadState::GuestReady {
398            address,
399            count,
400            options,
401            instance,
402            ..
403        } = &transmit.read
404        else {
405            bail_bug!("expected ReadState::GuestReady")
406        };
407
408        let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
409            bail_bug!("expected WriteState::HostReady")
410        };
411
412        let memory = instance
413            .options_memory_mut(self.store.0, options)
414            .get_mut((address + guest_offset)..)
415            .and_then(|b| b.get_mut(..(count - guest_offset)));
416        match memory {
417            Some(memory) => Ok(memory),
418            None => bail_bug!("guest buffer unexpectedly out of bounds"),
419        }
420    }
421
422    /// Mark the specified number of bytes as written to the writer's buffer.
423    ///
424    /// # Panics
425    ///
426    /// This will panic if the count is larger than the size of the
427    /// buffer returned by `Self::remaining`.
428    pub fn mark_written(&mut self, count: usize) {
429        // Note that this unwrap should only trigger for bugs in Wasmtime, and
430        // this is modeled here to centralize the `.unwrap()` for this method in
431        // one location.
432        self.mark_written_(count).unwrap()
433    }
434
435    fn mark_written_(&mut self, count: usize) -> Result<()> {
436        if let Some(buffer) = self.host_buffer.as_deref_mut() {
437            buffer.set_position(
438                // Note that these `.unwrap`s are documented panic conditions of
439                // `mark_written`.
440                buffer
441                    .position()
442                    .checked_add(u64::try_from(count).unwrap())
443                    .unwrap(),
444            );
445        } else {
446            let transmit = self
447                .store
448                .as_context_mut()
449                .0
450                .concurrent_state_mut()
451                .get_mut(self.id)?;
452
453            let ReadState::GuestReady {
454                count: read_count, ..
455            } = &transmit.read
456            else {
457                bail_bug!("expected ReadState::GuestReady")
458            };
459
460            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
461                bail_bug!("expected WriteState::HostReady");
462            };
463
464            if *guest_offset + count > *read_count {
465                // Note that this `panic` is a documented panic condition of
466                // `mark_written`.
467                panic!(
468                    "write count ({count}) must be less than or equal to read count ({read_count})"
469                )
470            } else {
471                *guest_offset += count;
472            }
473        }
474        Ok(())
475    }
476}
477
/// Represents the state of a `Stream{Producer,Consumer}`.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
    /// The operation completed normally, and the producer or consumer may be
    /// able to produce or consume more items, respectively.
    Completed,
    /// The operation was interrupted (i.e. it wrapped up early after receiving
    /// a `finish` parameter value of true in a call to `poll_produce` or
    /// `poll_consume`), and the producer or consumer may be able to produce or
    /// consume more items, respectively.
    Cancelled,
    /// The operation completed normally, but the producer or consumer will
    /// _not_ be able to produce or consume more items, respectively.
    Dropped,
}
493
/// Represents the host-owned write end of a stream.
pub trait StreamProducer<D>: Send + 'static {
    /// The payload type of this stream.
    type Item;

    /// The `WriteBuffer` type to use when delivering items.
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Handle a host- or guest-initiated read by delivering zero or more items
    /// to the specified destination.
    ///
    /// This will be called whenever the reader starts a read.
    ///
    /// # Arguments
    ///
    /// * `self` - a `Pin`'d version of self to perform Rust-level
    ///   future-related operations on.
    /// * `cx` - a Rust-related [`Context`] which is passed to other
    ///   future-related operations or used to acquire a waker.
    /// * `store` - the Wasmtime store that this operation is happening within.
    ///   Used, for example, to consult the state `D` associated with the store.
    /// * `destination` - the location that items are to be written to.
    /// * `finish` - a flag indicating whether the host should strive to
    ///   immediately complete/cancel any pending operation. See below for more
    ///   details.
    ///
    /// # Behavior
    ///
    /// If the implementation is able to produce one or more items immediately,
    /// it should write them to `destination` and return either
    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
    /// any more items.
    ///
    /// If the implementation is unable to produce any items immediately, but
    /// expects to do so later, and `finish` is _false_, it should store the
    /// waker from `cx` for later and return `Poll::Pending` without writing
    /// anything to `destination`.  Later, it should alert the waker when either
    /// the items arrive, the stream has ended, or an error occurs.
    ///
    /// If more items are written to `destination` than the reader has immediate
    /// capacity to accept, they will be retained in memory by the caller and
    /// used to satisfy future reads, in which case `poll_produce` will only be
    /// called again once all those items have been delivered.
    ///
    /// # Zero-length reads
    ///
    /// This function may be called with a zero-length capacity buffer
    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
    /// the guest wants to wait to see if an item is ready without actually
    /// reading the item. For example think of a UNIX `poll` function run on a
    /// TCP stream, seeing if it's readable without actually reading it.
    ///
    /// In this situation the host is allowed to either return immediately or
    /// wait for readiness. Note that waiting for readiness is not always
    /// possible. For example it's impossible to test if a Rust-native `Future`
    /// is ready without actually reading the item. Stream-specific
    /// optimizations, such as testing if a TCP stream is readable, may be
    /// possible however.
    ///
    /// For a zero-length read, the host is allowed to:
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
    ///   anything if it expects to be able to produce items immediately (i.e.
    ///   without first returning `Poll::Pending`) the next time `poll_produce`
    ///   is called with non-zero capacity. This is the best-case scenario of
    ///   fulfilling the guest's desire -- items aren't read/buffered but the
    ///   host is saying it's ready when the guest is.
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
    ///   testing for readiness. The guest doesn't know this yet, but the guest
    ///   will realize that zero-length reads won't work on this stream when a
    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
    ///   here.
    ///
    /// - Return `Poll::Pending` if the host has performed necessary async work
    ///   to wait for this stream to be readable without actually reading
    ///   anything. This is also a best-case scenario where the host is letting
    ///   the guest know that nothing is ready yet. Later the zero-length read
    ///   will complete and then the guest will attempt a nonzero-length read to
    ///   actually read some bytes.
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
    ///   `Destination::set_buffer` with one or more items. Note, however,
    ///   that this creates the hazard that the items will never be received by
    ///   the guest if it decides not to do another non-zero-length read before
    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
    ///   recommended to do this and it's better to return
    ///   `StreamResult::Completed` without buffering anything instead.
    ///
    /// For more discussion on zero-length reads see the [documentation in the
    /// component-model repo itself][docs].
    ///
    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
    ///
    /// # Return
    ///
    /// This function can return one of several possible values:
    ///
    /// * `Poll::Pending` - this operation cannot complete at this time. The
    ///   Rust-level `Future::poll` contract applies here where a waker should
    ///   be stored from the `cx` argument and be arranged to receive a
    ///   notification when this implementation can make progress. For example
    ///   if you call `Future::poll` on a sub-future, that's enough. If items
    ///   were written to `destination` then a trap in the guest will be raised.
    ///
    ///   Note that implementations should strive to avoid this return value
    ///   when `finish` is `true`. In such a situation the guest is attempting
    ///   to, for example, cancel a previous operation. By returning
    ///   `Poll::Pending` the guest will be blocked during the cancellation
    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
    ///   favored to indicate that no items were read. If a short read happened,
    ///   however, it's ok to return `StreamResult::Completed` indicating some
    ///   items were read.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Completed))` - items, if applicable,
    ///   were written to the `destination`.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Cancelled))` - used when `finish` is
    ///   `true` and the implementation was able to successfully cancel any
    ///   async work that a previous read kicked off, if any. The host should
    ///   not buffer values received after returning `Cancelled` because the
    ///   guest will not be aware of these values and the guest could close the
    ///   stream after cancelling a read. Hosts should only return `Cancelled`
    ///   when there are no more async operations in flight for a previous read.
    ///
    ///   If items were written to `destination` then a trap in the guest will
    ///   be raised. If `finish` is `false` then this return value will raise a
    ///   trap in the guest.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Dropped))` - end-of-stream marker,
    ///   indicating that this producer should not be polled again. Note that
    ///   items may still be written to `destination`.
    ///
    /// # Errors
    ///
    /// The implementation may alternatively choose to return `Err(_)` to
    /// indicate an unrecoverable error. This will cause the guest (if any) to
    /// trap and render the component instance (if any) unusable. The
    /// implementation should report errors that _are_ recoverable by other
    /// means (e.g. by writing to a `future`) and return
    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
    /// be downcast to the specified type.
    ///
    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
    /// to the specified type is guaranteed to succeed.
    ///
    /// The default implementation performs no conversion and hands `me` back
    /// unchanged as `Err(me)`.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
654
655impl<T, D> StreamProducer<D> for iter::Empty<T>
656where
657    T: Send + Sync + 'static,
658{
659    type Item = T;
660    type Buffer = Option<Self::Item>;
661
662    fn poll_produce<'a>(
663        self: Pin<&mut Self>,
664        _: &mut Context<'_>,
665        _: StoreContextMut<'a, D>,
666        _: Destination<'a, Self::Item, Self::Buffer>,
667        _: bool,
668    ) -> Poll<Result<StreamResult>> {
669        Poll::Ready(Ok(StreamResult::Dropped))
670    }
671}
672
673impl<T, D> StreamProducer<D> for stream::Empty<T>
674where
675    T: Send + Sync + 'static,
676{
677    type Item = T;
678    type Buffer = Option<Self::Item>;
679
680    fn poll_produce<'a>(
681        self: Pin<&mut Self>,
682        _: &mut Context<'_>,
683        _: StoreContextMut<'a, D>,
684        _: Destination<'a, Self::Item, Self::Buffer>,
685        _: bool,
686    ) -> Poll<Result<StreamResult>> {
687        Poll::Ready(Ok(StreamResult::Dropped))
688    }
689}
690
691impl<T, D> StreamProducer<D> for Vec<T>
692where
693    T: Unpin + Send + Sync + 'static,
694{
695    type Item = T;
696    type Buffer = VecBuffer<T>;
697
698    fn poll_produce<'a>(
699        self: Pin<&mut Self>,
700        _: &mut Context<'_>,
701        _: StoreContextMut<'a, D>,
702        mut dst: Destination<'a, Self::Item, Self::Buffer>,
703        _: bool,
704    ) -> Poll<Result<StreamResult>> {
705        dst.set_buffer(mem::take(self.get_mut()).into());
706        Poll::Ready(Ok(StreamResult::Dropped))
707    }
708}
709
710impl<T, D> StreamProducer<D> for Box<[T]>
711where
712    T: Unpin + Send + Sync + 'static,
713{
714    type Item = T;
715    type Buffer = VecBuffer<T>;
716
717    fn poll_produce<'a>(
718        self: Pin<&mut Self>,
719        _: &mut Context<'_>,
720        _: StoreContextMut<'a, D>,
721        mut dst: Destination<'a, Self::Item, Self::Buffer>,
722        _: bool,
723    ) -> Poll<Result<StreamResult>> {
724        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
725        Poll::Ready(Ok(StreamResult::Dropped))
726    }
727}
728
/// A `Bytes` value acts as a producer that delivers all of its bytes and then
/// ends the stream.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Writes as many bytes as fit directly into a guest reader's buffer and
    /// buffers the rest; for zero-length or host reads everything is buffered.
    /// Always reports `StreamResult::Dropped`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // on 0-length or host reads, buffer the bytes
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap = cap.get();
        // Never write more than we actually hold: when the data is shorter
        // than the reader's capacity, `split_off(cap)` would panic and the
        // copy below would fail on mismatched lengths.
        let len = cap.min(self.len());
        // Whatever does not fit in the destination is buffered (this is empty
        // when everything fits).
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, cap);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
756
/// A `BytesMut` value acts as a producer that delivers all of its bytes and
/// then ends the stream.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Writes as many bytes as fit directly into a guest reader's buffer and
    /// buffers the rest; for zero-length or host reads everything is buffered.
    /// Always reports `StreamResult::Dropped`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // on 0-length or host reads, buffer the bytes
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        let cap = cap.get();
        // Never write more than we actually hold: when the data is shorter
        // than the reader's capacity, the copy below would fail on mismatched
        // lengths (and `split_off` panics past the allocation's capacity).
        let len = cap.min(self.len());
        // Whatever does not fit in the destination is buffered (this is empty
        // when everything fits).
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, cap);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
784
/// Represents the buffer for a host- or guest-initiated stream write.
pub struct Source<'a, T> {
    /// Table id of the transmit state this source is written through.
    id: TableId<TransmitState>,
    /// When `Some`, items are taken from this buffer; when `None`, they are
    /// lifted from the guest writer's memory instead.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
790
791impl<'a, T> Source<'a, T> {
792    /// Reborrow `self` so it can be used again later.
793    pub fn reborrow(&mut self) -> Source<'_, T> {
794        Source {
795            id: self.id,
796            host_buffer: self.host_buffer.as_deref_mut(),
797        }
798    }
799
800    /// Accept zero or more items from the writer.
801    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
802    where
803        T: func::Lift + 'static,
804        B: ReadBuffer<T>,
805    {
806        if let Some(input) = &mut self.host_buffer {
807            let count = input.remaining().len().min(buffer.remaining_capacity());
808            buffer.move_from(*input, count);
809        } else {
810            let store = store.as_context_mut();
811            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
812
813            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
814                bail_bug!("expected ReadState::HostReady");
815            };
816
817            let &WriteState::GuestReady {
818                ty,
819                address,
820                count,
821                options,
822                instance,
823                ..
824            } = &transmit.write
825            else {
826                bail_bug!("expected WriteState::HostReady");
827            };
828
829            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
830            let ty = payload(ty, cx.types);
831            let old_remaining = buffer.remaining_capacity();
832            lift::<T, B>(
833                cx,
834                ty,
835                buffer,
836                address + (T::SIZE32 * guest_offset),
837                count - guest_offset,
838            )?;
839
840            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
841
842            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
843                bail_bug!("expected ReadState::HostReady");
844            };
845
846            *guest_offset += old_remaining - buffer.remaining_capacity();
847        }
848
849        Ok(())
850    }
851
852    /// Return the number of items remaining to be read from the current write
853    /// operation.
854    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
855    where
856        T: 'static,
857    {
858        // Note that this unwrap should only trigger for bugs in Wasmtime, and
859        // this is modeled here to centralize the `.unwrap()` for this method in
860        // one location.
861        self.remaining_(store.as_context_mut().0).unwrap()
862    }
863
864    fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
865    where
866        T: 'static,
867    {
868        let transmit = store.concurrent_state_mut().get_mut(self.id)?;
869
870        if let &WriteState::GuestReady { count, .. } = &transmit.write {
871            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
872                bail_bug!("expected ReadState::HostReady")
873            };
874
875            Ok(count - guest_offset)
876        } else if let Some(host_buffer) = &self.host_buffer {
877            Ok(host_buffer.remaining().len())
878        } else {
879            bail_bug!("expected either WriteState::GuestReady or host buffer")
880        }
881    }
882}
883
884impl<'a> Source<'a, u8> {
885    /// Return a `DirectSource` view of `self`.
886    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
887        DirectSource {
888            id: self.id,
889            host_buffer: self.host_buffer,
890            store,
891        }
892    }
893}
894
/// Represents a write to a `stream<u8>`, providing direct access to the
/// writer's buffer.
///
/// Created with [`Source::as_direct`].
pub struct DirectSource<'a, D: 'static> {
    // Key used to look up the transmit state for this write in the store's
    // concurrent state.
    id: TableId<TransmitState>,
    // When `Some`, bytes are served from this host-side write buffer; when
    // `None`, they are read straight out of the guest writer's memory.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    // Store handle used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
902
903impl<D: 'static> std::io::Read for DirectSource<'_, D> {
904    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
905        let rem = self.remaining();
906        let n = rem.len().min(buf.len());
907        buf[..n].copy_from_slice(&rem[..n]);
908        self.mark_read(n);
909        Ok(n)
910    }
911}
912
913impl<D: 'static> DirectSource<'_, D> {
914    /// Provide direct access to the writer's buffer.
915    pub fn remaining(&mut self) -> &[u8] {
916        // Note that this unwrap should only trigger for bugs in Wasmtime, and
917        // this is modeled here to centralize the `.unwrap()` for this method in
918        // one location.
919        self.remaining_().unwrap()
920    }
921
922    fn remaining_(&mut self) -> Result<&[u8]> {
923        if let Some(buffer) = self.host_buffer.as_deref_mut() {
924            return Ok(buffer.remaining());
925        }
926        let transmit = self
927            .store
928            .as_context_mut()
929            .0
930            .concurrent_state_mut()
931            .get_mut(self.id)?;
932
933        let &WriteState::GuestReady {
934            address,
935            count,
936            options,
937            instance,
938            ..
939        } = &transmit.write
940        else {
941            bail_bug!("expected WriteState::GuestReady")
942        };
943
944        let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
945            bail_bug!("expected ReadState::HostReady")
946        };
947
948        let memory = instance
949            .options_memory(self.store.0, options)
950            .get((address + guest_offset)..)
951            .and_then(|b| b.get(..(count - guest_offset)));
952        match memory {
953            Some(memory) => Ok(memory),
954            None => bail_bug!("guest buffer unexpectedly out of bounds"),
955        }
956    }
957
958    /// Mark the specified number of bytes as read from the writer's buffer.
959    ///
960    /// # Panics
961    ///
962    /// This will panic if the count is larger than the size of the buffer
963    /// returned by `Self::remaining`.
964    pub fn mark_read(&mut self, count: usize) {
965        // Note that this unwrap should only trigger for bugs in Wasmtime, and
966        // this is modeled here to centralize the `.unwrap()` for this method in
967        // one location.
968        self.mark_read_(count).unwrap()
969    }
970
971    fn mark_read_(&mut self, count: usize) -> Result<()> {
972        if let Some(buffer) = self.host_buffer.as_deref_mut() {
973            buffer.skip(count);
974            return Ok(());
975        }
976
977        let transmit = self
978            .store
979            .as_context_mut()
980            .0
981            .concurrent_state_mut()
982            .get_mut(self.id)?;
983
984        let WriteState::GuestReady {
985            count: write_count, ..
986        } = &transmit.write
987        else {
988            bail_bug!("expected WriteState::GuestReady");
989        };
990
991        let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
992            bail_bug!("expected ReadState::HostReady");
993        };
994
995        if *guest_offset + count > *write_count {
996            // Note that this is a documented panic condition of `mark_read`.
997            panic!("read count ({count}) must be less than or equal to write count ({write_count})")
998        } else {
999            *guest_offset += count;
1000        }
1001        Ok(())
1002    }
1003}
1004
/// Represents the host-owned read end of a stream.
pub trait StreamConsumer<D>: Send + 'static {
    /// The payload type of this stream.
    type Item;

    /// Handle a host- or guest-initiated write by accepting zero or more items
    /// from the specified source.
    ///
    /// This will be called whenever the writer starts a write.
    ///
    /// If the implementation is able to consume one or more items immediately,
    /// it should take them from `source` and return either
    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
    /// indicate that the caller should delay sending a `COMPLETED` event to the
    /// writer until a later call to this function returns `Poll::Ready(_)`.
    /// For more about that, see the `Backpressure` section below.
    ///
    /// If the implementation cannot consume any items immediately and `finish`
    /// is _false_, it should store the waker from `cx` for later and return
    /// `Poll::Pending` without taking anything from `source`.  Later, it
    /// should alert the waker when either (1) it is able to accept items,
    /// (2) the stream has ended, or (3) an error occurs.
    ///
    /// If the implementation cannot consume any items immediately and `finish`
    /// is _true_, it should, if possible, return
    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
    /// anything from `source`.  However, that might not be possible if an
    /// earlier call to `poll_consume` kicked off an asynchronous operation
    /// which needs to be completed (and possibly interrupted) gracefully, in
    /// which case the implementation may return `Poll::Pending` and later alert
    /// the waker as described above.  In other words, when `finish` is true,
    /// the implementation should prioritize returning a result to the writer
    /// (even if no items can be consumed) rather than wait indefinitely for
    /// capacity to free up.
    ///
    /// In all of the above cases, the implementation may alternatively choose
    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
    /// the guest (if any) to trap and render the component instance (if any)
    /// unusable.  The implementation should report errors that _are_
    /// recoverable by other means (e.g. by writing to a `future`) and return
    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
    ///
    /// Note that the implementation should only return
    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
    /// items from `source` if called with `finish` set to true.  If it does so
    /// when `finish` is false, the caller will trap.  Additionally, it should
    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
    /// least one item from `source` if there is an item available; otherwise,
    /// the caller will trap.  If `poll_consume` is called with no items in
    /// `source`, it should only return `Poll::Ready(_)` once it is able to
    /// accept at least one item during the next call to `poll_consume`.
    ///
    /// Note that any items which the implementation of this trait takes from
    /// `source` become the responsibility of that implementation.  For that
    /// reason, an implementation which forwards items to an upstream sink
    /// should reserve capacity in that sink before taking items out of
    /// `source`, if possible.  Alternatively, it might buffer items which can't
    /// be forwarded immediately and send them once capacity is freed up.
    ///
    /// ## Backpressure
    ///
    /// As mentioned above, an implementation might choose to return
    /// `Poll::Pending` after taking items from `source`, which tells the caller
    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
    /// a form of backpressure when the items are forwarded to an upstream sink
    /// asynchronously.  Note, however, that it's not possible to "put back"
    /// items into `source` once they've been taken out, so if the upstream sink
    /// is unable to accept all the items, that cannot be communicated to the
    /// writer at this level of abstraction.  Just as with application-specific,
    /// recoverable errors, information about which items could be forwarded and
    /// which could not must be communicated out-of-band, e.g. by writing to an
    /// application-specific `future`.
    ///
    /// Similarly, if the writer cancels the write after items have been taken
    /// from `source` but before the items have all been forwarded to an
    /// upstream sink, `poll_consume` will be called with `finish` set to true,
    /// and the implementation may either:
    ///
    /// - Interrupt the forwarding process gracefully.  This may be preferable
    /// if there is an out-of-band channel for communicating to the writer how
    /// many items were forwarded before being interrupted.
    ///
    /// - Allow the forwarding to complete without interrupting it.  This is
    /// usually preferable if there's no out-of-band channel for reporting back
    /// to the writer how many items were forwarded.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1100
/// Represents a host-owned write end of a future.
pub trait FutureProducer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated read by producing a value.
    ///
    /// This is equivalent to `StreamProducer::poll_produce`, but with a
    /// simplified interface for futures: the value is returned directly
    /// rather than written to a destination.
    ///
    /// If `finish` is true, the implementation may return
    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
    /// could produce a value.  Otherwise, it must either return
    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1122
1123impl<T, E, D, Fut> FutureProducer<D> for Fut
1124where
1125    E: Into<Error>,
1126    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1127{
1128    type Item = T;
1129
1130    fn poll_produce<'a>(
1131        self: Pin<&mut Self>,
1132        cx: &mut Context<'_>,
1133        _: StoreContextMut<'a, D>,
1134        finish: bool,
1135    ) -> Poll<Result<Option<T>>> {
1136        match self.poll(cx) {
1137            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1138            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1139            Poll::Pending if finish => Poll::Ready(Ok(None)),
1140            Poll::Pending => Poll::Pending,
1141        }
1142    }
1143}
1144
/// Represents a host-owned read end of a future.
pub trait FutureConsumer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated write by consuming a value.
    ///
    /// This is equivalent to `StreamConsumer::poll_consume`, but with a
    /// simplified interface for futures.
    ///
    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
    /// without taking the item from `source`, which indicates the operation was
    /// canceled before it could consume the value.  Otherwise, it must either
    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
    /// the item).
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1169
/// Represents the readable end of a Component Model `future`.
///
/// Note that `FutureReader` instances must be disposed of using either `pipe`
/// or `close`; otherwise the in-store representation will leak and the writer
/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
/// ensure that disposal happens automatically.
pub struct FutureReader<T> {
    // Table key identifying this reader's transmit handle.
    id: TableId<TransmitHandle>,
    // Ties the payload type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1180
1181impl<T> FutureReader<T> {
1182    /// Create a new future with the specified producer.
1183    ///
1184    /// # Errors
1185    ///
1186    /// Returns an error if the resource table for this store is full or if
1187    /// [`Config::concurrency_support`] is not enabled.
1188    ///
1189    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1190    pub fn new<S: AsContextMut>(
1191        mut store: S,
1192        producer: impl FutureProducer<S::Data, Item = T>,
1193    ) -> Result<Self>
1194    where
1195        T: func::Lower + func::Lift + Send + Sync + 'static,
1196    {
1197        ensure!(
1198            store.as_context().0.concurrency_support(),
1199            "concurrency support is not enabled"
1200        );
1201
1202        struct Producer<P>(P);
1203
1204        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1205            for Producer<P>
1206        {
1207            type Item = P::Item;
1208            type Buffer = Option<P::Item>;
1209
1210            fn poll_produce<'a>(
1211                self: Pin<&mut Self>,
1212                cx: &mut Context<'_>,
1213                store: StoreContextMut<D>,
1214                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1215                finish: bool,
1216            ) -> Poll<Result<StreamResult>> {
1217                // SAFETY: This is a standard pin-projection, and we never move
1218                // out of `self`.
1219                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1220
1221                Poll::Ready(Ok(
1222                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1223                        destination.set_buffer(Some(value));
1224
1225                        // Here we return `StreamResult::Completed` even though
1226                        // we've produced the last item we'll ever produce.
1227                        // That's because the ABI expects
1228                        // `ReturnCode::Completed(1)` rather than
1229                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1230                        // called again since the future will have resolved.
1231                        StreamResult::Completed
1232                    } else {
1233                        StreamResult::Cancelled
1234                    },
1235                ))
1236            }
1237        }
1238
1239        Ok(Self::new_(
1240            store
1241                .as_context_mut()
1242                .new_transmit(TransmitKind::Future, Producer(producer))?,
1243        ))
1244    }
1245
1246    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1247        Self {
1248            id,
1249            _phantom: PhantomData,
1250        }
1251    }
1252
1253    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1254        self.id
1255    }
1256
1257    /// Set the consumer that accepts the result of this future.
1258    ///
1259    /// # Errors
1260    ///
1261    /// Returns an error if this future has already been closed.
1262    ///
1263    /// # Panics
1264    ///
1265    /// Panics if this future does not belong to `store`.
1266    pub fn pipe<S: AsContextMut>(
1267        self,
1268        mut store: S,
1269        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1270    ) -> Result<()>
1271    where
1272        T: func::Lift + 'static,
1273    {
1274        struct Consumer<C>(C);
1275
1276        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1277            for Consumer<C>
1278        {
1279            type Item = T;
1280
1281            fn poll_consume(
1282                self: Pin<&mut Self>,
1283                cx: &mut Context<'_>,
1284                mut store: StoreContextMut<D>,
1285                mut source: Source<Self::Item>,
1286                finish: bool,
1287            ) -> Poll<Result<StreamResult>> {
1288                // SAFETY: This is a standard pin-projection, and we never move
1289                // out of `self`.
1290                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1291
1292                ready!(consumer.poll_consume(
1293                    cx,
1294                    store.as_context_mut(),
1295                    source.reborrow(),
1296                    finish
1297                ))?;
1298
1299                Poll::Ready(Ok(if source.remaining(store) == 0 {
1300                    // Here we return `StreamResult::Completed` even though
1301                    // we've consumed the last item we'll ever consume.  That's
1302                    // because the ABI expects `ReturnCode::Completed(1)` rather
1303                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1304                    // called again since the future will have resolved.
1305                    StreamResult::Completed
1306                } else {
1307                    StreamResult::Cancelled
1308                }))
1309            }
1310        }
1311
1312        store
1313            .as_context_mut()
1314            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer))
1315    }
1316
1317    /// Transfer ownership of the read end of a future from a guest to the host.
1318    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1319        let id = lift_index_to_future(cx, ty, index)?;
1320        Ok(Self::new_(id))
1321    }
1322
1323    /// Close this `FutureReader`.
1324    ///
1325    /// This will close this half of the future which will signal to a pending
1326    /// write, if any, that the reader side is dropped. If the writer half has
1327    /// not yet written a value then when it attempts to write a value it will
1328    /// see that this end is closed.
1329    ///
1330    /// # Errors
1331    ///
1332    /// Returns an error if this future has already been closed.
1333    ///
1334    /// # Panics
1335    ///
1336    /// Panics if the store that the [`Accessor`] is derived from does not own
1337    /// this future.
1338    ///
1339    /// [`Accessor`]: crate::component::Accessor
1340    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1341        future_close(store.as_context_mut().0, &mut self.id)
1342    }
1343
1344    /// Convenience method around [`Self::close`].
1345    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1346        accessor.as_accessor().with(|access| self.close(access))
1347    }
1348
1349    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1350    /// drop and clean it up from the store.
1351    ///
1352    /// Note that the `accessor` provided must own this future and is
1353    /// additionally transferred to the `GuardedFutureReader` return value.
1354    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1355    where
1356        A: AsAccessor,
1357    {
1358        GuardedFutureReader::new(accessor, self)
1359    }
1360
1361    /// Attempts to convert this [`FutureReader<T>`] to a [`FutureAny`].
1362    ///
1363    /// # Errors
1364    ///
1365    /// This function will return an error if `self` does not belong to
1366    /// `store`.
1367    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1368    where
1369        T: ComponentType + 'static,
1370    {
1371        FutureAny::try_from_future_reader(store, self)
1372    }
1373
1374    /// Attempts to convert a [`FutureAny`] into a [`FutureReader<T>`].
1375    ///
1376    /// # Errors
1377    ///
1378    /// This function will fail if `T` doesn't match the type of the value that
1379    /// `future` is sending.
1380    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1381    where
1382        T: ComponentType + 'static,
1383    {
1384        future.try_into_future_reader()
1385    }
1386}
1387
1388impl<T> fmt::Debug for FutureReader<T> {
1389    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1390        f.debug_struct("FutureReader")
1391            .field("id", &self.id)
1392            .finish()
1393    }
1394}
1395
1396pub(super) fn future_close(
1397    store: &mut StoreOpaque,
1398    id: &mut TableId<TransmitHandle>,
1399) -> Result<()> {
1400    let id = mem::replace(id, TableId::new(u32::MAX));
1401    store.host_drop_reader(id, TransmitKind::Future)
1402}
1403
1404/// Transfer ownership of the read end of a future from the host to a guest.
1405pub(super) fn lift_index_to_future(
1406    cx: &mut LiftContext<'_>,
1407    ty: InterfaceType,
1408    index: u32,
1409) -> Result<TableId<TransmitHandle>> {
1410    match ty {
1411        InterfaceType::Future(src) => {
1412            let handle_table = cx
1413                .instance_mut()
1414                .table_for_transmit(TransmitIndex::Future(src));
1415            let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1416            if is_done {
1417                bail!("cannot lift future after being notified that the writable end dropped");
1418            }
1419            let id = TableId::<TransmitHandle>::new(rep);
1420            let concurrent_state = cx.concurrent_state_mut();
1421            let future = concurrent_state.get_mut(id)?;
1422            future.common.handle = None;
1423            let state = future.state;
1424
1425            if concurrent_state.get_mut(state)?.done {
1426                bail!("cannot lift future after previous read succeeded");
1427            }
1428
1429            Ok(id)
1430        }
1431        _ => func::bad_type_info(),
1432    }
1433}
1434
1435/// Transfer ownership of the read end of a future from the host to a guest.
1436pub(super) fn lower_future_to_index<U>(
1437    id: TableId<TransmitHandle>,
1438    cx: &mut LowerContext<'_, U>,
1439    ty: InterfaceType,
1440) -> Result<u32> {
1441    match ty {
1442        InterfaceType::Future(dst) => {
1443            let concurrent_state = cx.store.0.concurrent_state_mut();
1444            let state = concurrent_state.get_mut(id)?.state;
1445            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1446
1447            let handle = cx
1448                .instance_mut()
1449                .table_for_transmit(TransmitIndex::Future(dst))
1450                .future_insert_read(dst, rep)?;
1451
1452            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1453
1454            Ok(handle)
1455        }
1456        _ => func::bad_type_info(),
1457    }
1458}
1459
1460// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1461// safe and correct since we lift and lower future handles as `u32`s.
1462unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1463    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1464
1465    type Lower = <u32 as func::ComponentType>::Lower;
1466
1467    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1468        match ty {
1469            InterfaceType::Future(ty) => {
1470                let ty = types.types[*ty].ty;
1471                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1472            }
1473            other => bail!("expected `future`, found `{}`", func::desc(other)),
1474        }
1475    }
1476}
1477
1478// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1479unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1480    fn linear_lower_to_flat<U>(
1481        &self,
1482        cx: &mut LowerContext<'_, U>,
1483        ty: InterfaceType,
1484        dst: &mut MaybeUninit<Self::Lower>,
1485    ) -> Result<()> {
1486        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1487    }
1488
1489    fn linear_lower_to_memory<U>(
1490        &self,
1491        cx: &mut LowerContext<'_, U>,
1492        ty: InterfaceType,
1493        offset: usize,
1494    ) -> Result<()> {
1495        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1496            cx,
1497            InterfaceType::U32,
1498            offset,
1499        )
1500    }
1501}
1502
1503// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1504unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1505    fn linear_lift_from_flat(
1506        cx: &mut LiftContext<'_>,
1507        ty: InterfaceType,
1508        src: &Self::Lower,
1509    ) -> Result<Self> {
1510        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1511        Self::lift_from_index(cx, ty, index)
1512    }
1513
1514    fn linear_lift_from_memory(
1515        cx: &mut LiftContext<'_>,
1516        ty: InterfaceType,
1517        bytes: &[u8],
1518    ) -> Result<Self> {
1519        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1520        Self::lift_from_index(cx, ty, index)
1521    }
1522}
1523
/// A [`FutureReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureReader::new`] or
/// [`FutureReader::guard`].
///
/// [`Accessor`]: crate::component::Accessor
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<FutureReader<T>>,
    // Accessor used by the destructor to close `reader`.
    accessor: A,
}
1541
1542impl<T, A> GuardedFutureReader<T, A>
1543where
1544    A: AsAccessor,
1545{
1546    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1547    ///
1548    /// # Panics
1549    ///
1550    /// Panics if [`Config::concurrency_support`] is not enabled.
1551    ///
1552    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1553    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1554        assert!(
1555            accessor
1556                .as_accessor()
1557                .with(|a| a.as_context().0.concurrency_support())
1558        );
1559        Self {
1560            reader: Some(reader),
1561            accessor,
1562        }
1563    }
1564
1565    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1566    /// back.
1567    pub fn into_future(self) -> FutureReader<T> {
1568        self.into()
1569    }
1570}
1571
1572impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1573where
1574    A: AsAccessor,
1575{
1576    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1577        guard.reader.take().unwrap()
1578    }
1579}
1580
1581impl<T, A> Drop for GuardedFutureReader<T, A>
1582where
1583    A: AsAccessor,
1584{
1585    fn drop(&mut self) {
1586        if let Some(reader) = &mut self.reader {
1587            // Currently this can only fail if the future is closed twice, which
1588            // this guard prevents, so this error shouldn't happen.
1589            let result = reader.close_with(&self.accessor);
1590            debug_assert!(result.is_ok());
1591        }
1592    }
1593}
1594
/// Represents the readable end of a Component Model `stream`.
///
/// Note that `StreamReader` instances must be disposed of using `close`;
/// otherwise the in-store representation will leak and the writer end will hang
/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
/// disposal happens automatically.
pub struct StreamReader<T> {
    // Table key identifying this reader's transmit handle.
    id: TableId<TransmitHandle>,
    // Ties the payload type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1605
1606impl<T> StreamReader<T> {
1607    /// Create a new stream with the specified producer.
1608    ///
1609    /// # Errors
1610    ///
1611    /// Returns an error if the resource table for this store is full or if
1612    /// [`Config::concurrency_support`] is not enabled.
1613    ///
1614    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1615    pub fn new<S: AsContextMut>(
1616        mut store: S,
1617        producer: impl StreamProducer<S::Data, Item = T>,
1618    ) -> Result<Self>
1619    where
1620        T: func::Lower + func::Lift + Send + Sync + 'static,
1621    {
1622        ensure!(
1623            store.as_context().0.concurrency_support(),
1624            "concurrency support is not enabled",
1625        );
1626        Ok(Self::new_(
1627            store
1628                .as_context_mut()
1629                .new_transmit(TransmitKind::Stream, producer)?,
1630        ))
1631    }
1632
1633    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1634        Self {
1635            id,
1636            _phantom: PhantomData,
1637        }
1638    }
1639
1640    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1641        self.id
1642    }
1643
1644    /// Attempt to consume this object by converting it into the specified type.
1645    ///
1646    /// This can be useful for "short-circuiting" host-to-host streams,
1647    /// bypassing the guest entirely.  For example, if a guest task returns a
1648    /// host-created stream and then exits, this function may be used to
1649    /// retrieve the write end, after which the guest instance and store may be
1650    /// disposed of if no longer needed.
1651    ///
1652    /// This will return `Ok(_)` if and only if the following conditions are
1653    /// met:
1654    ///
1655    /// - The stream was created by the host (i.e. not by the guest).
1656    ///
1657    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1658    /// producer provided to `StreamReader::new` when the stream was created,
1659    /// along with `TypeId::of::<V>()`.
1660    ///
1661    /// # Panics
1662    ///
1663    /// Panics if this stream has already been closed, or if this stream doesn't
1664    /// belong to the specified `store`.
1665    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1666        let store = store.as_context_mut();
1667        let state = store.0.concurrent_state_mut();
1668        let id = state.get_mut(self.id).unwrap().state;
1669        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1670            match try_into(TypeId::of::<V>()) {
1671                Some(result) => {
1672                    self.close(store).unwrap();
1673                    Ok(*result.downcast::<V>().unwrap())
1674                }
1675                None => Err(self),
1676            }
1677        } else {
1678            Err(self)
1679        }
1680    }
1681
1682    /// Set the consumer that accepts the items delivered to this stream.
1683    ///
1684    /// # Errors
1685    ///
1686    /// Returns an error if this stream has already been closed.
1687    ///
1688    /// # Panics
1689    ///
1690    /// Panics if this stream does not belong to `store`.
1691    pub fn pipe<S: AsContextMut>(
1692        self,
1693        mut store: S,
1694        consumer: impl StreamConsumer<S::Data, Item = T>,
1695    ) -> Result<()>
1696    where
1697        T: 'static,
1698    {
1699        store
1700            .as_context_mut()
1701            .set_consumer(self.id, TransmitKind::Stream, consumer)
1702    }
1703
1704    /// Transfer ownership of the read end of a stream from a guest to the host.
1705    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1706        let id = lift_index_to_stream(cx, ty, index)?;
1707        Ok(Self::new_(id))
1708    }
1709
1710    /// Close this `StreamReader`.
1711    ///
1712    /// This will signal that this portion of the stream is closed causing all
1713    /// future writes to return immediately with "DROPPED".
1714    ///
1715    /// # Errors
1716    ///
1717    /// Returns an error if this stream has already been closed.
1718    ///
1719    /// # Panics
1720    ///
1721    /// Panics if the store that the [`Accessor`] is derived from does not own
1722    /// this stream.
1723    ///
1724    /// [`Accessor`]: crate::component::Accessor
1725    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
1726        stream_close(store.as_context_mut().0, &mut self.id)
1727    }
1728
1729    /// Convenience method around [`Self::close`].
1730    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
1731        accessor.as_accessor().with(|access| self.close(access))
1732    }
1733
1734    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1735    /// drop and clean it up from the store.
1736    ///
1737    /// Note that the `accessor` provided must own this future and is
1738    /// additionally transferred to the `GuardedStreamReader` return value.
1739    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1740    where
1741        A: AsAccessor,
1742    {
1743        GuardedStreamReader::new(accessor, self)
1744    }
1745
1746    /// Attempts to convert this [`StreamReader<T>`] to a [`StreamAny`].
1747    ///
1748    /// # Errors
1749    ///
1750    /// This function will return an error if `self` does not belong to
1751    /// `store`.
1752    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1753    where
1754        T: ComponentType + 'static,
1755    {
1756        StreamAny::try_from_stream_reader(store, self)
1757    }
1758
1759    /// Attempts to convert a [`StreamAny`] into a [`StreamReader<T>`].
1760    ///
1761    /// # Errors
1762    ///
1763    /// This function will fail if `T` doesn't match the type of the value that
1764    /// `stream` is sending.
1765    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1766    where
1767        T: ComponentType + 'static,
1768    {
1769        stream.try_into_stream_reader()
1770    }
1771}
1772
1773impl<T> fmt::Debug for StreamReader<T> {
1774    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1775        f.debug_struct("StreamReader")
1776            .field("id", &self.id)
1777            .finish()
1778    }
1779}
1780
1781pub(super) fn stream_close(
1782    store: &mut StoreOpaque,
1783    id: &mut TableId<TransmitHandle>,
1784) -> Result<()> {
1785    let id = mem::replace(id, TableId::new(u32::MAX));
1786    store.host_drop_reader(id, TransmitKind::Stream)
1787}
1788
1789/// Transfer ownership of the read end of a stream from a guest to the host.
1790pub(super) fn lift_index_to_stream(
1791    cx: &mut LiftContext<'_>,
1792    ty: InterfaceType,
1793    index: u32,
1794) -> Result<TableId<TransmitHandle>> {
1795    match ty {
1796        InterfaceType::Stream(src) => {
1797            let handle_table = cx
1798                .instance_mut()
1799                .table_for_transmit(TransmitIndex::Stream(src));
1800            let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1801            if is_done {
1802                bail!("cannot lift stream after being notified that the writable end dropped");
1803            }
1804            let id = TableId::<TransmitHandle>::new(rep);
1805            cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1806            Ok(id)
1807        }
1808        _ => func::bad_type_info(),
1809    }
1810}
1811
1812/// Transfer ownership of the read end of a stream from the host to a guest.
1813pub(super) fn lower_stream_to_index<U>(
1814    id: TableId<TransmitHandle>,
1815    cx: &mut LowerContext<'_, U>,
1816    ty: InterfaceType,
1817) -> Result<u32> {
1818    match ty {
1819        InterfaceType::Stream(dst) => {
1820            let concurrent_state = cx.store.0.concurrent_state_mut();
1821            let state = concurrent_state.get_mut(id)?.state;
1822            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1823
1824            let handle = cx
1825                .instance_mut()
1826                .table_for_transmit(TransmitIndex::Stream(dst))
1827                .stream_insert_read(dst, rep)?;
1828
1829            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1830
1831            Ok(handle)
1832        }
1833        _ => func::bad_type_info(),
1834    }
1835}
1836
1837// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1838// safe and correct since we lift and lower stream handles as `u32`s.
1839unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1840    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1841
1842    type Lower = <u32 as func::ComponentType>::Lower;
1843
1844    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1845        match ty {
1846            InterfaceType::Stream(ty) => {
1847                let ty = types.types[*ty].ty;
1848                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1849            }
1850            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1851        }
1852    }
1853}
1854
1855// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1856unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1857    fn linear_lower_to_flat<U>(
1858        &self,
1859        cx: &mut LowerContext<'_, U>,
1860        ty: InterfaceType,
1861        dst: &mut MaybeUninit<Self::Lower>,
1862    ) -> Result<()> {
1863        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1864    }
1865
1866    fn linear_lower_to_memory<U>(
1867        &self,
1868        cx: &mut LowerContext<'_, U>,
1869        ty: InterfaceType,
1870        offset: usize,
1871    ) -> Result<()> {
1872        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1873            cx,
1874            InterfaceType::U32,
1875            offset,
1876        )
1877    }
1878}
1879
1880// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1881unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1882    fn linear_lift_from_flat(
1883        cx: &mut LiftContext<'_>,
1884        ty: InterfaceType,
1885        src: &Self::Lower,
1886    ) -> Result<Self> {
1887        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1888        Self::lift_from_index(cx, ty, index)
1889    }
1890
1891    fn linear_lift_from_memory(
1892        cx: &mut LiftContext<'_>,
1893        ty: InterfaceType,
1894        bytes: &[u8],
1895    ) -> Result<Self> {
1896        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1897        Self::lift_from_index(cx, ty, index)
1898    }
1899}
1900
/// A [`StreamReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedStreamReader::new`] or
/// [`StreamReader::guard`].
///
/// [`Accessor`]: crate::component::Accessor
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `StreamReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<StreamReader<T>>,
    // Accessor the destructor passes to `StreamReader::close_with` in order to
    // close `reader`.
    accessor: A,
}
1918
1919impl<T, A> GuardedStreamReader<T, A>
1920where
1921    A: AsAccessor,
1922{
1923    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1924    /// `reader`.
1925    ///
1926    /// # Panics
1927    ///
1928    /// Panics if [`Config::concurrency_support`] is not enabled.
1929    ///
1930    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1931    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1932        assert!(
1933            accessor
1934                .as_accessor()
1935                .with(|a| a.as_context().0.concurrency_support())
1936        );
1937        Self {
1938            reader: Some(reader),
1939            accessor,
1940        }
1941    }
1942
1943    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1944    /// back.
1945    pub fn into_stream(self) -> StreamReader<T> {
1946        self.into()
1947    }
1948}
1949
1950impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1951where
1952    A: AsAccessor,
1953{
1954    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1955        guard.reader.take().unwrap()
1956    }
1957}
1958
1959impl<T, A> Drop for GuardedStreamReader<T, A>
1960where
1961    A: AsAccessor,
1962{
1963    fn drop(&mut self) {
1964        if let Some(reader) = &mut self.reader {
1965            // Currently this can only fail if the future is closed twice, which
1966            // this guard prevents, so this error shouldn't happen.
1967            let result = reader.close_with(&self.accessor);
1968            debug_assert!(result.is_ok());
1969        }
1970    }
1971}
1972
/// Represents a Component Model `error-context`.
pub struct ErrorContext {
    // Representation value for this error context: read from the instance's
    // error-context table when lifting and inserted into it when lowering.
    rep: u32,
}
1977
1978impl ErrorContext {
1979    pub(crate) fn new(rep: u32) -> Self {
1980        Self { rep }
1981    }
1982
1983    /// Convert this `ErrorContext` into a [`Val`].
1984    pub fn into_val(self) -> Val {
1985        Val::ErrorContext(ErrorContextAny(self.rep))
1986    }
1987
1988    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1989    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1990        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1991            bail!("expected `error-context`; got `{}`", value.desc());
1992        };
1993        Ok(Self::new(*rep))
1994    }
1995
1996    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1997        match ty {
1998            InterfaceType::ErrorContext(src) => {
1999                let rep = cx
2000                    .instance_mut()
2001                    .table_for_error_context(src)
2002                    .error_context_rep(index)?;
2003
2004                Ok(Self { rep })
2005            }
2006            _ => func::bad_type_info(),
2007        }
2008    }
2009}
2010
2011pub(crate) fn lower_error_context_to_index<U>(
2012    rep: u32,
2013    cx: &mut LowerContext<'_, U>,
2014    ty: InterfaceType,
2015) -> Result<u32> {
2016    match ty {
2017        InterfaceType::ErrorContext(dst) => {
2018            let tbl = cx.instance_mut().table_for_error_context(dst);
2019            tbl.error_context_insert(rep)
2020        }
2021        _ => func::bad_type_info(),
2022    }
2023}
2024// SAFETY: This relies on the `ComponentType` implementation for `u32` being
2025// safe and correct since we lift and lower future handles as `u32`s.
2026unsafe impl func::ComponentType for ErrorContext {
2027    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
2028
2029    type Lower = <u32 as func::ComponentType>::Lower;
2030
2031    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
2032        match ty {
2033            InterfaceType::ErrorContext(_) => Ok(()),
2034            other => bail!("expected `error`, found `{}`", func::desc(other)),
2035        }
2036    }
2037}
2038
2039// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2040unsafe impl func::Lower for ErrorContext {
2041    fn linear_lower_to_flat<T>(
2042        &self,
2043        cx: &mut LowerContext<'_, T>,
2044        ty: InterfaceType,
2045        dst: &mut MaybeUninit<Self::Lower>,
2046    ) -> Result<()> {
2047        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
2048            cx,
2049            InterfaceType::U32,
2050            dst,
2051        )
2052    }
2053
2054    fn linear_lower_to_memory<T>(
2055        &self,
2056        cx: &mut LowerContext<'_, T>,
2057        ty: InterfaceType,
2058        offset: usize,
2059    ) -> Result<()> {
2060        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
2061            cx,
2062            InterfaceType::U32,
2063            offset,
2064        )
2065    }
2066}
2067
2068// SAFETY: See the comment on the `ComponentType` `impl` for this type.
2069unsafe impl func::Lift for ErrorContext {
2070    fn linear_lift_from_flat(
2071        cx: &mut LiftContext<'_>,
2072        ty: InterfaceType,
2073        src: &Self::Lower,
2074    ) -> Result<Self> {
2075        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
2076        Self::lift_from_index(cx, ty, index)
2077    }
2078
2079    fn linear_lift_from_memory(
2080        cx: &mut LiftContext<'_>,
2081        ty: InterfaceType,
2082        bytes: &[u8],
2083    ) -> Result<Self> {
2084        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
2085        Self::lift_from_index(cx, ty, index)
2086    }
2087}
2088
/// Represents the read or write end of a stream or future.
pub(super) struct TransmitHandle {
    /// `WaitableCommon` state for this end. Its `handle` field is cleared when
    /// a stream is lifted from a guest and set to the new guest handle when a
    /// stream is lowered to one.
    pub(super) common: WaitableCommon,
    /// See `TransmitState`
    state: TableId<TransmitState>,
}
2095
2096impl TransmitHandle {
2097    fn new(state: TableId<TransmitState>) -> Self {
2098        Self {
2099            common: WaitableCommon::default(),
2100            state,
2101        }
2102    }
2103}
2104
impl TableDebug for TransmitHandle {
    // Fixed label for this entry type; presumably surfaced in table debug
    // output (confirm against the users of `TableDebug`).
    fn type_name() -> &'static str {
        "TransmitHandle"
    }
}
2110
/// Represents the state of a stream or future.
///
/// `TransmitState::new` starts both handles as the placeholder
/// `TableId::new(u32::MAX)` and both ends as `Open`.
struct TransmitState {
    /// The write end of the stream or future.
    write_handle: TableId<TransmitHandle>,
    /// The read end of the stream or future.
    read_handle: TableId<TransmitHandle>,
    /// See `WriteState`
    write: WriteState,
    /// See `ReadState`
    read: ReadState,
    /// Whether further values may be transmitted via this stream or future.
    done: bool,
    /// The original creator of this stream, used for type-checking with
    /// `{Future,Stream}Any`.
    pub(super) origin: TransmitOrigin,
}
2127
/// Records what originally created a stream or future.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host.
    Host,
    /// Created by a guest as a future; carries the component instance id and
    /// future table type index passed to `TransmitOrigin::guest`.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created by a guest as a stream; carries the component instance id and
    /// stream table type index passed to `TransmitOrigin::guest`.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
2134
2135impl TransmitState {
2136    fn new(origin: TransmitOrigin) -> Self {
2137        Self {
2138            write_handle: TableId::new(u32::MAX),
2139            read_handle: TableId::new(u32::MAX),
2140            read: ReadState::Open,
2141            write: WriteState::Open,
2142            done: false,
2143            origin,
2144        }
2145    }
2146}
2147
impl TableDebug for TransmitState {
    // Fixed label for this entry type; presumably surfaced in table debug
    // output (confirm against the users of `TableDebug`).
    fn type_name() -> &'static str {
        "TransmitState"
    }
}
2153
2154impl TransmitOrigin {
2155    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2156        match index {
2157            TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2158            TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2159        }
2160    }
2161}
2162
/// Boxed callback that returns a boxed future resolving to a `StreamResult`.
///
/// Stored as `produce` in `WriteState::HostReady` and as `consume` in
/// `ReadState::HostReady`.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
2166
/// Boxed callback invoked by `StreamReader::try_into` with the `TypeId` of
/// the requested type. A `Some` result carries a boxed value which the caller
/// downcasts to that type; `None` makes the conversion fail.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2168
/// Represents the state of the write end of a stream or future.
enum WriteState {
    /// The write end is open, but no write is pending.
    Open,
    /// The write end is owned by a guest task and a write is pending.
    ///
    /// `ty` and `handle` are what the host uses to build the completion event
    /// delivered to the guest when the write finishes or the reader is dropped.
    /// NOTE(review): the remaining fields presumably describe the guest buffer
    /// being written from; confirm against the code that constructs this
    /// variant.
    GuestReady {
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: OptionsIndex,
        address: usize,
        count: usize,
        handle: u32,
    },
    /// The write end is owned by the host, which is ready to produce items.
    ///
    /// When a piped operation completes, `guest_offset` is converted into the
    /// count of the return code and then reset to 0, while `cancel` and
    /// `cancel_waker` are reset to `false` and `None`.
    HostReady {
        produce: PollStream,
        try_into: TryInto,
        guest_offset: usize,
        cancel: bool,
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
2195
2196impl fmt::Debug for WriteState {
2197    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2198        match self {
2199            Self::Open => f.debug_tuple("Open").finish(),
2200            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2201            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2202            Self::Dropped => f.debug_tuple("Dropped").finish(),
2203        }
2204    }
2205}
2206
/// Represents the state of the read end of a stream or future.
enum ReadState {
    /// The read end is open, but no read is pending.
    Open,
    /// The read end is owned by a guest task and a read is pending.
    ///
    /// `ty` and `handle` are what the host uses to build the completion event
    /// delivered to the guest when the read finishes or the writer is dropped.
    /// NOTE(review): the remaining fields presumably describe the guest buffer
    /// being read into and the task that issued the read; confirm against the
    /// code that constructs this variant.
    GuestReady {
        ty: TransmitIndex,
        caller_instance: RuntimeComponentInstanceIndex,
        caller_thread: QualifiedThreadId,
        flat_abi: Option<FlatAbi>,
        instance: Instance,
        options: OptionsIndex,
        address: usize,
        count: usize,
        handle: u32,
    },
    /// The read end is owned by a host task, and it is ready to consume items.
    ///
    /// When a piped operation completes, `guest_offset` is converted into the
    /// count of the return code and then reset to 0, while `cancel` and
    /// `cancel_waker` are reset to `false` and `None`.
    HostReady {
        consume: PollStream,
        guest_offset: usize,
        cancel: bool,
        cancel_waker: Option<Waker>,
    },
    /// Both the read and write ends are owned by the host.
    HostToHost {
        // Callback handed an `UntypedWriteBuffer`; the future it returns
        // resolves to a `StreamResult`.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        buffer: Vec<u8>,
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
2246
2247impl fmt::Debug for ReadState {
2248    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2249        match self {
2250            Self::Open => f.debug_tuple("Open").finish(),
2251            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2252            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2253            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2254            Self::Dropped => f.debug_tuple("Dropped").finish(),
2255        }
2256    }
2257}
2258
2259fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> Result<ReturnCode> {
2260    let count = guest_offset.try_into()?;
2261    Ok(match state {
2262        StreamResult::Dropped => ReturnCode::Dropped(count),
2263        StreamResult::Completed => ReturnCode::completed(kind, count),
2264        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2265    })
2266}
2267
2268impl StoreOpaque {
2269    fn pipe_from_guest(
2270        &mut self,
2271        kind: TransmitKind,
2272        id: TableId<TransmitState>,
2273        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2274    ) {
2275        let future = async move {
2276            let stream_state = future.await?;
2277            tls::get(|store| {
2278                let state = store.concurrent_state_mut();
2279                let transmit = state.get_mut(id)?;
2280                let ReadState::HostReady {
2281                    consume,
2282                    guest_offset,
2283                    ..
2284                } = mem::replace(&mut transmit.read, ReadState::Open)
2285                else {
2286                    bail_bug!("expected ReadState::HostReady")
2287                };
2288                let code = return_code(kind, stream_state, guest_offset)?;
2289                transmit.read = match stream_state {
2290                    StreamResult::Dropped => ReadState::Dropped,
2291                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2292                        consume,
2293                        guest_offset: 0,
2294                        cancel: false,
2295                        cancel_waker: None,
2296                    },
2297                };
2298                let WriteState::GuestReady { ty, handle, .. } =
2299                    mem::replace(&mut transmit.write, WriteState::Open)
2300                else {
2301                    bail_bug!("expected WriteState::HostReady")
2302                };
2303                state.send_write_result(ty, id, handle, code)?;
2304                Ok(())
2305            })
2306        };
2307
2308        self.concurrent_state_mut().push_future(future.boxed());
2309    }
2310
2311    fn pipe_to_guest(
2312        &mut self,
2313        kind: TransmitKind,
2314        id: TableId<TransmitState>,
2315        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2316    ) {
2317        let future = async move {
2318            let stream_state = future.await?;
2319            tls::get(|store| {
2320                let state = store.concurrent_state_mut();
2321                let transmit = state.get_mut(id)?;
2322                let WriteState::HostReady {
2323                    produce,
2324                    try_into,
2325                    guest_offset,
2326                    ..
2327                } = mem::replace(&mut transmit.write, WriteState::Open)
2328                else {
2329                    bail_bug!("expected WriteState::HostReady")
2330                };
2331                let code = return_code(kind, stream_state, guest_offset)?;
2332                transmit.write = match stream_state {
2333                    StreamResult::Dropped => WriteState::Dropped,
2334                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2335                        produce,
2336                        try_into,
2337                        guest_offset: 0,
2338                        cancel: false,
2339                        cancel_waker: None,
2340                    },
2341                };
2342                let ReadState::GuestReady { ty, handle, .. } =
2343                    mem::replace(&mut transmit.read, ReadState::Open)
2344                else {
2345                    bail_bug!("expected ReadState::HostReady")
2346                };
2347                state.send_read_result(ty, id, handle, code)?;
2348                Ok(())
2349            })
2350        };
2351
2352        self.concurrent_state_mut().push_future(future.boxed());
2353    }
2354
2355    /// Drop the read end of a stream or future read from the host.
2356    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2357        let state = self.concurrent_state_mut();
2358        let transmit_id = state.get_mut(id)?.state;
2359        let transmit = state
2360            .get_mut(transmit_id)
2361            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2362        log::trace!(
2363            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2364            transmit.read,
2365            transmit.write
2366        );
2367
2368        transmit.read = ReadState::Dropped;
2369
2370        // If the write end is already dropped, it should stay dropped,
2371        // otherwise, it should be opened.
2372        let new_state = if let WriteState::Dropped = &transmit.write {
2373            WriteState::Dropped
2374        } else {
2375            WriteState::Open
2376        };
2377
2378        let write_handle = transmit.write_handle;
2379
2380        match mem::replace(&mut transmit.write, new_state) {
2381            // If a guest is waiting to write, notify it that the read end has
2382            // been dropped.
2383            WriteState::GuestReady { ty, handle, .. } => {
2384                state.update_event(
2385                    write_handle.rep(),
2386                    match ty {
2387                        TransmitIndex::Future(ty) => Event::FutureWrite {
2388                            code: ReturnCode::Dropped(0),
2389                            pending: Some((ty, handle)),
2390                        },
2391                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2392                            code: ReturnCode::Dropped(0),
2393                            pending: Some((ty, handle)),
2394                        },
2395                    },
2396                )?;
2397            }
2398
2399            WriteState::HostReady { .. } => {}
2400
2401            WriteState::Open => {
2402                state.update_event(
2403                    write_handle.rep(),
2404                    match kind {
2405                        TransmitKind::Future => Event::FutureWrite {
2406                            code: ReturnCode::Dropped(0),
2407                            pending: None,
2408                        },
2409                        TransmitKind::Stream => Event::StreamWrite {
2410                            code: ReturnCode::Dropped(0),
2411                            pending: None,
2412                        },
2413                    },
2414                )?;
2415            }
2416
2417            WriteState::Dropped => {
2418                log::trace!("host_drop_reader delete {transmit_id:?}");
2419                state.delete_transmit(transmit_id)?;
2420            }
2421        }
2422        Ok(())
2423    }
2424
2425    /// Drop the write end of a stream or future read from the host.
2426    fn host_drop_writer(
2427        &mut self,
2428        id: TableId<TransmitHandle>,
2429        on_drop_open: Option<fn() -> Result<()>>,
2430    ) -> Result<()> {
2431        let state = self.concurrent_state_mut();
2432        let transmit_id = state.get_mut(id)?.state;
2433        let transmit = state
2434            .get_mut(transmit_id)
2435            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2436        log::trace!(
2437            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2438            transmit.read,
2439            transmit.write
2440        );
2441
2442        // Existing queued transmits must be updated with information for the impending writer closure
2443        match &mut transmit.write {
2444            WriteState::GuestReady { .. } => {
2445                bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
2446            }
2447            WriteState::HostReady { .. } => {}
2448            v @ WriteState::Open => {
2449                if let (Some(on_drop_open), false) = (
2450                    on_drop_open,
2451                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2452                ) {
2453                    on_drop_open()?;
2454                } else {
2455                    *v = WriteState::Dropped;
2456                }
2457            }
2458            WriteState::Dropped => bail_bug!("write state is already dropped"),
2459        }
2460
2461        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2462
2463        // If the existing read state is dropped, then there's nothing to read
2464        // and we can keep it that way.
2465        //
2466        // If the read state was any other state, then we must set the new state to open
2467        // to indicate that there *is* data to be read
2468        let new_state = if let ReadState::Dropped = &transmit.read {
2469            ReadState::Dropped
2470        } else {
2471            ReadState::Open
2472        };
2473
2474        let read_handle = transmit.read_handle;
2475
2476        // Swap in the new read state
2477        match mem::replace(&mut transmit.read, new_state) {
2478            // If the guest was ready to read, then we cannot drop the reader (or writer);
2479            // we must deliver the event, and update the state associated with the handle to
2480            // represent that a read must be performed
2481            ReadState::GuestReady { ty, handle, .. } => {
2482                // Ensure the final read of the guest is queued, with appropriate closure indicator
2483                self.concurrent_state_mut().update_event(
2484                    read_handle.rep(),
2485                    match ty {
2486                        TransmitIndex::Future(ty) => Event::FutureRead {
2487                            code: ReturnCode::Dropped(0),
2488                            pending: Some((ty, handle)),
2489                        },
2490                        TransmitIndex::Stream(ty) => Event::StreamRead {
2491                            code: ReturnCode::Dropped(0),
2492                            pending: Some((ty, handle)),
2493                        },
2494                    },
2495                )?;
2496            }
2497
2498            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2499
2500            // If the read state is open, then there are no registered readers of the stream/future
2501            ReadState::Open => {
2502                self.concurrent_state_mut().update_event(
2503                    read_handle.rep(),
2504                    match on_drop_open {
2505                        Some(_) => Event::FutureRead {
2506                            code: ReturnCode::Dropped(0),
2507                            pending: None,
2508                        },
2509                        None => Event::StreamRead {
2510                            code: ReturnCode::Dropped(0),
2511                            pending: None,
2512                        },
2513                    },
2514                )?;
2515            }
2516
2517            // If the read state was already dropped, then we can remove the transmit state completely
2518            // (both writer and reader have been dropped)
2519            ReadState::Dropped => {
2520                log::trace!("host_drop_writer delete {transmit_id:?}");
2521                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2522            }
2523        }
2524        Ok(())
2525    }
2526
2527    pub(super) fn transmit_origin(
2528        &mut self,
2529        id: TableId<TransmitHandle>,
2530    ) -> Result<TransmitOrigin> {
2531        let state = self.concurrent_state_mut();
2532        let state_id = state.get_mut(id)?.state;
2533        Ok(state.get_mut(state_id)?.origin)
2534    }
2535}
2536
2537impl<T> StoreContextMut<'_, T> {
    /// Create a new transmit with `TransmitOrigin::Host` whose write end is
    /// driven by `producer`, and return the handle of its read end.
    ///
    /// The producer and a default-constructed `P::Buffer` are kept together
    /// behind an `Arc<LockedState<..>>` shared by two closures that are
    /// installed as `WriteState::HostReady`:
    ///
    /// * `produce` builds a future which polls the producer (only when the
    ///   buffer has nothing left over), validates the reported
    ///   `StreamResult`, and hands any buffered items to `write`.
    /// * `try_into` attempts to convert the producer via `P::try_into`,
    ///   putting the producer and buffer back if the conversion is refused.
    fn new_transmit<P: StreamProducer<T>>(
        mut self,
        kind: TransmitKind,
        producer: P,
    ) -> Result<TableId<TransmitHandle>>
    where
        P::Item: func::Lower,
    {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
        let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
        // `id` is the shared transmit state, reached through the read handle.
        let id = state.get_mut(read)?.state;
        let mut dropped = false;
        let produce = Box::new({
            let producer = producer.clone();
            move || {
                let producer = producer.clone();
                async move {
                    // Take exclusive access to the (producer, buffer) pair for
                    // the duration of this poll; released by `drop(state)` below.
                    let mut state = producer.take()?;
                    let (mine, buffer) = &mut *state;

                    // Only poll the producer when nothing is left in the buffer;
                    // otherwise report `Completed` so the leftover items get
                    // written first.
                    let (result, cancelled) = if buffer.remaining().is_empty() {
                        future::poll_fn(|cx| {
                            tls::get(|store| {
                                let transmit = store.concurrent_state_mut().get_mut(id)?;

                                // Snapshot of the cancel flag as it was before polling.
                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                    bail_bug!("expected WriteState::HostReady")
                                };

                                // For a host-to-host pipe, move the reader's buffer
                                // out of the transmit state and wrap it in a
                                // `Cursor` so it can be lent to the producer.
                                let mut host_buffer =
                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                        Some(Cursor::new(mem::take(buffer)))
                                    } else {
                                        None
                                    };

                                let poll = mine.as_mut().poll_produce(
                                    cx,
                                    token.as_context_mut(store),
                                    Destination {
                                        id,
                                        buffer,
                                        host_buffer: host_buffer.as_mut(),
                                        _phantom: PhantomData,
                                    },
                                    cancel,
                                );

                                // Look the transmit up again now that the store is
                                // no longer lent to `poll_produce`.
                                let transmit = store.concurrent_state_mut().get_mut(id)?;

                                // Hand the host buffer back to the read state and
                                // record the cursor position as `limit`.
                                let host_offset = if let (
                                    Some(host_buffer),
                                    ReadState::HostToHost { buffer, limit, .. },
                                ) = (host_buffer, &mut transmit.read)
                                {
                                    *limit = usize::try_from(host_buffer.position())?;
                                    *buffer = host_buffer.into_inner();
                                    *limit
                                } else {
                                    0
                                };

                                {
                                    // NB: the `cancel` bound here is scoped to this
                                    // block and shadows the snapshot taken above.
                                    let WriteState::HostReady {
                                        guest_offset,
                                        cancel,
                                        cancel_waker,
                                        ..
                                    } = &mut transmit.write
                                    else {
                                        bail_bug!("expected WriteState::HostReady")
                                    };

                                    if poll.is_pending() {
                                        // A pending producer must not have produced
                                        // anything into any of the three sinks.
                                        if !buffer.remaining().is_empty()
                                            || *guest_offset > 0
                                            || host_offset > 0
                                        {
                                            bail!(
                                                "StreamProducer::poll_produce returned Poll::Pending \
                                                 after producing at least one item"
                                            )
                                        }
                                        // Remember the waker of the task polling us
                                        // (presumably so a cancellation request can
                                        // wake it -- confirm against the cancel path).
                                        *cancel_waker = Some(cx.waker().clone());
                                    } else {
                                        *cancel_waker = None;
                                        *cancel = false;
                                    }
                                }

                                // `cancel` is the pre-poll snapshot again here; it is
                                // returned alongside the result as `cancelled`.
                                Ok(poll.map(|v| v.map(|result| (result, cancel))))
                            })?
                        })
                            .await?
                    } else {
                        (StreamResult::Completed, false)
                    };

                    // Gather how much was requested and how much has been
                    // delivered so far, for the validation below.
                    let (guest_offset, host_offset, count) = tls::get(|store| {
                        let transmit = store.concurrent_state_mut().get_mut(id)?;
                        let (count, host_offset) = match &transmit.read {
                            &ReadState::GuestReady { count, .. } => (count, 0),
                            &ReadState::HostToHost { limit, .. } => (1, limit),
                            _ => bail_bug!("invalid read state"),
                        };
                        let guest_offset = match &transmit.write {
                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
                            _ => bail_bug!("invalid write state"),
                        };
                        Ok((guest_offset, host_offset, count))
                    })?;

                    match result {
                        StreamResult::Completed => {
                            // Reject a `Completed` that produced nothing at all
                            // when more than one item was requested.
                            if count > 1
                                && buffer.remaining().is_empty()
                                && guest_offset == 0
                                && host_offset == 0
                            {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Completed \
                                     without producing any items"
                                );
                            }
                        }
                        StreamResult::Cancelled => {
                            // `Cancelled` is only valid if cancellation was requested.
                            if !cancelled {
                                bail!(
                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                     without being given a `finish` parameter value of true"
                                );
                            }
                        }
                        StreamResult::Dropped => {
                            dropped = true;
                        }
                    }

                    // Anything sitting in the producer's buffer or in the host
                    // buffer still needs to be delivered to the reader.
                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;

                    // Release the pair before calling `write`, which takes it again.
                    drop(state);

                    if write_buffer {
                        write(token, id, producer.clone(), kind).await?;
                    }

                    // A dropped producer is only reported as `Dropped` once its
                    // buffer has been fully drained; until then report `Completed`.
                    Ok(if dropped {
                        if producer.with(|p| p.1.remaining().is_empty())?  {
                            StreamResult::Dropped
                        } else {
                            StreamResult::Completed
                        }
                    } else {
                        result
                    })
                }
                .boxed()
            }
        });
        // Conversion hook: take the pair out, try `P::try_into`, and restore
        // the pair if the conversion hands the producer back.  Yields `None`
        // if the lock cannot be acquired or the pair is currently absent.
        let try_into = Box::new(move |ty| {
            let (mine, buffer) = producer.try_lock().ok()?.take()?;
            match P::try_into(mine, ty) {
                Ok(value) => Some(value),
                Err(mine) => {
                    *producer.try_lock().ok()? = Some((mine, buffer));
                    None
                }
            }
        });
        state.get_mut(id)?.write = WriteState::HostReady {
            produce,
            try_into,
            guest_offset: 0,
            cancel: false,
            cancel_waker: None,
        };
        Ok(read)
    }
2718
    /// Attach `consumer` as the read end of the transmit that the handle `id`
    /// belongs to.
    ///
    /// What happens after the consumer is wrapped depends on the current
    /// write state of the transmit:
    ///
    /// * `Open`: the read state becomes `HostReady` and nothing else happens.
    /// * `GuestReady`: the read state becomes `HostReady` and a `consume`
    ///   future is handed to `pipe_from_guest` straight away.
    /// * `HostReady`: the existing `produce` closure is taken out of the write
    ///   state, the read state becomes `HostToHost`, and a future that keeps
    ///   calling `produce` is pushed onto the concurrent state; the transmit
    ///   is deleted when that future finishes.
    /// * `Dropped`: the read handle is dropped via `host_drop_reader`.
    fn set_consumer<C: StreamConsumer<T>>(
        mut self,
        id: TableId<TransmitHandle>,
        kind: TransmitKind,
        consumer: C,
    ) -> Result<()> {
        let token = StoreToken::new(self.as_context_mut());
        let state = self.0.concurrent_state_mut();
        // From here on `id` refers to the shared transmit state, not the handle.
        let id = state.get_mut(id)?.state;
        let transmit = state.get_mut(id)?;
        let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
        // Core consume routine.  `host_buffer` is `Some` when the items come
        // from a host-side writer and `None` otherwise (see the two call sites
        // below).
        let consume_with_buffer = {
            let consumer = consumer.clone();
            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
                let mut mine = consumer.take()?;

                // Size of the host buffer before polling, used afterwards to
                // tell whether the consumer took anything from it.
                let host_buffer_remaining_before =
                    host_buffer.as_deref_mut().map(|v| v.remaining().len());

                let (result, cancelled) = future::poll_fn(|cx| {
                    tls::get(|store| {
                        // Snapshot of the cancel flag as it was before polling.
                        let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
                            &ReadState::HostReady { cancel, .. } => cancel,
                            ReadState::Open => false,
                            _ => bail_bug!("unexpected read state"),
                        };

                        let poll = mine.as_mut().poll_consume(
                            cx,
                            token.as_context_mut(store),
                            Source {
                                id,
                                host_buffer: host_buffer.as_deref_mut(),
                            },
                            cancel,
                        );

                        // NB: the `cancel` bound by this pattern is scoped to the
                        // `if let` and shadows the snapshot taken above.
                        if let ReadState::HostReady {
                            cancel_waker,
                            cancel,
                            ..
                        } = &mut store.concurrent_state_mut().get_mut(id)?.read
                        {
                            if poll.is_pending() {
                                // Remember the waker of the task polling us
                                // (presumably so a cancellation request can wake
                                // it -- confirm against the cancel path).
                                *cancel_waker = Some(cx.waker().clone());
                            } else {
                                *cancel_waker = None;
                                *cancel = false;
                            }
                        }

                        // `cancel` is the pre-poll snapshot again here; it is
                        // returned alongside the result as `cancelled`.
                        Ok(poll.map(|v| v.map(|result| (result, cancel))))
                    })?
                })
                .await?;

                // Gather how much has been consumed from the guest so far and
                // how many items were on offer, for the validation below.
                let (guest_offset, count) = tls::get(|store| {
                    let transmit = store.concurrent_state_mut().get_mut(id)?;
                    Ok((
                        match &transmit.read {
                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
                            ReadState::Open => 0,
                            _ => bail_bug!("invalid read state"),
                        },
                        match &transmit.write {
                            &WriteState::GuestReady { count, .. } => count,
                            WriteState::HostReady { .. } => match host_buffer_remaining_before {
                                Some(n) => n,
                                None => bail_bug!("host_buffer_remaining_before should be set"),
                            },
                            _ => bail_bug!("invalid write state"),
                        },
                    ))
                })?;

                match result {
                    StreamResult::Completed => {
                        // Reject a `Completed` that consumed nothing although
                        // items were available.
                        //
                        // NOTE(review): when no host buffer was supplied the
                        // `zip` yields `None`, so `unwrap_or(false)` makes this
                        // check unable to fire in that case -- confirm that is
                        // intended.
                        if count > 0
                            && guest_offset == 0
                            && host_buffer_remaining_before
                                .zip(host_buffer.map(|v| v.remaining().len()))
                                .map(|(before, after)| before == after)
                                .unwrap_or(false)
                        {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Completed \
                                 without consuming any items"
                            );
                        }

                        // For futures, flag the transmit as `done` once a
                        // consume has completed.
                        if let TransmitKind::Future = kind {
                            tls::get(|store| {
                                store.concurrent_state_mut().get_mut(id)?.done = true;
                                crate::error::Ok(())
                            })?;
                        }
                    }
                    StreamResult::Cancelled => {
                        // `Cancelled` is only valid if cancellation was requested.
                        if !cancelled {
                            bail!(
                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                                 without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {}
                }

                Ok(result)
            }
        };
        // Buffer-less flavour of the routine above, stored in
        // `ReadState::HostReady`.
        let consume = {
            let consume = consume_with_buffer.clone();
            Box::new(move || {
                let consume = consume.clone();
                async move { consume(None).await }.boxed()
            })
        };

        match &transmit.write {
            // No writer activity yet: just record that a host reader is ready.
            WriteState::Open => {
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
            }
            // A guest write is already pending: start consuming it right away.
            &WriteState::GuestReady { .. } => {
                let future = consume();
                transmit.read = ReadState::HostReady {
                    consume,
                    guest_offset: 0,
                    cancel: false,
                    cancel_waker: None,
                };
                self.0.pipe_from_guest(kind, id, future);
            }
            // Host producer meets host consumer.
            WriteState::HostReady { .. } => {
                // Take the real `produce` closure out of the write state,
                // leaving a placeholder whose `produce` reports a bug if it is
                // ever invoked and whose `try_into` always declines.
                let WriteState::HostReady { produce, .. } = mem::replace(
                    &mut transmit.write,
                    WriteState::HostReady {
                        produce: Box::new(|| {
                            Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
                        }),
                        try_into: Box::new(|_| None),
                        guest_offset: 0,
                        cancel: false,
                        cancel_waker: None,
                    },
                ) else {
                    bail_bug!("expected WriteState::HostReady")
                };

                transmit.read = ReadState::HostToHost {
                    accept: Box::new(move |input| {
                        let consume = consume_with_buffer.clone();
                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                    }),
                    buffer: Vec::new(),
                    limit: 0,
                };

                // Pump loop: keep invoking `produce` until the reader or the
                // producer reports it is dropped; a future stops after the
                // first successful round.
                let future = async move {
                    loop {
                        if tls::get(|store| {
                            crate::error::Ok(matches!(
                                store.concurrent_state_mut().get_mut(id)?.read,
                                ReadState::Dropped
                            ))
                        })? {
                            break Ok(());
                        }

                        match produce().await? {
                            StreamResult::Completed | StreamResult::Cancelled => {}
                            StreamResult::Dropped => break Ok(()),
                        }

                        if let TransmitKind::Future = kind {
                            break Ok(());
                        }
                    }
                }
                // Whatever the loop's outcome, delete the transmit; a failure
                // to delete takes precedence over the loop's own result.
                .map(move |result| {
                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
                    result
                });

                state.push_future(Box::pin(future));
            }
            // The writer is already gone, so the reader is dropped as well.
            WriteState::Dropped => {
                let reader = transmit.read_handle;
                self.0.host_drop_reader(reader, kind)?;
            }
        }
        Ok(())
    }
2917}
2918
/// Deliver the items buffered in `pair` to whichever reader is currently
/// registered on transmit `id`.
///
/// The read state is temporarily swapped out for `ReadState::Open` and then
/// handled according to what it was:
///
/// * `GuestReady`: items are lowered into the guest at
///   `address + T::SIZE32 * guest_offset`, with room for
///   `count - guest_offset` items; afterwards the write state's
///   `guest_offset` is advanced by the number of items that left the buffer
///   and the `GuestReady` read state is restored.
/// * `HostToHost`: the reader's `accept` callback is fed first from the host
///   buffer and then from the producer's own buffer, until both are drained
///   or the reader reports `StreamResult::Dropped`; the read state is then set
///   to `Dropped` or restored as `HostToHost` with `limit` reset to 0.
///
/// Any other read state is reported as a bug.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<LockedState<(P, B)>>,
    kind: TransmitKind,
) -> Result<()> {
    // Take the read state (leaving `Open` behind) together with the current
    // `guest_offset`, if the write state is `HostReady`.
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller_instance,
            caller_thread,
        } => {
            let guest_offset = match guest_offset {
                Some(i) => i,
                None => bail_bug!("guest_offset should be present if ready"),
            };

            // For futures, flag the transmit as `done` before delivering.
            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }

            // Remember the buffer size so the number of delivered items can be
            // computed after lowering.
            let old_remaining = pair.with(|p| p.1.remaining().len())?;
            // Lowers buffered items into the guest, starting after the
            // `guest_offset` items that were already delivered.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    let mut state = pair.take()?;
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        caller_thread,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset),
                        count - guest_offset,
                        &mut state.1,
                    )?;
                    crate::error::Ok(())
                }
            };

            // Only lower if the guest still has room left.
            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // For payloads which may require a realloc call, use a
                    // oneshot::channel and background task.  This is
                    // necessary because calling the guest while there are
                    // host embedder frames on the stack is unsound.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    rx.await?
                } else {
                    // Optimize flat payloads (i.e. those which do not
                    // require calling the guest's realloc function) by
                    // lowering directly instead of using a oneshot::channel
                    // and background task.
                    tls::get(|store| accept(token.as_context_mut(store)))?
                };
            }

            tls::get(|store| {
                // Number of items that left the buffer during lowering.
                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;

                let transmit = store.concurrent_state_mut().get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    bail_bug!("expected WriteState::HostReady")
                };

                *guest_offset += count;

                // Put the guest read back exactly as it was taken out.  Because
                // the closure-local `count` above is a block-scoped pattern
                // binding... it is not: it is a `let`, so the shorthand `count`
                // below refers to that delivered-item count.
                // NOTE(review): the restored `GuestReady.count` is therefore the
                // delivered count rather than the originally requested count --
                // confirm this shadowing is intended.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count,
                    handle,
                    instance,
                    caller_instance,
                    caller_thread,
                };

                crate::error::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // First offer the host buffer to the reader, resuming from
            // `position` each round, until `limit` is reached or the reader
            // reports it has been dropped.
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            {
                // Then offer whatever is left in the producer's own buffer.
                let mut pair = pair.take()?;
                let (_, buffer) = &mut *pair;

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
                }
            }

            // Record the outcome: either the reader is gone, or the
            // host-to-host link is restored with `limit` reset to 0.
            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                crate::error::Ok(())
            })?;
            Ok(())
        }

        _ => bail_bug!("unexpected read state"),
    }
}
3080
3081impl Instance {
3082    /// Handle a host- or guest-initiated write by delivering the item(s) to the
3083    /// `StreamConsumer` for the specified stream or future.
3084    fn consume(
3085        self,
3086        store: &mut dyn VMStore,
3087        kind: TransmitKind,
3088        transmit_id: TableId<TransmitState>,
3089        consume: PollStream,
3090        guest_offset: usize,
3091        cancel: bool,
3092    ) -> Result<ReturnCode> {
3093        let mut future = consume();
3094        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
3095            consume,
3096            guest_offset,
3097            cancel,
3098            cancel_waker: None,
3099        };
3100        let poll = tls::set(store, || {
3101            future
3102                .as_mut()
3103                .poll(&mut Context::from_waker(&Waker::noop()))
3104        });
3105
3106        Ok(match poll {
3107            Poll::Ready(state) => {
3108                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3109                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
3110                    bail_bug!("expected ReadState::HostReady")
3111                };
3112                let code = return_code(kind, state?, mem::replace(guest_offset, 0))?;
3113                transmit.write = WriteState::Open;
3114                code
3115            }
3116            Poll::Pending => {
3117                store.pipe_from_guest(kind, transmit_id, future);
3118                ReturnCode::Blocked
3119            }
3120        })
3121    }
3122
3123    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
3124    /// for the specified stream or future for items.
3125    fn produce(
3126        self,
3127        store: &mut dyn VMStore,
3128        kind: TransmitKind,
3129        transmit_id: TableId<TransmitState>,
3130        produce: PollStream,
3131        try_into: TryInto,
3132        guest_offset: usize,
3133        cancel: bool,
3134    ) -> Result<ReturnCode> {
3135        let mut future = produce();
3136        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3137            produce,
3138            try_into,
3139            guest_offset,
3140            cancel,
3141            cancel_waker: None,
3142        };
3143        let poll = tls::set(store, || {
3144            future
3145                .as_mut()
3146                .poll(&mut Context::from_waker(&Waker::noop()))
3147        });
3148
3149        Ok(match poll {
3150            Poll::Ready(state) => {
3151                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3152                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3153                    bail_bug!("expected WriteState::HostReady")
3154                };
3155                let code = return_code(kind, state?, mem::replace(guest_offset, 0))?;
3156                transmit.read = ReadState::Open;
3157                code
3158            }
3159            Poll::Pending => {
3160                store.pipe_to_guest(kind, transmit_id, future);
3161                ReturnCode::Blocked
3162            }
3163        })
3164    }
3165
3166    /// Drop the writable end of the specified stream or future from the guest.
3167    pub(super) fn guest_drop_writable(
3168        self,
3169        store: &mut StoreOpaque,
3170        ty: TransmitIndex,
3171        writer: u32,
3172    ) -> Result<()> {
3173        let table = self.id().get_mut(store).table_for_transmit(ty);
3174        let transmit_rep = match ty {
3175            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3176            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3177        };
3178
3179        let id = TableId::<TransmitHandle>::new(transmit_rep);
3180        log::trace!("guest_drop_writable: drop writer {id:?}");
3181        match ty {
3182            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3183            TransmitIndex::Future(_) => store.host_drop_writer(
3184                id,
3185                Some(|| {
3186                    Err(format_err!(
3187                        "cannot drop future write end without first writing a value"
3188                    ))
3189                }),
3190            ),
3191        }
3192    }
3193
    /// Copy `count` items from the writer's buffer at `write_address` to the
    /// reader's buffer at `read_address` for the specified stream or future.
    ///
    /// * `flat_abi` - when present (streams only), the payload is copied with
    ///   a raw byte copy instead of being lifted and lowered item by item.
    /// * `write_caller_instance`, `write_ty`, `write_options`,
    ///   `write_address` - describe the writing side; `write_options` selects
    ///   the memory the items are read from.
    /// * `read_caller_instance`, `read_caller_thread`, `read_ty`,
    ///   `read_options`, `read_address` - describe the reading side;
    ///   `read_options` selects the memory the items are written to, and
    ///   `read_caller_thread` is made the current thread while values are
    ///   lowered.
    /// * `count` - number of items to copy; must be exactly 1 for futures.
    /// * `rep` - representation of the transmit handle; only used for trace
    ///   logging here.
    ///
    /// # Errors
    ///
    /// Returns an error if both ends belong to the same component instance
    /// and `allow_intra_component_read_write` rejects the payload type, if
    /// either pointer is misaligned or out of bounds, or if lifting/lowering
    /// a value fails.  Mismatched `write_ty`/`read_ty` kinds and a future
    /// `count` other than 1 are reported as internal bugs.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_caller_instance: RuntimeComponentInstanceIndex,
        write_ty: TransmitIndex,
        write_options: OptionsIndex,
        write_address: usize,
        read_caller_instance: RuntimeComponentInstanceIndex,
        read_caller_thread: QualifiedThreadId,
        read_ty: TransmitIndex,
        read_options: OptionsIndex,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                if count != 1 {
                    bail_bug!("futures can only send 1 item");
                }

                let payload = types[types[write_ty].ty].payload;

                if write_caller_instance == read_caller_instance
                    && !allow_intra_component_read_write(payload)
                {
                    bail!(
                        "cannot read from and write to intra-component future with non-numeric payload"
                    )
                }

                // Lift the value (if the future has a payload at all) out of
                // the writer's memory.
                let val = payload
                    .map(|ty| {
                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);

                        let abi = lift.types.canonical_abi(&ty);
                        // FIXME: needs to read an i64 for memory64
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).ok()?))
                            .ok_or_else(|| {
                                crate::format_err!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                if let Some(val) = val {
                    // Serializing the value may require calling the guest's realloc function, so we
                    // set the guest's thread context in case realloc requires it, and restore the original
                    // thread context after the copy is complete.
                    //
                    // NOTE(review): if anything below fails, the early return
                    // skips restoring `old_thread` -- confirm that is
                    // acceptable on the error path.
                    let old_thread = store.0.set_thread(read_caller_thread)?;
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let types = lower.types;
                    let ty = match types[types[read_ty].ty].payload {
                        Some(ty) => ty,
                        None => bail_bug!("expected payload type to be present"),
                    };
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into()?),
                    )?;
                    val.store(lower, ty, ptr)?;
                    store.0.set_thread(old_thread)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if write_caller_instance == read_caller_instance
                    && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
                {
                    bail!(
                        "cannot read from and write to intra-component stream with non-numeric payload"
                    )
                }

                if let Some(flat_abi) = flat_abi {
                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
                    //
                    // NOTE(review): this multiplication is unchecked;
                    // presumably callers have already bounds-checked the full
                    // buffers so it cannot overflow -- confirm.
                    let length_in_bytes = usize::try_from(flat_abi.size)? * count;
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            // Both slices are range-checked here; only the raw
                            // pointers are kept so that the shared and mutable
                            // borrows of the store do not overlap.
                            let src = self
                                .options_memory(store_opaque, write_options)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    crate::format_err!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = self
                                .options_memory_mut(store_opaque, read_options)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    crate::format_err!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY: Both `src` and `dst` have been validated
                            // above.
                            unsafe {
                                if write_caller_instance == read_caller_instance {
                                    // If the same instance owns both ends of
                                    // the stream, the source and destination
                                    // buffers might overlap.
                                    src.copy_to(dst, length_in_bytes)
                                } else {
                                    // Since the read and write ends of the
                                    // stream are owned by distinct instances,
                                    // the buffers cannot possibly belong to the
                                    // same memory and thus cannot overlap.
                                    src.copy_to_nonoverlapping(dst, length_in_bytes)
                                }
                            }
                        }
                    }
                } else {
                    // Slow path: lift every item out of the writer's memory
                    // into a `Val`, then lower each one into the reader's
                    // memory.
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = match lift.types[lift.types[write_ty].ty].payload {
                        Some(ty) => ty,
                        None => bail_bug!("expected payload type to be present"),
                    };
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32)?;
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| {
                            crate::format_err!("write pointer out of bounds of memory")
                        })?;
                    // Charge fuel for the intermediate `Vec<Val>` before
                    // building it.
                    lift.consume_fuel_array(count, size_of::<Val>())?;

                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    // Serializing the value may require calling the guest's realloc function, so we
                    // set the guest's thread context in case realloc requires it, and restore the original
                    // thread context after the copy is complete.
                    //
                    // NOTE(review): as in the future case, an error below
                    // returns before `old_thread` is restored -- confirm.
                    let old_thread = store.0.set_thread(read_caller_thread)?;
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let ty = match lower.types[lower.types[read_ty].ty].payload {
                        Some(ty) => ty,
                        None => bail_bug!("expected payload type to be present"),
                    };
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32)?;
                    // Validate the whole destination range up front; the
                    // resulting slice itself is not needed.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| {
                            crate::format_err!("read pointer out of bounds of memory")
                        })?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                    store.0.set_thread(old_thread)?;
                }
            }
            _ => bail_bug!("mismatched transmit types in copy"),
        }

        Ok(())
    }
3391
3392    fn check_bounds(
3393        self,
3394        store: &StoreOpaque,
3395        options: OptionsIndex,
3396        ty: TransmitIndex,
3397        address: usize,
3398        count: usize,
3399    ) -> Result<()> {
3400        let types = self.id().get(store).component().types();
3401        let size = usize::try_from(
3402            match ty {
3403                TransmitIndex::Future(ty) => types[types[ty].ty]
3404                    .payload
3405                    .map(|ty| types.canonical_abi(&ty).size32),
3406                TransmitIndex::Stream(ty) => types[types[ty].ty]
3407                    .payload
3408                    .map(|ty| types.canonical_abi(&ty).size32),
3409            }
3410            .unwrap_or(0),
3411        )?;
3412
3413        if count > 0 && size > 0 {
3414            self.options_memory(store, options)
3415                .get(address..)
3416                .and_then(|b| b.get(..(size * count)))
3417                .map(drop)
3418                .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3419        } else {
3420            Ok(())
3421        }
3422    }
3423
    /// Write to the specified stream or future from the guest.
    ///
    /// `handle` names the writable end in this instance's handle table for
    /// `ty`; `address` and `count` describe the guest buffer holding the items
    /// to write, which is bounds-checked against the memory selected by
    /// `options` before anything else happens.
    ///
    /// What happens next depends on the state of the read end:
    ///
    /// * A guest read is pending: `min(count, read_count)` items are copied
    ///   straight into the reader's buffer.  The reader's result is delivered
    ///   unless this is a zero-length write, and any unused part of the
    ///   reader's buffer stays pending.
    /// * A host read is pending: the write is recorded as pending and
    ///   `Self::consume` is called with the pending consumer.
    /// * No read is pending: the write is recorded as pending and the result
    ///   is `ReturnCode::Blocked`.
    /// * The read end was dropped: the result is `ReturnCode::Dropped(0)`.
    ///
    /// If the result is `Blocked` and `options` is not `async`, this waits for
    /// the write event before returning.  The local handle state is `Busy`
    /// while the write is outstanding and is reset to `Write` once a
    /// non-`Blocked` result is available.
    ///
    /// # Errors
    ///
    /// Fails if a synchronous call is made where blocking is not permitted,
    /// if the buffer is out of bounds, if `handle` is not an idle writable
    /// end, if the stream's reader is already known to have been dropped, or
    /// if the future has already completed.
    pub(super) fn guest_write<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        caller: RuntimeComponentInstanceIndex,
        ty: TransmitIndex,
        options: OptionsIndex,
        flat_abi: Option<FlatAbi>,
        handle: u32,
        address: u32,
        count: u32,
    ) -> Result<ReturnCode> {
        if !self.options(store.0, options).async_ {
            // The caller may only sync call `{stream,future}.write` from an
            // async task (i.e. a task created via a call to an async export).
            // Otherwise, we'll trap.
            store.0.check_blocking()?;
        }

        let address = usize::try_from(address)?;
        let count = usize::try_from(count)?;
        self.check_bounds(store.0, options, ty, address, count)?;
        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
        let TransmitLocalState::Write { done } = *state else {
            bail!(
                "invalid handle {handle}; expected `Write`; got {:?}",
                *state
            );
        };

        if done {
            bail!("cannot write to stream after being notified that the readable end dropped");
        }

        // Mark the handle as in use for the duration of the write; it is reset
        // to `Write` at the end once a non-`Blocked` result is known.
        *state = TransmitLocalState::Busy;
        let transmit_handle = TableId::<TransmitHandle>::new(rep);
        let concurrent_state = store.0.concurrent_state_mut();
        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
        let transmit = concurrent_state.get_mut(transmit_id)?;
        log::trace!(
            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
            transmit.read
        );

        if transmit.done {
            bail!("cannot write to future after previous write succeeded or readable end dropped");
        }

        // The read state is taken out of `transmit` below; this is the value
        // left behind in its place.
        let new_state = if let ReadState::Dropped = &transmit.read {
            ReadState::Dropped
        } else {
            ReadState::Open
        };

        // Records this write as pending so that a later read can pick it up.
        let set_guest_ready = |me: &mut ConcurrentState| {
            let transmit = me.get_mut(transmit_id)?;
            if !matches!(&transmit.write, WriteState::Open) {
                bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
            }
            transmit.write = WriteState::GuestReady {
                instance: self,
                caller,
                ty,
                flat_abi,
                options,
                address,
                count,
                handle,
            };
            Ok::<_, crate::Error>(())
        };

        let mut result = match mem::replace(&mut transmit.read, new_state) {
            ReadState::GuestReady {
                ty: read_ty,
                flat_abi: read_flat_abi,
                options: read_options,
                address: read_address,
                count: read_count,
                handle: read_handle,
                instance: read_instance,
                caller_instance: read_caller_instance,
                caller_thread: read_caller_thread,
            } => {
                if flat_abi != read_flat_abi {
                    bail_bug!("expected flat ABI calculations to be the same");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                // Note that zero-length reads and writes are handled specially
                // by the spec to allow each end to signal readiness to the
                // other.  Quoting the spec:
                //
                // ```
                // The meaning of a read or write when the length is 0 is that
                // the caller is querying the "readiness" of the other
                // side. When a 0-length read/write rendezvous with a
                // non-0-length read/write, only the 0-length read/write
                // completes; the non-0-length read/write is kept pending (and
                // ready for a subsequent rendezvous).
                //
                // In the corner case where a 0-length read and write
                // rendezvous, only the writer is notified of readiness. To
                // avoid livelock, the Canonical ABI requires that a writer must
                // (eventually) follow a completed 0-length write with a
                // non-0-length write that is allowed to block (allowing the
                // reader end to run and rendezvous with its own non-0-length
                // read).
                // ```

                let write_complete = count == 0 || read_count > 0;
                let read_complete = count > 0;
                let read_buffer_remaining = count < read_count;

                let read_handle_rep = transmit.read_handle.rep();

                // From here on `count` is the number of items actually
                // transferred.
                let count = count.min(read_count);

                self.copy(
                    store.as_context_mut(),
                    flat_abi,
                    caller,
                    ty,
                    options,
                    address,
                    read_caller_instance,
                    read_caller_thread,
                    read_ty,
                    read_options,
                    read_address,
                    count,
                    rep,
                )?;

                let instance = self.id().get_mut(store.0);
                let types = instance.component().types();
                let item_size = match payload(ty, types) {
                    Some(ty) => usize::try_from(types.canonical_abi(&ty).size32)?,
                    None => 0,
                };
                let concurrent_state = store.0.concurrent_state_mut();
                if read_complete {
                    let count = u32::try_from(count)?;
                    // Fold in any not-yet-delivered `Completed` read event so
                    // the reader sees a single cumulative total.
                    let total = if let Some(Event::StreamRead {
                        code: ReturnCode::Completed(old_total),
                        ..
                    }) = concurrent_state.take_event(read_handle_rep)?
                    {
                        count + old_total
                    } else {
                        count
                    };

                    let code = ReturnCode::completed(ty.kind(), total);

                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
                }

                if read_buffer_remaining {
                    // Keep the unused tail of the reader's buffer pending.
                    let transmit = concurrent_state.get_mut(transmit_id)?;
                    transmit.read = ReadState::GuestReady {
                        ty: read_ty,
                        flat_abi: read_flat_abi,
                        options: read_options,
                        address: read_address + (count * item_size),
                        count: read_count - count,
                        handle: read_handle,
                        instance: read_instance,
                        caller_instance: read_caller_instance,
                        caller_thread: read_caller_thread,
                    };
                }

                if write_complete {
                    ReturnCode::completed(ty.kind(), count.try_into()?)
                } else {
                    set_guest_ready(concurrent_state)?;
                    ReturnCode::Blocked
                }
            }

            ReadState::HostReady {
                consume,
                guest_offset,
                cancel,
                cancel_waker,
            } => {
                if cancel_waker.is_some() {
                    bail_bug!("expected cancel_waker to be none");
                }
                if cancel {
                    bail_bug!("expected cancel to be false");
                }
                if guest_offset != 0 {
                    bail_bug!("expected guest_offset to be 0");
                }

                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                set_guest_ready(concurrent_state)?;
                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
            }

            ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),

            ReadState::Open => {
                set_guest_ready(concurrent_state)?;
                ReturnCode::Blocked
            }

            ReadState::Dropped => {
                if let TransmitIndex::Future(_) = ty {
                    transmit.done = true;
                }

                ReturnCode::Dropped(0)
            }
        };

        // A synchronous caller waits here for the write event.
        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
            result = self.wait_for_write(store.0, transmit_handle)?;
        }

        if result != ReturnCode::Blocked {
            // The write is no longer outstanding: release the handle.  For
            // streams, `done` records that the reader was reported dropped.
            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
                TransmitLocalState::Write {
                    done: matches!(
                        (result, ty),
                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
                    ),
                };
        }

        log::trace!(
            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
        );

        Ok(result)
    }
3668
3669    /// Read from the specified stream or future from the guest.
3670    pub(super) fn guest_read<T: 'static>(
3671        self,
3672        mut store: StoreContextMut<T>,
3673        caller_instance: RuntimeComponentInstanceIndex,
3674        ty: TransmitIndex,
3675        options: OptionsIndex,
3676        flat_abi: Option<FlatAbi>,
3677        handle: u32,
3678        address: u32,
3679        count: u32,
3680    ) -> Result<ReturnCode> {
3681        if !self.options(store.0, options).async_ {
3682            // The caller may only sync call `{stream,future}.read` from an
3683            // async task (i.e. a task created via a call to an async export).
3684            // Otherwise, we'll trap.
3685            store.0.check_blocking()?;
3686        }
3687
3688        let address = usize::try_from(address)?;
3689        let count = usize::try_from(count)?;
3690        self.check_bounds(store.0, options, ty, address, count)?;
3691        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3692        let TransmitLocalState::Read { done } = *state else {
3693            bail_bug!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3694        };
3695
3696        if done {
3697            bail!("cannot read from stream after being notified that the writable end dropped");
3698        }
3699
3700        *state = TransmitLocalState::Busy;
3701        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3702        let concurrent_state = store.0.concurrent_state_mut();
3703        let caller_thread = concurrent_state.current_guest_thread()?;
3704        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3705        let transmit = concurrent_state.get_mut(transmit_id)?;
3706        log::trace!(
3707            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3708            transmit.write
3709        );
3710
3711        if transmit.done {
3712            bail!("cannot read from future after previous read succeeded");
3713        }
3714
3715        let new_state = if let WriteState::Dropped = &transmit.write {
3716            WriteState::Dropped
3717        } else {
3718            WriteState::Open
3719        };
3720
3721        let set_guest_ready = |me: &mut ConcurrentState| {
3722            let transmit = me.get_mut(transmit_id)?;
3723            if !matches!(&transmit.read, ReadState::Open) {
3724                bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
3725            }
3726            transmit.read = ReadState::GuestReady {
3727                ty,
3728                flat_abi,
3729                options,
3730                address,
3731                count,
3732                handle,
3733                instance: self,
3734                caller_instance,
3735                caller_thread,
3736            };
3737            Ok::<_, crate::Error>(())
3738        };
3739
3740        let mut result = match mem::replace(&mut transmit.write, new_state) {
3741            WriteState::GuestReady {
3742                instance: _,
3743                ty: write_ty,
3744                flat_abi: write_flat_abi,
3745                options: write_options,
3746                address: write_address,
3747                count: write_count,
3748                handle: write_handle,
3749                caller: write_caller,
3750            } => {
3751                if flat_abi != write_flat_abi {
3752                    bail_bug!("expected flat ABI calculations to be the same");
3753                }
3754
3755                if let TransmitIndex::Future(_) = ty {
3756                    transmit.done = true;
3757                }
3758
3759                let write_handle_rep = transmit.write_handle.rep();
3760
3761                // See the comment in `guest_write` for the
3762                // `ReadState::GuestReady` case concerning zero-length reads and
3763                // writes.
3764
3765                let write_complete = write_count == 0 || count > 0;
3766                let read_complete = write_count > 0;
3767                let write_buffer_remaining = count < write_count;
3768
3769                let count = count.min(write_count);
3770
3771                self.copy(
3772                    store.as_context_mut(),
3773                    flat_abi,
3774                    write_caller,
3775                    write_ty,
3776                    write_options,
3777                    write_address,
3778                    caller_instance,
3779                    caller_thread,
3780                    ty,
3781                    options,
3782                    address,
3783                    count,
3784                    rep,
3785                )?;
3786
3787                let instance = self.id().get_mut(store.0);
3788                let types = instance.component().types();
3789                let item_size = match payload(ty, types) {
3790                    Some(ty) => usize::try_from(types.canonical_abi(&ty).size32)?,
3791                    None => 0,
3792                };
3793                let concurrent_state = store.0.concurrent_state_mut();
3794
3795                if write_complete {
3796                    let count = u32::try_from(count)?;
3797                    let total = if let Some(Event::StreamWrite {
3798                        code: ReturnCode::Completed(old_total),
3799                        ..
3800                    }) = concurrent_state.take_event(write_handle_rep)?
3801                    {
3802                        count + old_total
3803                    } else {
3804                        count
3805                    };
3806
3807                    let code = ReturnCode::completed(ty.kind(), total);
3808
3809                    concurrent_state.send_write_result(
3810                        write_ty,
3811                        transmit_id,
3812                        write_handle,
3813                        code,
3814                    )?;
3815                }
3816
3817                if write_buffer_remaining {
3818                    let transmit = concurrent_state.get_mut(transmit_id)?;
3819                    transmit.write = WriteState::GuestReady {
3820                        instance: self,
3821                        caller: write_caller,
3822                        ty: write_ty,
3823                        flat_abi: write_flat_abi,
3824                        options: write_options,
3825                        address: write_address + (count * item_size),
3826                        count: write_count - count,
3827                        handle: write_handle,
3828                    };
3829                }
3830
3831                if read_complete {
3832                    ReturnCode::completed(ty.kind(), count.try_into()?)
3833                } else {
3834                    set_guest_ready(concurrent_state)?;
3835                    ReturnCode::Blocked
3836                }
3837            }
3838
3839            WriteState::HostReady {
3840                produce,
3841                try_into,
3842                guest_offset,
3843                cancel,
3844                cancel_waker,
3845            } => {
3846                if cancel_waker.is_some() {
3847                    bail_bug!("expected cancel_waker to be none");
3848                }
3849                if cancel {
3850                    bail_bug!("expected cancel to be false");
3851                }
3852                if guest_offset != 0 {
3853                    bail_bug!("expected guest_offset to be 0");
3854                }
3855
3856                set_guest_ready(concurrent_state)?;
3857
3858                let code =
3859                    self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3860
3861                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3862                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3863                }
3864
3865                code
3866            }
3867
3868            WriteState::Open => {
3869                set_guest_ready(concurrent_state)?;
3870                ReturnCode::Blocked
3871            }
3872
3873            WriteState::Dropped => ReturnCode::Dropped(0),
3874        };
3875
3876        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3877            result = self.wait_for_read(store.0, transmit_handle)?;
3878        }
3879
3880        if result != ReturnCode::Blocked {
3881            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3882                TransmitLocalState::Read {
3883                    done: matches!(
3884                        (result, ty),
3885                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3886                    ),
3887                };
3888        }
3889
3890        log::trace!(
3891            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3892        );
3893
3894        Ok(result)
3895    }
3896
3897    fn wait_for_write(
3898        self,
3899        store: &mut StoreOpaque,
3900        handle: TableId<TransmitHandle>,
3901    ) -> Result<ReturnCode> {
3902        let waitable = Waitable::Transmit(handle);
3903        store.wait_for_event(waitable)?;
3904        let event = waitable.take_event(store.concurrent_state_mut())?;
3905        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3906            event
3907        {
3908            waitable.on_delivery(store, self, event)?;
3909            Ok(code)
3910        } else {
3911            bail_bug!("expected either a stream or future write event")
3912        }
3913    }
3914
3915    /// Cancel a pending stream or future write.
3916    fn cancel_write(
3917        self,
3918        store: &mut StoreOpaque,
3919        transmit_id: TableId<TransmitState>,
3920        async_: bool,
3921    ) -> Result<ReturnCode> {
3922        let state = store.concurrent_state_mut();
3923        let transmit = state.get_mut(transmit_id)?;
3924        log::trace!(
3925            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3926            transmit.read,
3927            transmit.write
3928        );
3929
3930        let code = if let Some(event) =
3931            Waitable::Transmit(transmit.write_handle).take_event(state)?
3932        {
3933            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3934                bail_bug!("expected either a stream or future write event")
3935            };
3936            match (code, event) {
3937                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3938                    ReturnCode::Cancelled(count)
3939                }
3940                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3941                _ => bail_bug!("unexpected code/event combo"),
3942            }
3943        } else if let ReadState::HostReady {
3944            cancel,
3945            cancel_waker,
3946            ..
3947        } = &mut state.get_mut(transmit_id)?.read
3948        {
3949            *cancel = true;
3950            if let Some(waker) = cancel_waker.take() {
3951                waker.wake();
3952            }
3953
3954            if async_ {
3955                ReturnCode::Blocked
3956            } else {
3957                let handle = store
3958                    .concurrent_state_mut()
3959                    .get_mut(transmit_id)?
3960                    .write_handle;
3961                self.wait_for_write(store, handle)?
3962            }
3963        } else {
3964            ReturnCode::Cancelled(0)
3965        };
3966
3967        if !matches!(code, ReturnCode::Blocked) {
3968            let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3969
3970            match &transmit.write {
3971                WriteState::GuestReady { .. } => {
3972                    transmit.write = WriteState::Open;
3973                }
3974                WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
3975                WriteState::Open | WriteState::Dropped => {}
3976            }
3977        }
3978
3979        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3980
3981        Ok(code)
3982    }
3983
3984    fn wait_for_read(
3985        self,
3986        store: &mut StoreOpaque,
3987        handle: TableId<TransmitHandle>,
3988    ) -> Result<ReturnCode> {
3989        let waitable = Waitable::Transmit(handle);
3990        store.wait_for_event(waitable)?;
3991        let event = waitable.take_event(store.concurrent_state_mut())?;
3992        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3993            event
3994        {
3995            waitable.on_delivery(store, self, event)?;
3996            Ok(code)
3997        } else {
3998            bail_bug!("expected either a stream or future read event")
3999        }
4000    }
4001
4002    /// Cancel a pending stream or future read.
4003    fn cancel_read(
4004        self,
4005        store: &mut StoreOpaque,
4006        transmit_id: TableId<TransmitState>,
4007        async_: bool,
4008    ) -> Result<ReturnCode> {
4009        let state = store.concurrent_state_mut();
4010        let transmit = state.get_mut(transmit_id)?;
4011        log::trace!(
4012            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
4013            transmit.read,
4014            transmit.write
4015        );
4016
4017        let code = if let Some(event) =
4018            Waitable::Transmit(transmit.read_handle).take_event(state)?
4019        {
4020            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
4021                bail_bug!("expected either a stream or future read event")
4022            };
4023            match (code, event) {
4024                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
4025                    ReturnCode::Cancelled(count)
4026                }
4027                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
4028                _ => bail_bug!("unexpected code/event combo"),
4029            }
4030        } else if let WriteState::HostReady {
4031            cancel,
4032            cancel_waker,
4033            ..
4034        } = &mut state.get_mut(transmit_id)?.write
4035        {
4036            *cancel = true;
4037            if let Some(waker) = cancel_waker.take() {
4038                waker.wake();
4039            }
4040
4041            if async_ {
4042                ReturnCode::Blocked
4043            } else {
4044                let handle = store
4045                    .concurrent_state_mut()
4046                    .get_mut(transmit_id)?
4047                    .read_handle;
4048                self.wait_for_read(store, handle)?
4049            }
4050        } else {
4051            ReturnCode::Cancelled(0)
4052        };
4053
4054        if !matches!(code, ReturnCode::Blocked) {
4055            let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4056
4057            match &transmit.read {
4058                ReadState::GuestReady { .. } => {
4059                    transmit.read = ReadState::Open;
4060                }
4061                ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4062                    bail_bug!("support host read cancellation")
4063                }
4064                ReadState::Open | ReadState::Dropped => {}
4065            }
4066        }
4067
4068        log::trace!("cancelled read {transmit_id:?}: {code:?}");
4069
4070        Ok(code)
4071    }
4072
4073    /// Cancel a pending write for the specified stream or future from the guest.
4074    fn guest_cancel_write(
4075        self,
4076        store: &mut StoreOpaque,
4077        ty: TransmitIndex,
4078        async_: bool,
4079        writer: u32,
4080    ) -> Result<ReturnCode> {
4081        if !async_ {
4082            // The caller may only sync call `{stream,future}.cancel-write` from
4083            // an async task (i.e. a task created via a call to an async
4084            // export).  Otherwise, we'll trap.
4085            store.check_blocking()?;
4086        }
4087
4088        let (rep, state) =
4089            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
4090        let id = TableId::<TransmitHandle>::new(rep);
4091        log::trace!("guest cancel write {id:?} (handle {writer})");
4092        match state {
4093            TransmitLocalState::Write { .. } => {
4094                bail!("stream or future write cancelled when no write is pending")
4095            }
4096            TransmitLocalState::Read { .. } => {
4097                bail!("passed read end to `{{stream|future}}.cancel-write`")
4098            }
4099            TransmitLocalState::Busy => {}
4100        }
4101        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4102        let code = self.cancel_write(store, transmit_id, async_)?;
4103        if !matches!(code, ReturnCode::Blocked) {
4104            let state =
4105                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
4106                    .1;
4107            if let TransmitLocalState::Busy = state {
4108                *state = TransmitLocalState::Write { done: false };
4109            }
4110        }
4111        Ok(code)
4112    }
4113
4114    /// Cancel a pending read for the specified stream or future from the guest.
4115    fn guest_cancel_read(
4116        self,
4117        store: &mut StoreOpaque,
4118        ty: TransmitIndex,
4119        async_: bool,
4120        reader: u32,
4121    ) -> Result<ReturnCode> {
4122        if !async_ {
4123            // The caller may only sync call `{stream,future}.cancel-read` from
4124            // an async task (i.e. a task created via a call to an async
4125            // export).  Otherwise, we'll trap.
4126            store.check_blocking()?;
4127        }
4128
4129        let (rep, state) =
4130            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
4131        let id = TableId::<TransmitHandle>::new(rep);
4132        log::trace!("guest cancel read {id:?} (handle {reader})");
4133        match state {
4134            TransmitLocalState::Read { .. } => {
4135                bail!("stream or future read cancelled when no read is pending")
4136            }
4137            TransmitLocalState::Write { .. } => {
4138                bail!("passed write end to `{{stream|future}}.cancel-read`")
4139            }
4140            TransmitLocalState::Busy => {}
4141        }
4142        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
4143        let code = self.cancel_read(store, transmit_id, async_)?;
4144        if !matches!(code, ReturnCode::Blocked) {
4145            let state =
4146                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
4147                    .1;
4148            if let TransmitLocalState::Busy = state {
4149                *state = TransmitLocalState::Read { done: false };
4150            }
4151        }
4152        Ok(code)
4153    }
4154
4155    /// Drop the readable end of the specified stream or future from the guest.
4156    fn guest_drop_readable(
4157        self,
4158        store: &mut StoreOpaque,
4159        ty: TransmitIndex,
4160        reader: u32,
4161    ) -> Result<()> {
4162        let table = self.id().get_mut(store).table_for_transmit(ty);
4163        let (rep, _is_done) = match ty {
4164            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
4165            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
4166        };
4167        let kind = match ty {
4168            TransmitIndex::Stream(_) => TransmitKind::Stream,
4169            TransmitIndex::Future(_) => TransmitKind::Future,
4170        };
4171        let id = TableId::<TransmitHandle>::new(rep);
4172        log::trace!("guest_drop_readable: drop reader {id:?}");
4173        store.host_drop_reader(id, kind)
4174    }
4175
4176    /// Create a new error context for the given component.
4177    pub(crate) fn error_context_new(
4178        self,
4179        store: &mut StoreOpaque,
4180        ty: TypeComponentLocalErrorContextTableIndex,
4181        options: OptionsIndex,
4182        debug_msg_address: u32,
4183        debug_msg_len: u32,
4184    ) -> Result<u32> {
4185        let lift_ctx = &mut LiftContext::new(store, options, self);
4186        let debug_msg = String::linear_lift_from_flat(
4187            lift_ctx,
4188            InterfaceType::String,
4189            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4190        )?;
4191
4192        // Create a new ErrorContext that is tracked along with other concurrent state
4193        let err_ctx = ErrorContextState { debug_msg };
4194        let state = store.concurrent_state_mut();
4195        let table_id = state.push(err_ctx)?;
4196        let global_ref_count_idx =
4197            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4198
4199        // Add to the global error context ref counts
4200        let _ = state
4201            .global_error_context_ref_counts
4202            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4203
4204        // Error context are tracked both locally (to a single component instance) and globally
4205        // the counts for both must stay in sync.
4206        //
4207        // Here we reflect the newly created global concurrent error context state into the
4208        // component instance's locally tracked count, along with the appropriate key into the global
4209        // ref tracking data structures to enable later lookup
4210        let local_idx = self
4211            .id()
4212            .get_mut(store)
4213            .table_for_error_context(ty)
4214            .error_context_insert(table_id.rep())?;
4215
4216        Ok(local_idx)
4217    }
4218
4219    /// Retrieve the debug message from the specified error context.
4220    pub(super) fn error_context_debug_message<T>(
4221        self,
4222        store: StoreContextMut<T>,
4223        ty: TypeComponentLocalErrorContextTableIndex,
4224        options: OptionsIndex,
4225        err_ctx_handle: u32,
4226        debug_msg_address: u32,
4227    ) -> Result<()> {
4228        // Retrieve the error context and internal debug message
4229        let handle_table_id_rep = self
4230            .id()
4231            .get_mut(store.0)
4232            .table_for_error_context(ty)
4233            .error_context_rep(err_ctx_handle)?;
4234
4235        let state = store.0.concurrent_state_mut();
4236        // Get the state associated with the error context
4237        let ErrorContextState { debug_msg } =
4238            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4239        let debug_msg = debug_msg.clone();
4240
4241        let lower_cx = &mut LowerContext::new(store, options, self);
4242        let debug_msg_address = usize::try_from(debug_msg_address)?;
4243        // Lower the string into the component's memory
4244        let offset = lower_cx
4245            .as_slice_mut()
4246            .get(debug_msg_address..)
4247            .and_then(|b| b.get(..debug_msg.bytes().len()))
4248            .map(|_| debug_msg_address)
4249            .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4250        debug_msg
4251            .as_str()
4252            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4253
4254        Ok(())
4255    }
4256
4257    /// Implements the `future.cancel-read` intrinsic.
4258    pub(crate) fn future_cancel_read(
4259        self,
4260        store: &mut StoreOpaque,
4261        ty: TypeFutureTableIndex,
4262        async_: bool,
4263        reader: u32,
4264    ) -> Result<u32> {
4265        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4266            .map(|v| v.encode())
4267    }
4268
4269    /// Implements the `future.cancel-write` intrinsic.
4270    pub(crate) fn future_cancel_write(
4271        self,
4272        store: &mut StoreOpaque,
4273        ty: TypeFutureTableIndex,
4274        async_: bool,
4275        writer: u32,
4276    ) -> Result<u32> {
4277        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4278            .map(|v| v.encode())
4279    }
4280
4281    /// Implements the `stream.cancel-read` intrinsic.
4282    pub(crate) fn stream_cancel_read(
4283        self,
4284        store: &mut StoreOpaque,
4285        ty: TypeStreamTableIndex,
4286        async_: bool,
4287        reader: u32,
4288    ) -> Result<u32> {
4289        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4290            .map(|v| v.encode())
4291    }
4292
4293    /// Implements the `stream.cancel-write` intrinsic.
4294    pub(crate) fn stream_cancel_write(
4295        self,
4296        store: &mut StoreOpaque,
4297        ty: TypeStreamTableIndex,
4298        async_: bool,
4299        writer: u32,
4300    ) -> Result<u32> {
4301        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4302            .map(|v| v.encode())
4303    }
4304
4305    /// Implements the `future.drop-readable` intrinsic.
4306    pub(crate) fn future_drop_readable(
4307        self,
4308        store: &mut StoreOpaque,
4309        ty: TypeFutureTableIndex,
4310        reader: u32,
4311    ) -> Result<()> {
4312        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4313    }
4314
4315    /// Implements the `stream.drop-readable` intrinsic.
4316    pub(crate) fn stream_drop_readable(
4317        self,
4318        store: &mut StoreOpaque,
4319        ty: TypeStreamTableIndex,
4320        reader: u32,
4321    ) -> Result<()> {
4322        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4323    }
4324
4325    /// Allocate a new future or stream and grant ownership of both the read and
4326    /// write ends to the (sub-)component instance to which the specified
4327    /// `TransmitIndex` belongs.
4328    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4329        let (write, read) = store
4330            .concurrent_state_mut()
4331            .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4332
4333        let table = self.id().get_mut(store).table_for_transmit(ty);
4334        let (read_handle, write_handle) = match ty {
4335            TransmitIndex::Future(ty) => (
4336                table.future_insert_read(ty, read.rep())?,
4337                table.future_insert_write(ty, write.rep())?,
4338            ),
4339            TransmitIndex::Stream(ty) => (
4340                table.stream_insert_read(ty, read.rep())?,
4341                table.stream_insert_write(ty, write.rep())?,
4342            ),
4343        };
4344
4345        let state = store.concurrent_state_mut();
4346        state.get_mut(read)?.common.handle = Some(read_handle);
4347        state.get_mut(write)?.common.handle = Some(write_handle);
4348
4349        Ok(ResourcePair {
4350            write: write_handle,
4351            read: read_handle,
4352        })
4353    }
4354
4355    /// Drop the specified error context.
4356    pub(crate) fn error_context_drop(
4357        self,
4358        store: &mut StoreOpaque,
4359        ty: TypeComponentLocalErrorContextTableIndex,
4360        error_context: u32,
4361    ) -> Result<()> {
4362        let instance = self.id().get_mut(store);
4363
4364        let local_handle_table = instance.table_for_error_context(ty);
4365
4366        let rep = local_handle_table.error_context_drop(error_context)?;
4367
4368        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4369
4370        let state = store.concurrent_state_mut();
4371        let Some(GlobalErrorContextRefCount(global_ref_count)) = state
4372            .global_error_context_ref_counts
4373            .get_mut(&global_ref_count_idx)
4374        else {
4375            bail_bug!("retrieve concurrent state for error context during drop")
4376        };
4377
4378        // Reduce the component-global ref count, removing tracking if necessary
4379        if *global_ref_count < 1 {
4380            bail_bug!("ref count unexpectedly zero");
4381        }
4382        *global_ref_count -= 1;
4383        if *global_ref_count == 0 {
4384            state
4385                .global_error_context_ref_counts
4386                .remove(&global_ref_count_idx);
4387
4388            state
4389                .delete(TableId::<ErrorContextState>::new(rep))
4390                .context("deleting component-global error context data")?;
4391        }
4392
4393        Ok(())
4394    }
4395
4396    /// Transfer ownership of the specified stream or future read end from one
4397    /// guest to another.
4398    fn guest_transfer(
4399        self,
4400        store: &mut StoreOpaque,
4401        src_idx: u32,
4402        src: TransmitIndex,
4403        dst: TransmitIndex,
4404    ) -> Result<u32> {
4405        let mut instance = self.id().get_mut(store);
4406        let src_table = instance.as_mut().table_for_transmit(src);
4407        let (rep, is_done) = match src {
4408            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4409            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4410        };
4411        if is_done {
4412            bail!("cannot lift after being notified that the writable end dropped");
4413        }
4414        let dst_table = instance.table_for_transmit(dst);
4415        let handle = match dst {
4416            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4417            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4418        }?;
4419        store
4420            .concurrent_state_mut()
4421            .get_mut(TableId::<TransmitHandle>::new(rep))?
4422            .common
4423            .handle = Some(handle);
4424        Ok(handle)
4425    }
4426
4427    /// Implements the `future.new` intrinsic.
4428    pub(crate) fn future_new(
4429        self,
4430        store: &mut StoreOpaque,
4431        ty: TypeFutureTableIndex,
4432    ) -> Result<ResourcePair> {
4433        self.guest_new(store, TransmitIndex::Future(ty))
4434    }
4435
4436    /// Implements the `stream.new` intrinsic.
4437    pub(crate) fn stream_new(
4438        self,
4439        store: &mut StoreOpaque,
4440        ty: TypeStreamTableIndex,
4441    ) -> Result<ResourcePair> {
4442        self.guest_new(store, TransmitIndex::Stream(ty))
4443    }
4444
4445    /// Transfer ownership of the specified future read end from one guest to
4446    /// another.
4447    pub(crate) fn future_transfer(
4448        self,
4449        store: &mut StoreOpaque,
4450        src_idx: u32,
4451        src: TypeFutureTableIndex,
4452        dst: TypeFutureTableIndex,
4453    ) -> Result<u32> {
4454        self.guest_transfer(
4455            store,
4456            src_idx,
4457            TransmitIndex::Future(src),
4458            TransmitIndex::Future(dst),
4459        )
4460    }
4461
4462    /// Transfer ownership of the specified stream read end from one guest to
4463    /// another.
4464    pub(crate) fn stream_transfer(
4465        self,
4466        store: &mut StoreOpaque,
4467        src_idx: u32,
4468        src: TypeStreamTableIndex,
4469        dst: TypeStreamTableIndex,
4470    ) -> Result<u32> {
4471        self.guest_transfer(
4472            store,
4473            src_idx,
4474            TransmitIndex::Stream(src),
4475            TransmitIndex::Stream(dst),
4476        )
4477    }
4478
4479    /// Copy the specified error context from one component to another.
4480    pub(crate) fn error_context_transfer(
4481        self,
4482        store: &mut StoreOpaque,
4483        src_idx: u32,
4484        src: TypeComponentLocalErrorContextTableIndex,
4485        dst: TypeComponentLocalErrorContextTableIndex,
4486    ) -> Result<u32> {
4487        let mut instance = self.id().get_mut(store);
4488        let rep = instance
4489            .as_mut()
4490            .table_for_error_context(src)
4491            .error_context_rep(src_idx)?;
4492        let dst_idx = instance
4493            .table_for_error_context(dst)
4494            .error_context_insert(rep)?;
4495
4496        // Update the global (cross-subcomponent) count for error contexts
4497        // as the new component has essentially created a new reference that will
4498        // be dropped/handled independently
4499        let global_ref_count = store
4500            .concurrent_state_mut()
4501            .global_error_context_ref_counts
4502            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4503            .context("global ref count present for existing (sub)component error context")?;
4504        global_ref_count.0 += 1;
4505
4506        Ok(dst_idx)
4507    }
4508}
4509
4510impl ComponentInstance {
4511    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4512        let (states, types) = self.instance_states();
4513        let runtime_instance = match ty {
4514            TransmitIndex::Stream(ty) => types[ty].instance,
4515            TransmitIndex::Future(ty) => types[ty].instance,
4516        };
4517        states[runtime_instance].handle_table()
4518    }
4519
4520    fn table_for_error_context(
4521        self: Pin<&mut Self>,
4522        ty: TypeComponentLocalErrorContextTableIndex,
4523    ) -> &mut HandleTable {
4524        let (states, types) = self.instance_states();
4525        let runtime_instance = types[ty].instance;
4526        states[runtime_instance].handle_table()
4527    }
4528
4529    fn get_mut_by_index(
4530        self: Pin<&mut Self>,
4531        ty: TransmitIndex,
4532        index: u32,
4533    ) -> Result<(u32, &mut TransmitLocalState)> {
4534        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4535    }
4536}
4537
4538impl ConcurrentState {
4539    fn send_write_result(
4540        &mut self,
4541        ty: TransmitIndex,
4542        id: TableId<TransmitState>,
4543        handle: u32,
4544        code: ReturnCode,
4545    ) -> Result<()> {
4546        let write_handle = self.get_mut(id)?.write_handle.rep();
4547        self.set_event(
4548            write_handle,
4549            match ty {
4550                TransmitIndex::Future(ty) => Event::FutureWrite {
4551                    code,
4552                    pending: Some((ty, handle)),
4553                },
4554                TransmitIndex::Stream(ty) => Event::StreamWrite {
4555                    code,
4556                    pending: Some((ty, handle)),
4557                },
4558            },
4559        )
4560    }
4561
4562    fn send_read_result(
4563        &mut self,
4564        ty: TransmitIndex,
4565        id: TableId<TransmitState>,
4566        handle: u32,
4567        code: ReturnCode,
4568    ) -> Result<()> {
4569        let read_handle = self.get_mut(id)?.read_handle.rep();
4570        self.set_event(
4571            read_handle,
4572            match ty {
4573                TransmitIndex::Future(ty) => Event::FutureRead {
4574                    code,
4575                    pending: Some((ty, handle)),
4576                },
4577                TransmitIndex::Stream(ty) => Event::StreamRead {
4578                    code,
4579                    pending: Some((ty, handle)),
4580                },
4581            },
4582        )
4583    }
4584
4585    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4586        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4587    }
4588
4589    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4590        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4591    }
4592
4593    /// Set or update the event for the specified waitable.
4594    ///
4595    /// If there is already an event set for this waitable, we assert that it is
4596    /// of the same variant as the new one and reuse the `ReturnCode` count and
4597    /// the `pending` field if applicable.
4598    // TODO: This is a bit awkward due to how
4599    // `Event::{Stream,Future}{Write,Read}` and
4600    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4601    // Consider updating those representations in a way that allows this
4602    // function to be simplified.
4603    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4604        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4605
4606        fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
4607            let (ReturnCode::Completed(count)
4608            | ReturnCode::Dropped(count)
4609            | ReturnCode::Cancelled(count)) = old
4610            else {
4611                bail_bug!("unexpected old return code")
4612            };
4613
4614            Ok(match new {
4615                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4616                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4617                _ => bail_bug!("unexpected new return code"),
4618            })
4619        }
4620
4621        let event = match (waitable.take_event(self)?, event) {
4622            (None, _) => event,
4623            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4624            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4625            (
4626                Some(Event::StreamWrite {
4627                    code: old_code,
4628                    pending: old_pending,
4629                }),
4630                Event::StreamWrite { code, pending },
4631            ) => Event::StreamWrite {
4632                code: update_code(old_code, code)?,
4633                pending: old_pending.or(pending),
4634            },
4635            (
4636                Some(Event::StreamRead {
4637                    code: old_code,
4638                    pending: old_pending,
4639                }),
4640                Event::StreamRead { code, pending },
4641            ) => Event::StreamRead {
4642                code: update_code(old_code, code)?,
4643                pending: old_pending.or(pending),
4644            },
4645            _ => bail_bug!("unexpected event combination"),
4646        };
4647
4648        waitable.set_event(self, Some(event))
4649    }
4650
4651    /// Allocate a new future or stream, including the `TransmitState` and the
4652    /// `TransmitHandle`s corresponding to the read and write ends.
4653    fn new_transmit(
4654        &mut self,
4655        origin: TransmitOrigin,
4656    ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4657        let state_id = self.push(TransmitState::new(origin))?;
4658
4659        let write = self.push(TransmitHandle::new(state_id))?;
4660        let read = self.push(TransmitHandle::new(state_id))?;
4661
4662        let state = self.get_mut(state_id)?;
4663        state.write_handle = write;
4664        state.read_handle = read;
4665
4666        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4667
4668        Ok((write, read))
4669    }
4670
4671    /// Delete the specified future or stream, including the read and write ends.
4672    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4673        let state = self.delete(state_id)?;
4674        self.delete(state.write_handle)?;
4675        self.delete(state.read_handle)?;
4676
4677        log::trace!(
4678            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4679            state.write_handle,
4680            state.read_handle,
4681        );
4682
4683        Ok(())
4684    }
4685}
4686
/// The pair of handles produced when a new future or stream is allocated for
/// a guest: one for the write end and one for the read end.
pub(crate) struct ResourcePair {
    /// Handle of the write end, as inserted into the guest's handle table.
    pub(crate) write: u32,
    /// Handle of the read end, as inserted into the guest's handle table.
    pub(crate) read: u32,
}
4691
impl Waitable {
    /// Handle the imminent delivery of the specified event, e.g. by updating
    /// the state of the stream or future.
    ///
    /// Only stream/future read/write events that carry `pending` handle
    /// information are acted upon; every other event is a no-op.  For those
    /// events the guest handle must refer to this waitable and must currently
    /// be `Busy`, otherwise an internal-bug error is returned.
    pub(super) fn on_delivery(
        &self,
        store: &mut StoreOpaque,
        instance: Instance,
        event: Event,
    ) -> Result<()> {
        match event {
            Event::FutureRead {
                pending: Some((ty, handle)),
                ..
            }
            | Event::FutureWrite {
                pending: Some((ty, handle)),
                ..
            } => {
                // Find the handle in the table of the runtime instance that
                // owns this future type.
                let instance = instance.id().get_mut(store);
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.instance_states().0[runtime_instance]
                    .handle_table()
                    .future_rep(ty, handle)?;
                if rep != self.rep() {
                    bail_bug!("unexpected rep mismatch");
                }
                if *state != TransmitLocalState::Busy {
                    bail_bug!("expected state to be busy");
                }
                // The operation is over: the handle leaves the busy state and
                // returns to the idle state for its end (never marked done).
                *state = match event {
                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
                    _ => bail_bug!("unexpected event for future"),
                };
            }
            Event::StreamRead {
                pending: Some((ty, handle)),
                code,
            }
            | Event::StreamWrite {
                pending: Some((ty, handle)),
                code,
            } => {
                // Find the handle in the table of the runtime instance that
                // owns this stream type.
                let instance = instance.id().get_mut(store);
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.instance_states().0[runtime_instance]
                    .handle_table()
                    .stream_rep(ty, handle)?;
                if rep != self.rep() {
                    bail_bug!("unexpected rep mismatch");
                }
                if *state != TransmitLocalState::Busy {
                    bail_bug!("expected state to be busy");
                }
                // A stream handle is marked done when the event reports that
                // the other end was dropped.
                let done = matches!(code, ReturnCode::Dropped(_));
                *state = match event {
                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
                    _ => bail_bug!("unexpected event for stream"),
                };

                // Also reset the corresponding side of the shared transmit
                // state to `Open`.
                let transmit_handle = TableId::<TransmitHandle>::new(rep);
                let state = store.concurrent_state_mut();
                let transmit_id = state.get_mut(transmit_handle)?.state;
                let transmit = state.get_mut(transmit_id)?;

                match event {
                    Event::StreamRead { .. } => {
                        transmit.read = ReadState::Open;
                    }
                    Event::StreamWrite { .. } => transmit.write = WriteState::Open,
                    _ => bail_bug!("unexpected event for stream"),
                };
            }
            // Events without pending handle information (and all other event
            // kinds) require no bookkeeping here.
            _ => {}
        }
        Ok(())
    }
}
4771
4772/// Determine whether an intra-component read/write is allowed for the specified
4773/// `stream` or `future` payload type according to the component model
4774/// specification.
4775fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4776    matches!(
4777        ty,
4778        None | Some(
4779            InterfaceType::S8
4780                | InterfaceType::U8
4781                | InterfaceType::S16
4782                | InterfaceType::U16
4783                | InterfaceType::S32
4784                | InterfaceType::U32
4785                | InterfaceType::S64
4786                | InterfaceType::U64
4787                | InterfaceType::Float32
4788                | InterfaceType::Float64
4789        )
4790    )
4791}
4792
4793/// Helper structure to manage moving a `T` in/out of an interior `Mutex` which
4794/// contains an
4795/// `Option<T>`
4796struct LockedState<T> {
4797    inner: Mutex<Option<T>>,
4798}
4799
4800impl<T> LockedState<T> {
4801    /// Creates a new initial state with `value` stored.
4802    fn new(value: T) -> Self {
4803        Self {
4804            inner: Mutex::new(Some(value)),
4805        }
4806    }
4807
4808    /// Attempts to lock the inner mutex and return its guard.
4809    ///
4810    /// # Errors
4811    ///
4812    /// Fails if this lock is either poisoned or if it's currently locked.
4813    /// As-used in this file there should never actually be contention on this
4814    /// lock nor recursive access so failing to acquire the lock is a fatal
4815    /// error that gets propagated upwards.
4816    fn try_lock(&self) -> Result<MutexGuard<'_, Option<T>>> {
4817        match self.inner.try_lock() {
4818            Ok(lock) => Ok(lock),
4819            Err(_) => bail_bug!("should not have contention on state lock"),
4820        }
4821    }
4822
4823    /// Takes the inner `T` out of this state, returning it as a guard which
4824    /// will put it back when finished.
4825    ///
4826    /// # Errors
4827    ///
4828    /// Returns an error if the state `T` isn't present.
4829    fn take(&self) -> Result<LockedStateGuard<'_, T>> {
4830        let result = self.try_lock()?.take();
4831        match result {
4832            Some(result) => Ok(LockedStateGuard {
4833                value: ManuallyDrop::new(result),
4834                state: self,
4835            }),
4836            None => bail_bug!("lock value unexpectedly missing"),
4837        }
4838    }
4839
4840    /// Performs the operation `f` on the inner state `&mut T`.
4841    ///
4842    /// This will acquire the internal lock and invoke `f`, so `f` should not
4843    /// expect to be able to recursively acquire this lock.
4844    ///
4845    /// # Errors
4846    ///
4847    /// Returns an error if the state `T` isn't present.
4848    fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
4849        let mut inner = self.try_lock()?;
4850        match &mut *inner {
4851            Some(state) => Ok(f(state)),
4852            None => bail_bug!("lock value unexpectedly missing"),
4853        }
4854    }
4855}
4856
4857/// Helper structure returned from [`LockedState::take`] which will put the
4858/// state specified by `value` back into the original lock once this is dropped.
4859struct LockedStateGuard<'a, T> {
4860    value: ManuallyDrop<T>,
4861    state: &'a LockedState<T>,
4862}
4863
4864impl<T> Deref for LockedStateGuard<'_, T> {
4865    type Target = T;
4866
4867    fn deref(&self) -> &T {
4868        &self.value
4869    }
4870}
4871
4872impl<T> DerefMut for LockedStateGuard<'_, T> {
4873    fn deref_mut(&mut self) -> &mut T {
4874        &mut self.value
4875    }
4876}
4877
4878impl<T> Drop for LockedStateGuard<'_, T> {
4879    fn drop(&mut self) {
4880        // SAFETY: `ManuallyDrop::take` requires that after invoked the
4881        // original value is not read. This is the `Drop` for this type which
4882        // means we have exclusive ownership and it is not read further in the
4883        // destructor, satisfying this requirement.
4884        let value = unsafe { ManuallyDrop::take(&mut self.value) };
4885
4886        // If this fails due to contention that's a bug, but we're not in a
4887        // position to panic due to this being a destructor nor return an error,
4888        // so defer the bug to showing up later.
4889        if let Ok(mut lock) = self.state.try_lock() {
4890            *lock = Some(value);
4891        }
4892    }
4893}
4894
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    /// Engine shared by the throwaway stores created for each poll.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Polls `rx` once through `FutureProducer::poll_produce`, using a no-op
    /// waker and a freshly created store, and returns the poll result.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        let mut cx = Context::from_waker(Waker::noop());
        let mut store = Store::new(&ENGINE, ());
        rx.poll_produce(&mut cx, store.as_context_mut(), finish)
    }

    /// Asserts that polling the pinned producer `$producer` with the given
    /// `finish` flag yields a value matching `$expected`.
    macro_rules! assert_poll {
        ($producer:ident, $finish:expr, $expected:pat $(,)?) => {
            assert!(matches!(
                poll_future_producer($producer.as_mut(), $finish),
                $expected,
            ))
        };
    }

    #[test]
    fn future_producer() {
        // An immediately-ready future produces its value regardless of
        // `finish`.
        let mut fut = pin!(async { crate::error::Ok(()) });
        assert_poll!(fut, false, Poll::Ready(Ok(Some(()))));

        let mut fut = pin!(async { crate::error::Ok(()) });
        assert_poll!(fut, true, Poll::Ready(Ok(Some(()))));

        // A never-ready future is pending, and reports `None` once asked to
        // finish.
        let mut fut = pin!(pending::<Result<()>>());
        assert_poll!(fut, false, Poll::Pending);
        assert_poll!(fut, true, Poll::Ready(Ok(None)));

        // A oneshot receiver with nothing sent yet behaves like a pending
        // future, and still yields the value when it is sent afterwards.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        assert_poll!(rx, false, Poll::Pending);
        assert_poll!(rx, true, Poll::Ready(Ok(None)));
        tx.send(()).unwrap();
        assert_poll!(rx, true, Poll::Ready(Ok(Some(()))));

        // A value sent before the first poll is produced immediately.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        tx.send(()).unwrap();
        assert_poll!(rx, false, Poll::Ready(Ok(Some(()))));

        // Dropping the sender surfaces an error, with or without `finish`.
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert_poll!(rx, false, Poll::Ready(Err(..)));

        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert_poll!(rx, true, Poll::Ready(Err(..)));
    }
}