wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9    AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10    Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContextMut, StoreContextMut, ValRaw};
16use anyhow::{Context as _, Error, Result, anyhow, bail};
17use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
18use core::fmt;
19use core::future;
20use core::iter;
21use core::marker::PhantomData;
22use core::mem::{self, MaybeUninit};
23use core::pin::Pin;
24use core::task::{Context, Poll, Waker, ready};
25use futures::channel::oneshot;
26use futures::{FutureExt as _, stream};
27use std::any::{Any, TypeId};
28use std::boxed::Box;
29use std::io::Cursor;
30use std::string::String;
31use std::sync::{Arc, Mutex};
32use std::vec::Vec;
33use wasmtime_environ::component::{
34    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
35    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
36    TypeFutureTableIndex, TypeStreamTableIndex,
37};
38
39pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
40
41mod buffers;
42
43/// Enum for distinguishing between a stream or future in functions that handle
44/// both.
45#[derive(Copy, Clone, Debug)]
46pub enum TransmitKind {
47    Stream,
48    Future,
49}
50
51/// Represents `{stream,future}.{read,write}` results.
52#[derive(Copy, Clone, Debug, PartialEq)]
53pub enum ReturnCode {
54    Blocked,
55    Completed(u32),
56    Dropped(u32),
57    Cancelled(u32),
58}
59
60impl ReturnCode {
61    /// Pack `self` into a single 32-bit integer that may be returned to the
62    /// guest.
63    ///
64    /// This corresponds to `pack_copy_result` in the Component Model spec.
65    pub fn encode(&self) -> u32 {
66        const BLOCKED: u32 = 0xffff_ffff;
67        const COMPLETED: u32 = 0x0;
68        const DROPPED: u32 = 0x1;
69        const CANCELLED: u32 = 0x2;
70        match self {
71            ReturnCode::Blocked => BLOCKED,
72            ReturnCode::Completed(n) => {
73                debug_assert!(*n < (1 << 28));
74                (n << 4) | COMPLETED
75            }
76            ReturnCode::Dropped(n) => {
77                debug_assert!(*n < (1 << 28));
78                (n << 4) | DROPPED
79            }
80            ReturnCode::Cancelled(n) => {
81                debug_assert!(*n < (1 << 28));
82                (n << 4) | CANCELLED
83            }
84        }
85    }
86
87    /// Returns `Self::Completed` with the specified count (or zero if
88    /// `matches!(kind, TransmitKind::Future)`)
89    fn completed(kind: TransmitKind, count: u32) -> Self {
90        Self::Completed(if let TransmitKind::Future = kind {
91            0
92        } else {
93            count
94        })
95    }
96}
97
98/// Represents a stream or future type index.
99///
100/// This is useful as a parameter type for functions which operate on either a
101/// future or a stream.
102#[derive(Copy, Clone, Debug)]
103pub enum TransmitIndex {
104    Stream(TypeStreamTableIndex),
105    Future(TypeFutureTableIndex),
106}
107
108impl TransmitIndex {
109    pub fn kind(&self) -> TransmitKind {
110        match self {
111            TransmitIndex::Stream(_) => TransmitKind::Stream,
112            TransmitIndex::Future(_) => TransmitKind::Future,
113        }
114    }
115}
116
117/// Retrieve the payload type of the specified stream or future, or `None` if it
118/// has no payload type.
119fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
120    match ty {
121        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
122        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
123    }
124}
125
126/// Retrieve the host rep and state for the specified guest-visible waitable
127/// handle.
128fn get_mut_by_index_from(
129    handle_table: &mut HandleTable,
130    ty: TransmitIndex,
131    index: u32,
132) -> Result<(u32, &mut TransmitLocalState)> {
133    match ty {
134        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
135        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
136    }
137}
138
139fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
140    mut store: StoreContextMut<U>,
141    instance: Instance,
142    options: OptionsIndex,
143    ty: TransmitIndex,
144    address: usize,
145    count: usize,
146    buffer: &mut B,
147) -> Result<()> {
148    let count = buffer.remaining().len().min(count);
149
150    let lower = &mut if T::MAY_REQUIRE_REALLOC {
151        LowerContext::new
152    } else {
153        LowerContext::new_without_realloc
154    }(store.as_context_mut(), options, instance);
155
156    if address % usize::try_from(T::ALIGN32)? != 0 {
157        bail!("read pointer not aligned");
158    }
159    lower
160        .as_slice_mut()
161        .get_mut(address..)
162        .and_then(|b| b.get_mut(..T::SIZE32 * count))
163        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
164
165    if let Some(ty) = payload(ty, lower.types) {
166        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
167    }
168
169    buffer.skip(count);
170
171    Ok(())
172}
173
174fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
175    lift: &mut LiftContext<'_>,
176    ty: Option<InterfaceType>,
177    buffer: &mut B,
178    address: usize,
179    count: usize,
180) -> Result<()> {
181    let count = count.min(buffer.remaining_capacity());
182    if T::IS_RUST_UNIT_TYPE {
183        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
184        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
185        // is a valid way to populate the zero-sized buffer.
186        buffer.extend(
187            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
188        )
189    } else {
190        let ty = ty.unwrap();
191        if address % usize::try_from(T::ALIGN32)? != 0 {
192            bail!("write pointer not aligned");
193        }
194        lift.memory()
195            .get(address..)
196            .and_then(|b| b.get(..T::SIZE32 * count))
197            .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
198
199        let list = &WasmList::new(address, count, lift, ty)?;
200        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
201    }
202    Ok(())
203}
204
205/// Represents the state associated with an error context
206#[derive(Debug, PartialEq, Eq, PartialOrd)]
207pub(super) struct ErrorContextState {
208    /// Debug message associated with the error context
209    pub(crate) debug_msg: String,
210}
211
212/// Represents the size and alignment for a "flat" Component Model type,
213/// i.e. one containing no pointers or handles.
214#[derive(Debug, Clone, Copy, PartialEq, Eq)]
215pub(super) struct FlatAbi {
216    pub(super) size: u32,
217    pub(super) align: u32,
218}
219
220/// Represents the buffer for a host- or guest-initiated stream read.
221pub struct Destination<'a, T, B> {
222    id: TableId<TransmitState>,
223    buffer: &'a mut B,
224    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
225    _phantom: PhantomData<fn() -> T>,
226}
227
228impl<'a, T, B> Destination<'a, T, B> {
229    /// Reborrow `self` so it can be used again later.
230    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
231        Destination {
232            id: self.id,
233            buffer: &mut *self.buffer,
234            host_buffer: self.host_buffer.as_deref_mut(),
235            _phantom: PhantomData,
236        }
237    }
238
239    /// Take the buffer out of `self`, leaving a default-initialized one in its
240    /// place.
241    ///
242    /// This can be useful for reusing the previously-stored buffer's capacity
243    /// instead of allocating a fresh one.
244    pub fn take_buffer(&mut self) -> B
245    where
246        B: Default,
247    {
248        mem::take(self.buffer)
249    }
250
251    /// Store the specified buffer in `self`.
252    ///
253    /// Any items contained in the buffer will be delivered to the reader after
254    /// the `StreamProducer::poll_produce` call to which this `Destination` was
255    /// passed returns (unless overwritten by another call to `set_buffer`).
256    ///
257    /// If items are stored via this buffer _and_ written via a
258    /// `DirectDestination` view of `self`, then the items in the buffer will be
259    /// delivered after the ones written using `DirectDestination`.
260    pub fn set_buffer(&mut self, buffer: B) {
261        *self.buffer = buffer;
262    }
263
264    /// Return the remaining number of items the current read has capacity to
265    /// accept, if known.
266    ///
267    /// This will return `Some(_)` if the reader is a guest; it will return
268    /// `None` if the reader is the host.
269    ///
270    /// Note that this can return `Some(0)`. This means that the guest is
271    /// attempting to perform a zero-length read which typically means that it's
272    /// trying to wait for this stream to be ready-to-read but is not actually
273    /// ready to receive the items yet. The host in this case is allowed to
274    /// either block waiting for readiness or immediately complete the
275    /// operation. The guest is expected to handle both cases. Some more
276    /// discussion about this case can be found in the discussion of ["Stream
277    /// Readiness" in the component-model repo][docs].
278    ///
279    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
280    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
281        let transmit = store
282            .as_context_mut()
283            .0
284            .concurrent_state_mut()
285            .get_mut(self.id)
286            .unwrap();
287
288        if let &ReadState::GuestReady { count, .. } = &transmit.read {
289            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
290                unreachable!()
291            };
292
293            Some(count - guest_offset)
294        } else {
295            None
296        }
297    }
298}
299
300impl<'a, B> Destination<'a, u8, B> {
301    /// Return a `DirectDestination` view of `self`.
302    ///
303    /// If the reader is a guest, this will provide direct access to the guest's
304    /// read buffer.  If the reader is a host, this will provide access to a
305    /// buffer which will be delivered to the host before any items stored using
306    /// `Destination::set_buffer`.
307    ///
308    /// `capacity` will only be used if the reader is a host, in which case it
309    /// will update the length of the buffer, possibly zero-initializing the new
310    /// elements if the new length is larger than the old length.
311    pub fn as_direct<D>(
312        mut self,
313        store: StoreContextMut<'a, D>,
314        capacity: usize,
315    ) -> DirectDestination<'a, D> {
316        if let Some(buffer) = self.host_buffer.as_deref_mut() {
317            buffer.set_position(0);
318            if buffer.get_mut().is_empty() {
319                buffer.get_mut().resize(capacity, 0);
320            }
321        }
322
323        DirectDestination {
324            id: self.id,
325            host_buffer: self.host_buffer,
326            store,
327        }
328    }
329}
330
331/// Represents a read from a `stream<u8>`, providing direct access to the
332/// writer's buffer.
333pub struct DirectDestination<'a, D: 'static> {
334    id: TableId<TransmitState>,
335    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
336    store: StoreContextMut<'a, D>,
337}
338
339impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
340    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
341        let rem = self.remaining();
342        let n = rem.len().min(buf.len());
343        rem[..n].copy_from_slice(&buf[..n]);
344        self.mark_written(n);
345        Ok(n)
346    }
347
348    fn flush(&mut self) -> std::io::Result<()> {
349        Ok(())
350    }
351}
352
353impl<D: 'static> DirectDestination<'_, D> {
354    /// Provide direct access to the writer's buffer.
355    pub fn remaining(&mut self) -> &mut [u8] {
356        if let Some(buffer) = self.host_buffer.as_deref_mut() {
357            buffer.get_mut()
358        } else {
359            let transmit = self
360                .store
361                .as_context_mut()
362                .0
363                .concurrent_state_mut()
364                .get_mut(self.id)
365                .unwrap();
366
367            let &ReadState::GuestReady {
368                address,
369                count,
370                options,
371                instance,
372                ..
373            } = &transmit.read
374            else {
375                unreachable!();
376            };
377
378            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
379                unreachable!()
380            };
381
382            instance
383                .options_memory_mut(self.store.0, options)
384                .get_mut((address + guest_offset)..)
385                .and_then(|b| b.get_mut(..(count - guest_offset)))
386                .unwrap()
387        }
388    }
389
390    /// Mark the specified number of bytes as written to the writer's buffer.
391    ///
392    /// This will panic if the count is larger than the size of the
393    /// buffer returned by `Self::remaining`.
394    pub fn mark_written(&mut self, count: usize) {
395        if let Some(buffer) = self.host_buffer.as_deref_mut() {
396            buffer.set_position(
397                buffer
398                    .position()
399                    .checked_add(u64::try_from(count).unwrap())
400                    .unwrap(),
401            );
402        } else {
403            let transmit = self
404                .store
405                .as_context_mut()
406                .0
407                .concurrent_state_mut()
408                .get_mut(self.id)
409                .unwrap();
410
411            let ReadState::GuestReady {
412                count: read_count, ..
413            } = &transmit.read
414            else {
415                unreachable!();
416            };
417
418            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
419                unreachable!()
420            };
421
422            if *guest_offset + count > *read_count {
423                panic!(
424                    "write count ({count}) must be less than or equal to read count ({read_count})"
425                )
426            } else {
427                *guest_offset += count;
428            }
429        }
430    }
431}
432
433/// Represents the state of a `Stream{Producer,Consumer}`.
434#[derive(Copy, Clone, Debug)]
435pub enum StreamResult {
436    /// The operation completed normally, and the producer or consumer may be
437    /// able to produce or consume more items, respectively.
438    Completed,
439    /// The operation was interrupted (i.e. it wrapped up early after receiving
440    /// a `finish` parameter value of true in a call to `poll_produce` or
441    /// `poll_consume`), and the producer or consumer may be able to produce or
442    /// consume more items, respectively.
443    Cancelled,
444    /// The operation completed normally, but the producer or consumer will
445    /// _not_ able to produce or consume more items, respectively.
446    Dropped,
447}
448
449/// Represents the host-owned write end of a stream.
450pub trait StreamProducer<D>: Send + 'static {
451    /// The payload type of this stream.
452    type Item;
453
454    /// The `WriteBuffer` type to use when delivering items.
455    type Buffer: WriteBuffer<Self::Item> + Default;
456
457    /// Handle a host- or guest-initiated read by delivering zero or more items
458    /// to the specified destination.
459    ///
460    /// This will be called whenever the reader starts a read.
461    ///
462    /// # Arguments
463    ///
464    /// * `self` - a `Pin`'d version of self to perform Rust-level
465    ///   future-related operations on.
466    /// * `cx` - a Rust-related [`Context`] which is passed to other
467    ///   future-related operations or used to acquire a waker.
468    /// * `store` - the Wasmtime store that this operation is happening within.
469    ///   Used, for example, to consult the state `D` associated with the store.
470    /// * `destination` - the location that items are to be written to.
471    /// * `finish` - a flag indicating whether the host should strive to
472    ///   immediately complete/cancel any pending operation. See below for more
473    ///   details.
474    ///
475    /// # Behavior
476    ///
477    /// If the implementation is able to produce one or more items immediately,
478    /// it should write them to `destination` and return either
479    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
480    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
481    /// any more items.
482    ///
483    /// If the implementation is unable to produce any items immediately, but
484    /// expects to do so later, and `finish` is _false_, it should store the
485    /// waker from `cx` for later and return `Poll::Pending` without writing
486    /// anything to `destination`.  Later, it should alert the waker when either
487    /// the items arrive, the stream has ended, or an error occurs.
488    ///
489    /// If more items are written to `destination` than the reader has immediate
490    /// capacity to accept, they will be retained in memory by the caller and
491    /// used to satisfy future reads, in which case `poll_produce` will only be
492    /// called again once all those items have been delivered.
493    ///
494    /// # Zero-length reads
495    ///
496    /// This function may be called with a zero-length capacity buffer
497    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
498    /// the guest wants to wait to see if an item is ready without actually
499    /// reading the item. For example think of a UNIX `poll` function run on a
500    /// TCP stream, seeing if it's readable without actually reading it.
501    ///
502    /// In this situation the host is allowed to either return immediately or
503    /// wait for readiness. Note that waiting for readiness is not always
504    /// possible. For example it's impossible to test if a Rust-native `Future`
505    /// is ready without actually reading the item. Stream-specific
506    /// optimizations, such as testing if a TCP stream is readable, may be
507    /// possible however.
508    ///
509    /// For a zero-length read, the host is allowed to:
510    ///
511    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
512    ///   anything if it expects to be able to produce items immediately (i.e.
513    ///   without first returning `Poll::Pending`) the next time `poll_produce`
514    ///   is called with non-zero capacity. This is the best-case scenario of
515    ///   fulfilling the guest's desire -- items aren't read/buffered but the
516    ///   host is saying it's ready when the guest is.
517    ///
518    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
519    ///   testing for readiness. The guest doesn't know this yet, but the guest
520    ///   will realize that zero-length reads won't work on this stream when a
521    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
522    ///   here.
523    ///
524    /// - Return `Poll::Pending` if the host has performed necessary async work
525    ///   to wait for this stream to be readable without actually reading
526    ///   anything. This is also a best-case scenario where the host is letting
527    ///   the guest know that nothing is ready yet. Later the zero-length read
528    ///   will complete and then the guest will attempt a nonzero-length read to
529    ///   actually read some bytes.
530    ///
531    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
532    ///   `Destination::set_buffer` with one more more items. Note, however,
533    ///   that this creates the hazard that the items will never be received by
534    ///   the guest if it decides not to do another non-zero-length read before
535    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
536    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
537    ///   recommended to do this and it's better to return
538    ///   `StreamResult::Completed` without buffering anything instead.
539    ///
540    /// For more discussion on zero-length reads see the [documentation in the
541    /// component-model repo itself][docs].
542    ///
543    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
544    ///
545    /// # Return
546    ///
547    /// This function can return a number of possible cases from this function:
548    ///
549    /// * `Poll::Pending` - this operation cannot complete at this time. The
550    ///   Rust-level `Future::poll` contract applies here where a waker should
551    ///   be stored from the `cx` argument and be arranged to receive a
552    ///   notification when this implementation can make progress. For example
553    ///   if you call `Future::poll` on a sub-future, that's enough. If items
554    ///   were written to `destination` then a trap in the guest will be raised.
555    ///
556    ///   Note that implementations should strive to avoid this return value
557    ///   when `finish` is `true`. In such a situation the guest is attempting
558    ///   to, for example, cancel a previous operation. By returning
559    ///   `Poll::Pending` the guest will be blocked during the cancellation
560    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
561    ///   favored to indicate that no items were read. If a short read happened,
562    ///   however, it's ok to return `StreamResult::Completed` indicating some
563    ///   items were read.
564    ///
565    /// * `Poll::Ok(StreamResult::Completed)` - items, if applicable, were
566    ///   written to the `destination`.
567    ///
568    /// * `Poll::Ok(StreamResult::Cancelled)` - used when `finish` is `true` and
569    ///   the implementation was able to successfully cancel any async work that
570    ///   a previous read kicked off, if any. The host should not buffer values
571    ///   received after returning `Cancelled` because the guest will not be
572    ///   aware of these values and the guest could close the stream after
573    ///   cancelling a read. Hosts should only return `Cancelled` when there are
574    ///   no more async operations in flight for a previous read.
575    ///
576    ///   If items were written to `destination` then a trap in the guest will
577    ///   be raised. If `finish` is `false` then this return value will raise a
578    ///   trap in the guest.
579    ///
580    /// * `Poll::Ok(StreamResult::Dropped)` - end-of-stream marker, indicating
581    ///   that this producer should not be polled again. Note that items may
582    ///   still be written to `destination`.
583    ///
584    /// # Errors
585    ///
586    /// The implementation may alternatively choose to return `Err(_)` to
587    /// indicate an unrecoverable error. This will cause the guest (if any) to
588    /// trap and render the component instance (if any) unusable. The
589    /// implementation should report errors that _are_ recoverable by other
590    /// means (e.g. by writing to a `future`) and return
591    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
592    fn poll_produce<'a>(
593        self: Pin<&mut Self>,
594        cx: &mut Context<'_>,
595        store: StoreContextMut<'a, D>,
596        destination: Destination<'a, Self::Item, Self::Buffer>,
597        finish: bool,
598    ) -> Poll<Result<StreamResult>>;
599
600    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
601    /// be downcast to the specified type.
602    ///
603    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
604    /// to the specified type is guaranteed to succeed.
605    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
606        Err(me)
607    }
608}
609
610impl<T, D> StreamProducer<D> for iter::Empty<T>
611where
612    T: Send + Sync + 'static,
613{
614    type Item = T;
615    type Buffer = Option<Self::Item>;
616
617    fn poll_produce<'a>(
618        self: Pin<&mut Self>,
619        _: &mut Context<'_>,
620        _: StoreContextMut<'a, D>,
621        _: Destination<'a, Self::Item, Self::Buffer>,
622        _: bool,
623    ) -> Poll<Result<StreamResult>> {
624        Poll::Ready(Ok(StreamResult::Dropped))
625    }
626}
627
628impl<T, D> StreamProducer<D> for stream::Empty<T>
629where
630    T: Send + Sync + 'static,
631{
632    type Item = T;
633    type Buffer = Option<Self::Item>;
634
635    fn poll_produce<'a>(
636        self: Pin<&mut Self>,
637        _: &mut Context<'_>,
638        _: StoreContextMut<'a, D>,
639        _: Destination<'a, Self::Item, Self::Buffer>,
640        _: bool,
641    ) -> Poll<Result<StreamResult>> {
642        Poll::Ready(Ok(StreamResult::Dropped))
643    }
644}
645
646impl<T, D> StreamProducer<D> for Vec<T>
647where
648    T: Unpin + Send + Sync + 'static,
649{
650    type Item = T;
651    type Buffer = VecBuffer<T>;
652
653    fn poll_produce<'a>(
654        self: Pin<&mut Self>,
655        _: &mut Context<'_>,
656        _: StoreContextMut<'a, D>,
657        mut dst: Destination<'a, Self::Item, Self::Buffer>,
658        _: bool,
659    ) -> Poll<Result<StreamResult>> {
660        dst.set_buffer(mem::take(self.get_mut()).into());
661        Poll::Ready(Ok(StreamResult::Dropped))
662    }
663}
664
665impl<T, D> StreamProducer<D> for Box<[T]>
666where
667    T: Unpin + Send + Sync + 'static,
668{
669    type Item = T;
670    type Buffer = VecBuffer<T>;
671
672    fn poll_produce<'a>(
673        self: Pin<&mut Self>,
674        _: &mut Context<'_>,
675        _: StoreContextMut<'a, D>,
676        mut dst: Destination<'a, Self::Item, Self::Buffer>,
677        _: bool,
678    ) -> Poll<Result<StreamResult>> {
679        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
680        Poll::Ready(Ok(StreamResult::Dropped))
681    }
682}
683
684#[cfg(feature = "component-model-async-bytes")]
685impl<D> StreamProducer<D> for bytes::Bytes {
686    type Item = u8;
687    type Buffer = Cursor<Self>;
688
689    fn poll_produce<'a>(
690        mut self: Pin<&mut Self>,
691        _: &mut Context<'_>,
692        mut store: StoreContextMut<'a, D>,
693        mut dst: Destination<'a, Self::Item, Self::Buffer>,
694        _: bool,
695    ) -> Poll<Result<StreamResult>> {
696        let cap = dst.remaining(&mut store);
697        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
698            // on 0-length or host reads, buffer the bytes
699            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
700            return Poll::Ready(Ok(StreamResult::Dropped));
701        };
702        let cap = cap.into();
703        // data does not fit in destination, fill it and buffer the rest
704        dst.set_buffer(Cursor::new(self.split_off(cap)));
705        let mut dst = dst.as_direct(store, cap);
706        dst.remaining().copy_from_slice(&self);
707        dst.mark_written(cap);
708        Poll::Ready(Ok(StreamResult::Dropped))
709    }
710}
711
712#[cfg(feature = "component-model-async-bytes")]
713impl<D> StreamProducer<D> for bytes::BytesMut {
714    type Item = u8;
715    type Buffer = Cursor<Self>;
716
717    fn poll_produce<'a>(
718        mut self: Pin<&mut Self>,
719        _: &mut Context<'_>,
720        mut store: StoreContextMut<'a, D>,
721        mut dst: Destination<'a, Self::Item, Self::Buffer>,
722        _: bool,
723    ) -> Poll<Result<StreamResult>> {
724        let cap = dst.remaining(&mut store);
725        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
726            // on 0-length or host reads, buffer the bytes
727            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
728            return Poll::Ready(Ok(StreamResult::Dropped));
729        };
730        let cap = cap.into();
731        // data does not fit in destination, fill it and buffer the rest
732        dst.set_buffer(Cursor::new(self.split_off(cap)));
733        let mut dst = dst.as_direct(store, cap);
734        dst.remaining().copy_from_slice(&self);
735        dst.mark_written(cap);
736        Poll::Ready(Ok(StreamResult::Dropped))
737    }
738}
739
740/// Represents the buffer for a host- or guest-initiated stream write.
741pub struct Source<'a, T> {
742    id: TableId<TransmitState>,
743    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
744}
745
746impl<'a, T> Source<'a, T> {
747    /// Reborrow `self` so it can be used again later.
748    pub fn reborrow(&mut self) -> Source<'_, T> {
749        Source {
750            id: self.id,
751            host_buffer: self.host_buffer.as_deref_mut(),
752        }
753    }
754
755    /// Accept zero or more items from the writer.
756    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
757    where
758        T: func::Lift + 'static,
759        B: ReadBuffer<T>,
760    {
761        if let Some(input) = &mut self.host_buffer {
762            let count = input.remaining().len().min(buffer.remaining_capacity());
763            buffer.move_from(*input, count);
764        } else {
765            let store = store.as_context_mut();
766            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
767
768            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
769                unreachable!();
770            };
771
772            let &WriteState::GuestReady {
773                ty,
774                address,
775                count,
776                options,
777                instance,
778                ..
779            } = &transmit.write
780            else {
781                unreachable!()
782            };
783
784            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
785            let ty = payload(ty, cx.types);
786            let old_remaining = buffer.remaining_capacity();
787            lift::<T, B>(
788                cx,
789                ty,
790                buffer,
791                address + (T::SIZE32 * guest_offset),
792                count - guest_offset,
793            )?;
794
795            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
796
797            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
798                unreachable!();
799            };
800
801            *guest_offset += old_remaining - buffer.remaining_capacity();
802        }
803
804        Ok(())
805    }
806
807    /// Return the number of items remaining to be read from the current write
808    /// operation.
809    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
810    where
811        T: 'static,
812    {
813        let transmit = store
814            .as_context_mut()
815            .0
816            .concurrent_state_mut()
817            .get_mut(self.id)
818            .unwrap();
819
820        if let &WriteState::GuestReady { count, .. } = &transmit.write {
821            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
822                unreachable!()
823            };
824
825            count - guest_offset
826        } else if let Some(host_buffer) = &self.host_buffer {
827            host_buffer.remaining().len()
828        } else {
829            unreachable!()
830        }
831    }
832}
833
834impl<'a> Source<'a, u8> {
835    /// Return a `DirectSource` view of `self`.
836    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
837        DirectSource {
838            id: self.id,
839            host_buffer: self.host_buffer,
840            store,
841        }
842    }
843}
844
845/// Represents a write to a `stream<u8>`, providing direct access to the
846/// writer's buffer.
847pub struct DirectSource<'a, D: 'static> {
848    id: TableId<TransmitState>,
849    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
850    store: StoreContextMut<'a, D>,
851}
852
853impl<D: 'static> std::io::Read for DirectSource<'_, D> {
854    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
855        let rem = self.remaining();
856        let n = rem.len().min(buf.len());
857        buf[..n].copy_from_slice(&rem[..n]);
858        self.mark_read(n);
859        Ok(n)
860    }
861}
862
863impl<D: 'static> DirectSource<'_, D> {
864    /// Provide direct access to the writer's buffer.
865    pub fn remaining(&mut self) -> &[u8] {
866        if let Some(buffer) = self.host_buffer.as_deref_mut() {
867            buffer.remaining()
868        } else {
869            let transmit = self
870                .store
871                .as_context_mut()
872                .0
873                .concurrent_state_mut()
874                .get_mut(self.id)
875                .unwrap();
876
877            let &WriteState::GuestReady {
878                address,
879                count,
880                options,
881                instance,
882                ..
883            } = &transmit.write
884            else {
885                unreachable!()
886            };
887
888            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
889                unreachable!()
890            };
891
892            instance
893                .options_memory(self.store.0, options)
894                .get((address + guest_offset)..)
895                .and_then(|b| b.get(..(count - guest_offset)))
896                .unwrap()
897        }
898    }
899
900    /// Mark the specified number of bytes as read from the writer's buffer.
901    ///
902    /// This will panic if the count is larger than the size of the buffer
903    /// returned by `Self::remaining`.
904    pub fn mark_read(&mut self, count: usize) {
905        if let Some(buffer) = self.host_buffer.as_deref_mut() {
906            buffer.skip(count);
907        } else {
908            let transmit = self
909                .store
910                .as_context_mut()
911                .0
912                .concurrent_state_mut()
913                .get_mut(self.id)
914                .unwrap();
915
916            let WriteState::GuestReady {
917                count: write_count, ..
918            } = &transmit.write
919            else {
920                unreachable!()
921            };
922
923            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
924                unreachable!()
925            };
926
927            if *guest_offset + count > *write_count {
928                panic!(
929                    "read count ({count}) must be less than or equal to write count ({write_count})"
930                )
931            } else {
932                *guest_offset += count;
933            }
934        }
935    }
936}
937
938/// Represents the host-owned read end of a stream.
939pub trait StreamConsumer<D>: Send + 'static {
940    /// The payload type of this stream.
941    type Item;
942
943    /// Handle a host- or guest-initiated write by accepting zero or more items
944    /// from the specified source.
945    ///
946    /// This will be called whenever the writer starts a write.
947    ///
948    /// If the implementation is able to consume one or more items immediately,
949    /// it should take them from `source` and return either
950    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
951    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
952    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
953    /// indicate that the caller should delay sending a `COMPLETED` event to the
954    /// writer until a later call to this function returns `Poll::Ready(_)`.
955    /// For more about that, see the `Backpressure` section below.
956    ///
957    /// If the implementation cannot consume any items immediately and `finish`
958    /// is _false_, it should store the waker from `cx` for later and return
959    /// `Poll::Pending` without writing anything to `destination`.  Later, it
960    /// should alert the waker when either (1) the items arrive, (2) the stream
961    /// has ended, or (3) an error occurs.
962    ///
963    /// If the implementation cannot consume any items immediately and `finish`
964    /// is _true_, it should, if possible, return
965    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
966    /// anything from `source`.  However, that might not be possible if an
967    /// earlier call to `poll_consume` kicked off an asynchronous operation
968    /// which needs to be completed (and possibly interrupted) gracefully, in
969    /// which case the implementation may return `Poll::Pending` and later alert
970    /// the waker as described above.  In other words, when `finish` is true,
971    /// the implementation should prioritize returning a result to the reader
972    /// (even if no items can be consumed) rather than wait indefinitely for at
973    /// capacity to free up.
974    ///
975    /// In all of the above cases, the implementation may alternatively choose
976    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
977    /// the guest (if any) to trap and render the component instance (if any)
978    /// unusable.  The implementation should report errors that _are_
979    /// recoverable by other means (e.g. by writing to a `future`) and return
980    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
981    ///
982    /// Note that the implementation should only return
983    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
984    /// items from `source` if called with `finish` set to true.  If it does so
985    /// when `finish` is false, the caller will trap.  Additionally, it should
986    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
987    /// least one item from `source` if there is an item available; otherwise,
988    /// the caller will trap.  If `poll_consume` is called with no items in
989    /// `source`, it should only return `Poll::Ready(_)` once it is able to
990    /// accept at least one item during the next call to `poll_consume`.
991    ///
992    /// Note that any items which the implementation of this trait takes from
993    /// `source` become the responsibility of that implementation.  For that
994    /// reason, an implementation which forwards items to an upstream sink
995    /// should reserve capacity in that sink before taking items out of
996    /// `source`, if possible.  Alternatively, it might buffer items which can't
997    /// be forwarded immediately and send them once capacity is freed up.
998    ///
999    /// ## Backpressure
1000    ///
1001    /// As mentioned above, an implementation might choose to return
1002    /// `Poll::Pending` after taking items from `source`, which tells the caller
1003    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
1004    /// a form of backpressure when the items are forwarded to an upstream sink
1005    /// asynchronously.  Note, however, that it's not possible to "put back"
1006    /// items into `source` once they've been taken out, so if the upstream sink
1007    /// is unable to accept all the items, that cannot be communicated to the
1008    /// writer at this level of abstraction.  Just as with application-specific,
1009    /// recoverable errors, information about which items could be forwarded and
1010    /// which could not must be communicated out-of-band, e.g. by writing to an
1011    /// application-specific `future`.
1012    ///
1013    /// Similarly, if the writer cancels the write after items have been taken
1014    /// from `source` but before the items have all been forwarded to an
1015    /// upstream sink, `poll_consume` will be called with `finish` set to true,
1016    /// and the implementation may either:
1017    ///
1018    /// - Interrupt the forwarding process gracefully.  This may be preferable
1019    /// if there is an out-of-band channel for communicating to the writer how
1020    /// many items were forwarded before being interrupted.
1021    ///
1022    /// - Allow the forwarding to complete without interrupting it.  This is
1023    /// usually preferable if there's no out-of-band channel for reporting back
1024    /// to the writer how many items were forwarded.
1025    fn poll_consume(
1026        self: Pin<&mut Self>,
1027        cx: &mut Context<'_>,
1028        store: StoreContextMut<D>,
1029        source: Source<'_, Self::Item>,
1030        finish: bool,
1031    ) -> Poll<Result<StreamResult>>;
1032}
1033
1034/// Represents a host-owned write end of a future.
1035pub trait FutureProducer<D>: Send + 'static {
1036    /// The payload type of this future.
1037    type Item;
1038
1039    /// Handle a host- or guest-initiated read by producing a value.
1040    ///
1041    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1042    /// simplified interface for futures.
1043    ///
1044    /// If `finish` is true, the implementation may return
1045    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
1046    /// could produce a value.  Otherwise, it must either return
1047    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
1048    fn poll_produce(
1049        self: Pin<&mut Self>,
1050        cx: &mut Context<'_>,
1051        store: StoreContextMut<D>,
1052        finish: bool,
1053    ) -> Poll<Result<Option<Self::Item>>>;
1054}
1055
1056impl<T, E, D, Fut> FutureProducer<D> for Fut
1057where
1058    E: Into<Error>,
1059    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1060{
1061    type Item = T;
1062
1063    fn poll_produce<'a>(
1064        self: Pin<&mut Self>,
1065        cx: &mut Context<'_>,
1066        _: StoreContextMut<'a, D>,
1067        finish: bool,
1068    ) -> Poll<Result<Option<T>>> {
1069        match self.poll(cx) {
1070            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1071            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1072            Poll::Pending if finish => Poll::Ready(Ok(None)),
1073            Poll::Pending => Poll::Pending,
1074        }
1075    }
1076}
1077
1078/// Represents a host-owned read end of a future.
1079pub trait FutureConsumer<D>: Send + 'static {
1080    /// The payload type of this future.
1081    type Item;
1082
1083    /// Handle a host- or guest-initiated write by consuming a value.
1084    ///
1085    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1086    /// simplified interface for futures.
1087    ///
1088    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
1089    /// without taking the item from `source`, which indicates the operation was
1090    /// canceled before it could consume the value.  Otherwise, it must either
1091    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
1092    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
1093    /// the item).
1094    fn poll_consume(
1095        self: Pin<&mut Self>,
1096        cx: &mut Context<'_>,
1097        store: StoreContextMut<D>,
1098        source: Source<'_, Self::Item>,
1099        finish: bool,
1100    ) -> Poll<Result<()>>;
1101}
1102
1103/// Represents the readable end of a Component Model `future`.
1104///
1105/// Note that `FutureReader` instances must be disposed of using either `pipe`
1106/// or `close`; otherwise the in-store representation will leak and the writer
1107/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
1108/// ensure that disposal happens automatically.
1109pub struct FutureReader<T> {
1110    id: TableId<TransmitHandle>,
1111    _phantom: PhantomData<T>,
1112}
1113
1114impl<T> FutureReader<T> {
1115    /// Create a new future with the specified producer.
1116    pub fn new<S: AsContextMut>(
1117        mut store: S,
1118        producer: impl FutureProducer<S::Data, Item = T>,
1119    ) -> Self
1120    where
1121        T: func::Lower + func::Lift + Send + Sync + 'static,
1122    {
1123        struct Producer<P>(P);
1124
1125        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1126            for Producer<P>
1127        {
1128            type Item = P::Item;
1129            type Buffer = Option<P::Item>;
1130
1131            fn poll_produce<'a>(
1132                self: Pin<&mut Self>,
1133                cx: &mut Context<'_>,
1134                store: StoreContextMut<D>,
1135                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1136                finish: bool,
1137            ) -> Poll<Result<StreamResult>> {
1138                // SAFETY: This is a standard pin-projection, and we never move
1139                // out of `self`.
1140                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1141
1142                Poll::Ready(Ok(
1143                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1144                        destination.set_buffer(Some(value));
1145
1146                        // Here we return `StreamResult::Completed` even though
1147                        // we've produced the last item we'll ever produce.
1148                        // That's because the ABI expects
1149                        // `ReturnCode::Completed(1)` rather than
1150                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1151                        // called again since the future will have resolved.
1152                        StreamResult::Completed
1153                    } else {
1154                        StreamResult::Cancelled
1155                    },
1156                ))
1157            }
1158        }
1159
1160        Self::new_(
1161            store
1162                .as_context_mut()
1163                .new_transmit(TransmitKind::Future, Producer(producer)),
1164        )
1165    }
1166
1167    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1168        Self {
1169            id,
1170            _phantom: PhantomData,
1171        }
1172    }
1173
1174    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1175        self.id
1176    }
1177
1178    /// Set the consumer that accepts the result of this future.
1179    pub fn pipe<S: AsContextMut>(
1180        self,
1181        mut store: S,
1182        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1183    ) where
1184        T: func::Lift + 'static,
1185    {
1186        struct Consumer<C>(C);
1187
1188        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1189            for Consumer<C>
1190        {
1191            type Item = T;
1192
1193            fn poll_consume(
1194                self: Pin<&mut Self>,
1195                cx: &mut Context<'_>,
1196                mut store: StoreContextMut<D>,
1197                mut source: Source<Self::Item>,
1198                finish: bool,
1199            ) -> Poll<Result<StreamResult>> {
1200                // SAFETY: This is a standard pin-projection, and we never move
1201                // out of `self`.
1202                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1203
1204                ready!(consumer.poll_consume(
1205                    cx,
1206                    store.as_context_mut(),
1207                    source.reborrow(),
1208                    finish
1209                ))?;
1210
1211                Poll::Ready(Ok(if source.remaining(store) == 0 {
1212                    // Here we return `StreamResult::Completed` even though
1213                    // we've consumed the last item we'll ever consume.  That's
1214                    // because the ABI expects `ReturnCode::Completed(1)` rather
1215                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1216                    // called again since the future will have resolved.
1217                    StreamResult::Completed
1218                } else {
1219                    StreamResult::Cancelled
1220                }))
1221            }
1222        }
1223
1224        store
1225            .as_context_mut()
1226            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1227    }
1228
1229    /// Transfer ownership of the read end of a future from a guest to the host.
1230    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1231        let id = lift_index_to_future(cx, ty, index)?;
1232        Ok(Self::new_(id))
1233    }
1234
1235    /// Close this `FutureReader`.
1236    ///
1237    /// This will close this half of the future which will signal to a pending
1238    /// write, if any, that the reader side is dropped. If the writer half has
1239    /// not yet written a value then when it attempts to write a value it will
1240    /// see that this end is closed.
1241    ///
1242    /// # Panics
1243    ///
1244    /// Panics if the store that the [`Accessor`] is derived from does not own
1245    /// this future. Usage of this future after calling `close` will also cause
1246    /// a panic.
1247    pub fn close(&mut self, mut store: impl AsContextMut) {
1248        future_close(store.as_context_mut().0, &mut self.id)
1249    }
1250
1251    /// Convenience method around [`Self::close`].
1252    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1253        accessor.as_accessor().with(|access| self.close(access))
1254    }
1255
1256    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1257    /// drop and clean it up from the store.
1258    ///
1259    /// Note that the `accessor` provided must own this future and is
1260    /// additionally transferred to the `GuardedFutureReader` return value.
1261    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1262    where
1263        A: AsAccessor,
1264    {
1265        GuardedFutureReader::new(accessor, self)
1266    }
1267
1268    /// Attempts to convert this [`FutureReader<T>`] to a [`FutureAny`].
1269    ///
1270    /// # Errors
1271    ///
1272    /// This function will return an error if `self` does not belong to
1273    /// `store`.
1274    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1275    where
1276        T: ComponentType + 'static,
1277    {
1278        FutureAny::try_from_future_reader(store, self)
1279    }
1280
1281    /// Attempts to convert a [`FutureAny`] into a [`FutureReader<T>`].
1282    ///
1283    /// # Errors
1284    ///
1285    /// This function will fail if `T` doesn't match the type of the value that
1286    /// `future` is sending.
1287    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1288    where
1289        T: ComponentType + 'static,
1290    {
1291        future.try_into_future_reader()
1292    }
1293}
1294
1295impl<T> fmt::Debug for FutureReader<T> {
1296    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1297        f.debug_struct("FutureReader")
1298            .field("id", &self.id)
1299            .finish()
1300    }
1301}
1302
1303pub(super) fn future_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1304    let id = mem::replace(id, TableId::new(u32::MAX));
1305    store.host_drop_reader(id, TransmitKind::Future).unwrap();
1306}
1307
1308/// Transfer ownership of the read end of a future from the host to a guest.
1309pub(super) fn lift_index_to_future(
1310    cx: &mut LiftContext<'_>,
1311    ty: InterfaceType,
1312    index: u32,
1313) -> Result<TableId<TransmitHandle>> {
1314    match ty {
1315        InterfaceType::Future(src) => {
1316            let handle_table = cx
1317                .instance_mut()
1318                .table_for_transmit(TransmitIndex::Future(src));
1319            let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1320            if is_done {
1321                bail!("cannot lift future after being notified that the writable end dropped");
1322            }
1323            let id = TableId::<TransmitHandle>::new(rep);
1324            let concurrent_state = cx.concurrent_state_mut();
1325            let future = concurrent_state.get_mut(id)?;
1326            future.common.handle = None;
1327            let state = future.state;
1328
1329            if concurrent_state.get_mut(state)?.done {
1330                bail!("cannot lift future after previous read succeeded");
1331            }
1332
1333            Ok(id)
1334        }
1335        _ => func::bad_type_info(),
1336    }
1337}
1338
1339/// Transfer ownership of the read end of a future from the host to a guest.
1340pub(super) fn lower_future_to_index<U>(
1341    id: TableId<TransmitHandle>,
1342    cx: &mut LowerContext<'_, U>,
1343    ty: InterfaceType,
1344) -> Result<u32> {
1345    match ty {
1346        InterfaceType::Future(dst) => {
1347            let concurrent_state = cx.store.0.concurrent_state_mut();
1348            let state = concurrent_state.get_mut(id)?.state;
1349            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1350
1351            let handle = cx
1352                .instance_mut()
1353                .table_for_transmit(TransmitIndex::Future(dst))
1354                .future_insert_read(dst, rep)?;
1355
1356            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1357
1358            Ok(handle)
1359        }
1360        _ => func::bad_type_info(),
1361    }
1362}
1363
1364// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1365// safe and correct since we lift and lower future handles as `u32`s.
1366unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1367    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1368
1369    type Lower = <u32 as func::ComponentType>::Lower;
1370
1371    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1372        match ty {
1373            InterfaceType::Future(ty) => {
1374                let ty = types.types[*ty].ty;
1375                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1376            }
1377            other => bail!("expected `future`, found `{}`", func::desc(other)),
1378        }
1379    }
1380}
1381
1382// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1383unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1384    fn linear_lower_to_flat<U>(
1385        &self,
1386        cx: &mut LowerContext<'_, U>,
1387        ty: InterfaceType,
1388        dst: &mut MaybeUninit<Self::Lower>,
1389    ) -> Result<()> {
1390        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1391    }
1392
1393    fn linear_lower_to_memory<U>(
1394        &self,
1395        cx: &mut LowerContext<'_, U>,
1396        ty: InterfaceType,
1397        offset: usize,
1398    ) -> Result<()> {
1399        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1400            cx,
1401            InterfaceType::U32,
1402            offset,
1403        )
1404    }
1405}
1406
1407// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1408unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1409    fn linear_lift_from_flat(
1410        cx: &mut LiftContext<'_>,
1411        ty: InterfaceType,
1412        src: &Self::Lower,
1413    ) -> Result<Self> {
1414        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1415        Self::lift_from_index(cx, ty, index)
1416    }
1417
1418    fn linear_lift_from_memory(
1419        cx: &mut LiftContext<'_>,
1420        ty: InterfaceType,
1421        bytes: &[u8],
1422    ) -> Result<Self> {
1423        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1424        Self::lift_from_index(cx, ty, index)
1425    }
1426}
1427
1428/// A [`FutureReader`] paired with an [`Accessor`].
1429///
1430/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
1431/// when dropped. This can be created through [`GuardedFutureReader::new`] or
1432/// [`FutureReader::guard`].
1433pub struct GuardedFutureReader<T, A>
1434where
1435    A: AsAccessor,
1436{
1437    // This field is `None` to implement the conversion from this guard back to
1438    // `FutureReader`. When `None` is seen in the destructor it will cause the
1439    // destructor to do nothing.
1440    reader: Option<FutureReader<T>>,
1441    accessor: A,
1442}
1443
1444impl<T, A> GuardedFutureReader<T, A>
1445where
1446    A: AsAccessor,
1447{
1448    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1449    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1450        Self {
1451            reader: Some(reader),
1452            accessor,
1453        }
1454    }
1455
1456    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1457    /// back.
1458    pub fn into_future(self) -> FutureReader<T> {
1459        self.into()
1460    }
1461}
1462
1463impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1464where
1465    A: AsAccessor,
1466{
1467    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1468        guard.reader.take().unwrap()
1469    }
1470}
1471
1472impl<T, A> Drop for GuardedFutureReader<T, A>
1473where
1474    A: AsAccessor,
1475{
1476    fn drop(&mut self) {
1477        if let Some(reader) = &mut self.reader {
1478            reader.close_with(&self.accessor)
1479        }
1480    }
1481}
1482
1483/// Represents the readable end of a Component Model `stream`.
1484///
1485/// Note that `StreamReader` instances must be disposed of using `close`;
1486/// otherwise the in-store representation will leak and the writer end will hang
1487/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1488/// disposal happens automatically.
1489pub struct StreamReader<T> {
1490    id: TableId<TransmitHandle>,
1491    _phantom: PhantomData<T>,
1492}
1493
1494impl<T> StreamReader<T> {
1495    /// Create a new stream with the specified producer.
1496    pub fn new<S: AsContextMut>(
1497        mut store: S,
1498        producer: impl StreamProducer<S::Data, Item = T>,
1499    ) -> Self
1500    where
1501        T: func::Lower + func::Lift + Send + Sync + 'static,
1502    {
1503        Self::new_(
1504            store
1505                .as_context_mut()
1506                .new_transmit(TransmitKind::Stream, producer),
1507        )
1508    }
1509
1510    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1511        Self {
1512            id,
1513            _phantom: PhantomData,
1514        }
1515    }
1516
1517    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1518        self.id
1519    }
1520
1521    /// Attempt to consume this object by converting it into the specified type.
1522    ///
1523    /// This can be useful for "short-circuiting" host-to-host streams,
1524    /// bypassing the guest entirely.  For example, if a guest task returns a
1525    /// host-created stream and then exits, this function may be used to
1526    /// retrieve the write end, after which the guest instance and store may be
1527    /// disposed of if no longer needed.
1528    ///
1529    /// This will return `Ok(_)` if and only if the following conditions are
1530    /// met:
1531    ///
1532    /// - The stream was created by the host (i.e. not by the guest).
1533    ///
1534    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1535    /// producer provided to `StreamReader::new` when the stream was created,
1536    /// along with `TypeId::of::<V>()`.
1537    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1538        let store = store.as_context_mut();
1539        let state = store.0.concurrent_state_mut();
1540        let id = state.get_mut(self.id).unwrap().state;
1541        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1542            match try_into(TypeId::of::<V>()) {
1543                Some(result) => {
1544                    self.close(store);
1545                    Ok(*result.downcast::<V>().unwrap())
1546                }
1547                None => Err(self),
1548            }
1549        } else {
1550            Err(self)
1551        }
1552    }
1553
1554    /// Set the consumer that accepts the items delivered to this stream.
1555    pub fn pipe<S: AsContextMut>(
1556        self,
1557        mut store: S,
1558        consumer: impl StreamConsumer<S::Data, Item = T>,
1559    ) where
1560        T: 'static,
1561    {
1562        store
1563            .as_context_mut()
1564            .set_consumer(self.id, TransmitKind::Stream, consumer);
1565    }
1566
1567    /// Transfer ownership of the read end of a stream from a guest to the host.
1568    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1569        let id = lift_index_to_stream(cx, ty, index)?;
1570        Ok(Self::new_(id))
1571    }
1572
1573    /// Close this `StreamReader`.
1574    ///
1575    /// This will signal that this portion of the stream is closed causing all
1576    /// future writes to return immediately with "DROPPED".
1577    ///
1578    /// # Panics
1579    ///
1580    /// Panics if the `store` does not own this future. Usage of this future
1581    /// after calling `close` will also cause a panic.
1582    pub fn close(&mut self, mut store: impl AsContextMut) {
1583        stream_close(store.as_context_mut().0, &mut self.id);
1584    }
1585
1586    /// Convenience method around [`Self::close`].
1587    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1588        accessor.as_accessor().with(|access| self.close(access))
1589    }
1590
1591    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1592    /// drop and clean it up from the store.
1593    ///
1594    /// Note that the `accessor` provided must own this future and is
1595    /// additionally transferred to the `GuardedStreamReader` return value.
1596    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1597    where
1598        A: AsAccessor,
1599    {
1600        GuardedStreamReader::new(accessor, self)
1601    }
1602
1603    /// Attempts to convert this [`StreamReader<T>`] to a [`StreamAny`].
1604    ///
1605    /// # Errors
1606    ///
1607    /// This function will return an error if `self` does not belong to
1608    /// `store`.
1609    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1610    where
1611        T: ComponentType + 'static,
1612    {
1613        StreamAny::try_from_stream_reader(store, self)
1614    }
1615
1616    /// Attempts to convert a [`StreamAny`] into a [`StreamReader<T>`].
1617    ///
1618    /// # Errors
1619    ///
1620    /// This function will fail if `T` doesn't match the type of the value that
1621    /// `stream` is sending.
1622    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1623    where
1624        T: ComponentType + 'static,
1625    {
1626        stream.try_into_stream_reader()
1627    }
1628}
1629
1630impl<T> fmt::Debug for StreamReader<T> {
1631    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1632        f.debug_struct("StreamReader")
1633            .field("id", &self.id)
1634            .finish()
1635    }
1636}
1637
1638pub(super) fn stream_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1639    let id = mem::replace(id, TableId::new(u32::MAX));
1640    store.host_drop_reader(id, TransmitKind::Stream).unwrap();
1641}
1642
1643/// Transfer ownership of the read end of a stream from a guest to the host.
1644pub(super) fn lift_index_to_stream(
1645    cx: &mut LiftContext<'_>,
1646    ty: InterfaceType,
1647    index: u32,
1648) -> Result<TableId<TransmitHandle>> {
1649    match ty {
1650        InterfaceType::Stream(src) => {
1651            let handle_table = cx
1652                .instance_mut()
1653                .table_for_transmit(TransmitIndex::Stream(src));
1654            let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1655            if is_done {
1656                bail!("cannot lift stream after being notified that the writable end dropped");
1657            }
1658            let id = TableId::<TransmitHandle>::new(rep);
1659            cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1660            Ok(id)
1661        }
1662        _ => func::bad_type_info(),
1663    }
1664}
1665
1666/// Transfer ownership of the read end of a stream from the host to a guest.
1667pub(super) fn lower_stream_to_index<U>(
1668    id: TableId<TransmitHandle>,
1669    cx: &mut LowerContext<'_, U>,
1670    ty: InterfaceType,
1671) -> Result<u32> {
1672    match ty {
1673        InterfaceType::Stream(dst) => {
1674            let concurrent_state = cx.store.0.concurrent_state_mut();
1675            let state = concurrent_state.get_mut(id)?.state;
1676            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1677
1678            let handle = cx
1679                .instance_mut()
1680                .table_for_transmit(TransmitIndex::Stream(dst))
1681                .stream_insert_read(dst, rep)?;
1682
1683            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1684
1685            Ok(handle)
1686        }
1687        _ => func::bad_type_info(),
1688    }
1689}
1690
1691// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1692// safe and correct since we lift and lower stream handles as `u32`s.
1693unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1694    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1695
1696    type Lower = <u32 as func::ComponentType>::Lower;
1697
1698    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1699        match ty {
1700            InterfaceType::Stream(ty) => {
1701                let ty = types.types[*ty].ty;
1702                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1703            }
1704            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1705        }
1706    }
1707}
1708
1709// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1710unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1711    fn linear_lower_to_flat<U>(
1712        &self,
1713        cx: &mut LowerContext<'_, U>,
1714        ty: InterfaceType,
1715        dst: &mut MaybeUninit<Self::Lower>,
1716    ) -> Result<()> {
1717        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1718    }
1719
1720    fn linear_lower_to_memory<U>(
1721        &self,
1722        cx: &mut LowerContext<'_, U>,
1723        ty: InterfaceType,
1724        offset: usize,
1725    ) -> Result<()> {
1726        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1727            cx,
1728            InterfaceType::U32,
1729            offset,
1730        )
1731    }
1732}
1733
1734// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1735unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1736    fn linear_lift_from_flat(
1737        cx: &mut LiftContext<'_>,
1738        ty: InterfaceType,
1739        src: &Self::Lower,
1740    ) -> Result<Self> {
1741        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1742        Self::lift_from_index(cx, ty, index)
1743    }
1744
1745    fn linear_lift_from_memory(
1746        cx: &mut LiftContext<'_>,
1747        ty: InterfaceType,
1748        bytes: &[u8],
1749    ) -> Result<Self> {
1750        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1751        Self::lift_from_index(cx, ty, index)
1752    }
1753}
1754
1755/// A [`StreamReader`] paired with an [`Accessor`].
1756///
1757/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1758/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1759/// [`StreamReader::guard`].
1760pub struct GuardedStreamReader<T, A>
1761where
1762    A: AsAccessor,
1763{
1764    // This field is `None` to implement the conversion from this guard back to
1765    // `StreamReader`. When `None` is seen in the destructor it will cause the
1766    // destructor to do nothing.
1767    reader: Option<StreamReader<T>>,
1768    accessor: A,
1769}
1770
1771impl<T, A> GuardedStreamReader<T, A>
1772where
1773    A: AsAccessor,
1774{
1775    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1776    /// `reader`.
1777    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1778        Self {
1779            reader: Some(reader),
1780            accessor,
1781        }
1782    }
1783
1784    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1785    /// back.
1786    pub fn into_stream(self) -> StreamReader<T> {
1787        self.into()
1788    }
1789}
1790
1791impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1792where
1793    A: AsAccessor,
1794{
1795    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1796        guard.reader.take().unwrap()
1797    }
1798}
1799
1800impl<T, A> Drop for GuardedStreamReader<T, A>
1801where
1802    A: AsAccessor,
1803{
1804    fn drop(&mut self) {
1805        if let Some(reader) = &mut self.reader {
1806            reader.close_with(&self.accessor)
1807        }
1808    }
1809}
1810
1811/// Represents a Component Model `error-context`.
1812pub struct ErrorContext {
1813    rep: u32,
1814}
1815
1816impl ErrorContext {
1817    pub(crate) fn new(rep: u32) -> Self {
1818        Self { rep }
1819    }
1820
1821    /// Convert this `ErrorContext` into a [`Val`].
1822    pub fn into_val(self) -> Val {
1823        Val::ErrorContext(ErrorContextAny(self.rep))
1824    }
1825
1826    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1827    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1828        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1829            bail!("expected `error-context`; got `{}`", value.desc());
1830        };
1831        Ok(Self::new(*rep))
1832    }
1833
1834    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1835        match ty {
1836            InterfaceType::ErrorContext(src) => {
1837                let rep = cx
1838                    .instance_mut()
1839                    .table_for_error_context(src)
1840                    .error_context_rep(index)?;
1841
1842                Ok(Self { rep })
1843            }
1844            _ => func::bad_type_info(),
1845        }
1846    }
1847}
1848
1849pub(crate) fn lower_error_context_to_index<U>(
1850    rep: u32,
1851    cx: &mut LowerContext<'_, U>,
1852    ty: InterfaceType,
1853) -> Result<u32> {
1854    match ty {
1855        InterfaceType::ErrorContext(dst) => {
1856            let tbl = cx.instance_mut().table_for_error_context(dst);
1857            tbl.error_context_insert(rep)
1858        }
1859        _ => func::bad_type_info(),
1860    }
1861}
1862// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1863// safe and correct since we lift and lower future handles as `u32`s.
1864unsafe impl func::ComponentType for ErrorContext {
1865    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1866
1867    type Lower = <u32 as func::ComponentType>::Lower;
1868
1869    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1870        match ty {
1871            InterfaceType::ErrorContext(_) => Ok(()),
1872            other => bail!("expected `error`, found `{}`", func::desc(other)),
1873        }
1874    }
1875}
1876
1877// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1878unsafe impl func::Lower for ErrorContext {
1879    fn linear_lower_to_flat<T>(
1880        &self,
1881        cx: &mut LowerContext<'_, T>,
1882        ty: InterfaceType,
1883        dst: &mut MaybeUninit<Self::Lower>,
1884    ) -> Result<()> {
1885        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1886            cx,
1887            InterfaceType::U32,
1888            dst,
1889        )
1890    }
1891
1892    fn linear_lower_to_memory<T>(
1893        &self,
1894        cx: &mut LowerContext<'_, T>,
1895        ty: InterfaceType,
1896        offset: usize,
1897    ) -> Result<()> {
1898        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1899            cx,
1900            InterfaceType::U32,
1901            offset,
1902        )
1903    }
1904}
1905
1906// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1907unsafe impl func::Lift for ErrorContext {
1908    fn linear_lift_from_flat(
1909        cx: &mut LiftContext<'_>,
1910        ty: InterfaceType,
1911        src: &Self::Lower,
1912    ) -> Result<Self> {
1913        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1914        Self::lift_from_index(cx, ty, index)
1915    }
1916
1917    fn linear_lift_from_memory(
1918        cx: &mut LiftContext<'_>,
1919        ty: InterfaceType,
1920        bytes: &[u8],
1921    ) -> Result<Self> {
1922        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1923        Self::lift_from_index(cx, ty, index)
1924    }
1925}
1926
1927/// Represents the read or write end of a stream or future.
1928pub(super) struct TransmitHandle {
1929    pub(super) common: WaitableCommon,
1930    /// See `TransmitState`
1931    state: TableId<TransmitState>,
1932}
1933
1934impl TransmitHandle {
1935    fn new(state: TableId<TransmitState>) -> Self {
1936        Self {
1937            common: WaitableCommon::default(),
1938            state,
1939        }
1940    }
1941}
1942
1943impl TableDebug for TransmitHandle {
1944    fn type_name() -> &'static str {
1945        "TransmitHandle"
1946    }
1947}
1948
1949/// Represents the state of a stream or future.
1950struct TransmitState {
1951    /// The write end of the stream or future.
1952    write_handle: TableId<TransmitHandle>,
1953    /// The read end of the stream or future.
1954    read_handle: TableId<TransmitHandle>,
1955    /// See `WriteState`
1956    write: WriteState,
1957    /// See `ReadState`
1958    read: ReadState,
1959    /// Whether futher values may be transmitted via this stream or future.
1960    done: bool,
1961    /// The original creator of this stream, used for type-checking with
1962    /// `{Future,Stream}Any`.
1963    pub(super) origin: TransmitOrigin,
1964}
1965
1966#[derive(Copy, Clone)]
1967pub(super) enum TransmitOrigin {
1968    Host,
1969    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
1970    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
1971}
1972
1973impl TransmitState {
1974    fn new(origin: TransmitOrigin) -> Self {
1975        Self {
1976            write_handle: TableId::new(u32::MAX),
1977            read_handle: TableId::new(u32::MAX),
1978            read: ReadState::Open,
1979            write: WriteState::Open,
1980            done: false,
1981            origin,
1982        }
1983    }
1984}
1985
1986impl TableDebug for TransmitState {
1987    fn type_name() -> &'static str {
1988        "TransmitState"
1989    }
1990}
1991
1992impl TransmitOrigin {
1993    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
1994        match index {
1995            TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
1996            TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
1997        }
1998    }
1999}
2000
2001type PollStream = Box<
2002    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2003>;
2004
2005type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2006
2007/// Represents the state of the write end of a stream or future.
2008enum WriteState {
2009    /// The write end is open, but no write is pending.
2010    Open,
2011    /// The write end is owned by a guest task and a write is pending.
2012    GuestReady {
2013        instance: Instance,
2014        caller: RuntimeComponentInstanceIndex,
2015        ty: TransmitIndex,
2016        flat_abi: Option<FlatAbi>,
2017        options: OptionsIndex,
2018        address: usize,
2019        count: usize,
2020        handle: u32,
2021    },
2022    /// The write end is owned by the host, which is ready to produce items.
2023    HostReady {
2024        produce: PollStream,
2025        try_into: TryInto,
2026        guest_offset: usize,
2027        cancel: bool,
2028        cancel_waker: Option<Waker>,
2029    },
2030    /// The write end has been dropped.
2031    Dropped,
2032}
2033
2034impl fmt::Debug for WriteState {
2035    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2036        match self {
2037            Self::Open => f.debug_tuple("Open").finish(),
2038            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2039            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2040            Self::Dropped => f.debug_tuple("Dropped").finish(),
2041        }
2042    }
2043}
2044
2045/// Represents the state of the read end of a stream or future.
2046enum ReadState {
2047    /// The read end is open, but no read is pending.
2048    Open,
2049    /// The read end is owned by a guest task and a read is pending.
2050    GuestReady {
2051        ty: TransmitIndex,
2052        caller: RuntimeComponentInstanceIndex,
2053        flat_abi: Option<FlatAbi>,
2054        instance: Instance,
2055        options: OptionsIndex,
2056        address: usize,
2057        count: usize,
2058        handle: u32,
2059    },
2060    /// The read end is owned by a host task, and it is ready to consume items.
2061    HostReady {
2062        consume: PollStream,
2063        guest_offset: usize,
2064        cancel: bool,
2065        cancel_waker: Option<Waker>,
2066    },
2067    /// Both the read and write ends are owned by the host.
2068    HostToHost {
2069        accept: Box<
2070            dyn for<'a> Fn(
2071                    &'a mut UntypedWriteBuffer<'a>,
2072                )
2073                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2074                + Send
2075                + Sync,
2076        >,
2077        buffer: Vec<u8>,
2078        limit: usize,
2079    },
2080    /// The read end has been dropped.
2081    Dropped,
2082}
2083
2084impl fmt::Debug for ReadState {
2085    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2086        match self {
2087            Self::Open => f.debug_tuple("Open").finish(),
2088            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2089            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2090            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2091            Self::Dropped => f.debug_tuple("Dropped").finish(),
2092        }
2093    }
2094}
2095
2096fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2097    let count = guest_offset.try_into().unwrap();
2098    match state {
2099        StreamResult::Dropped => ReturnCode::Dropped(count),
2100        StreamResult::Completed => ReturnCode::completed(kind, count),
2101        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2102    }
2103}
2104
2105impl StoreOpaque {
2106    fn pipe_from_guest(
2107        &mut self,
2108        kind: TransmitKind,
2109        id: TableId<TransmitState>,
2110        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2111    ) {
2112        let future = async move {
2113            let stream_state = future.await?;
2114            tls::get(|store| {
2115                let state = store.concurrent_state_mut();
2116                let transmit = state.get_mut(id)?;
2117                let ReadState::HostReady {
2118                    consume,
2119                    guest_offset,
2120                    ..
2121                } = mem::replace(&mut transmit.read, ReadState::Open)
2122                else {
2123                    unreachable!();
2124                };
2125                let code = return_code(kind, stream_state, guest_offset);
2126                transmit.read = match stream_state {
2127                    StreamResult::Dropped => ReadState::Dropped,
2128                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2129                        consume,
2130                        guest_offset: 0,
2131                        cancel: false,
2132                        cancel_waker: None,
2133                    },
2134                };
2135                let WriteState::GuestReady { ty, handle, .. } =
2136                    mem::replace(&mut transmit.write, WriteState::Open)
2137                else {
2138                    unreachable!();
2139                };
2140                state.send_write_result(ty, id, handle, code)?;
2141                Ok(())
2142            })
2143        };
2144
2145        self.concurrent_state_mut().push_future(future.boxed());
2146    }
2147
2148    fn pipe_to_guest(
2149        &mut self,
2150        kind: TransmitKind,
2151        id: TableId<TransmitState>,
2152        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2153    ) {
2154        let future = async move {
2155            let stream_state = future.await?;
2156            tls::get(|store| {
2157                let state = store.concurrent_state_mut();
2158                let transmit = state.get_mut(id)?;
2159                let WriteState::HostReady {
2160                    produce,
2161                    try_into,
2162                    guest_offset,
2163                    ..
2164                } = mem::replace(&mut transmit.write, WriteState::Open)
2165                else {
2166                    unreachable!();
2167                };
2168                let code = return_code(kind, stream_state, guest_offset);
2169                transmit.write = match stream_state {
2170                    StreamResult::Dropped => WriteState::Dropped,
2171                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2172                        produce,
2173                        try_into,
2174                        guest_offset: 0,
2175                        cancel: false,
2176                        cancel_waker: None,
2177                    },
2178                };
2179                let ReadState::GuestReady { ty, handle, .. } =
2180                    mem::replace(&mut transmit.read, ReadState::Open)
2181                else {
2182                    unreachable!();
2183                };
2184                state.send_read_result(ty, id, handle, code)?;
2185                Ok(())
2186            })
2187        };
2188
2189        self.concurrent_state_mut().push_future(future.boxed());
2190    }
2191
2192    /// Drop the read end of a stream or future read from the host.
2193    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2194        let state = self.concurrent_state_mut();
2195        let transmit_id = state.get_mut(id)?.state;
2196        let transmit = state
2197            .get_mut(transmit_id)
2198            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2199        log::trace!(
2200            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2201            transmit.read,
2202            transmit.write
2203        );
2204
2205        transmit.read = ReadState::Dropped;
2206
2207        // If the write end is already dropped, it should stay dropped,
2208        // otherwise, it should be opened.
2209        let new_state = if let WriteState::Dropped = &transmit.write {
2210            WriteState::Dropped
2211        } else {
2212            WriteState::Open
2213        };
2214
2215        let write_handle = transmit.write_handle;
2216
2217        match mem::replace(&mut transmit.write, new_state) {
2218            // If a guest is waiting to write, notify it that the read end has
2219            // been dropped.
2220            WriteState::GuestReady { ty, handle, .. } => {
2221                state.update_event(
2222                    write_handle.rep(),
2223                    match ty {
2224                        TransmitIndex::Future(ty) => Event::FutureWrite {
2225                            code: ReturnCode::Dropped(0),
2226                            pending: Some((ty, handle)),
2227                        },
2228                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2229                            code: ReturnCode::Dropped(0),
2230                            pending: Some((ty, handle)),
2231                        },
2232                    },
2233                )?;
2234            }
2235
2236            WriteState::HostReady { .. } => {}
2237
2238            WriteState::Open => {
2239                state.update_event(
2240                    write_handle.rep(),
2241                    match kind {
2242                        TransmitKind::Future => Event::FutureWrite {
2243                            code: ReturnCode::Dropped(0),
2244                            pending: None,
2245                        },
2246                        TransmitKind::Stream => Event::StreamWrite {
2247                            code: ReturnCode::Dropped(0),
2248                            pending: None,
2249                        },
2250                    },
2251                )?;
2252            }
2253
2254            WriteState::Dropped => {
2255                log::trace!("host_drop_reader delete {transmit_id:?}");
2256                state.delete_transmit(transmit_id)?;
2257            }
2258        }
2259        Ok(())
2260    }
2261
2262    /// Drop the write end of a stream or future read from the host.
2263    fn host_drop_writer(
2264        &mut self,
2265        id: TableId<TransmitHandle>,
2266        on_drop_open: Option<fn() -> Result<()>>,
2267    ) -> Result<()> {
2268        let state = self.concurrent_state_mut();
2269        let transmit_id = state.get_mut(id)?.state;
2270        let transmit = state
2271            .get_mut(transmit_id)
2272            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2273        log::trace!(
2274            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2275            transmit.read,
2276            transmit.write
2277        );
2278
2279        // Existing queued transmits must be updated with information for the impending writer closure
2280        match &mut transmit.write {
2281            WriteState::GuestReady { .. } => {
2282                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2283            }
2284            WriteState::HostReady { .. } => {}
2285            v @ WriteState::Open => {
2286                if let (Some(on_drop_open), false) = (
2287                    on_drop_open,
2288                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2289                ) {
2290                    on_drop_open()?;
2291                } else {
2292                    *v = WriteState::Dropped;
2293                }
2294            }
2295            WriteState::Dropped => unreachable!("write state is already dropped"),
2296        }
2297
2298        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2299
2300        // If the existing read state is dropped, then there's nothing to read
2301        // and we can keep it that way.
2302        //
2303        // If the read state was any other state, then we must set the new state to open
2304        // to indicate that there *is* data to be read
2305        let new_state = if let ReadState::Dropped = &transmit.read {
2306            ReadState::Dropped
2307        } else {
2308            ReadState::Open
2309        };
2310
2311        let read_handle = transmit.read_handle;
2312
2313        // Swap in the new read state
2314        match mem::replace(&mut transmit.read, new_state) {
2315            // If the guest was ready to read, then we cannot drop the reader (or writer);
2316            // we must deliver the event, and update the state associated with the handle to
2317            // represent that a read must be performed
2318            ReadState::GuestReady { ty, handle, .. } => {
2319                // Ensure the final read of the guest is queued, with appropriate closure indicator
2320                self.concurrent_state_mut().update_event(
2321                    read_handle.rep(),
2322                    match ty {
2323                        TransmitIndex::Future(ty) => Event::FutureRead {
2324                            code: ReturnCode::Dropped(0),
2325                            pending: Some((ty, handle)),
2326                        },
2327                        TransmitIndex::Stream(ty) => Event::StreamRead {
2328                            code: ReturnCode::Dropped(0),
2329                            pending: Some((ty, handle)),
2330                        },
2331                    },
2332                )?;
2333            }
2334
2335            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2336
2337            // If the read state is open, then there are no registered readers of the stream/future
2338            ReadState::Open => {
2339                self.concurrent_state_mut().update_event(
2340                    read_handle.rep(),
2341                    match on_drop_open {
2342                        Some(_) => Event::FutureRead {
2343                            code: ReturnCode::Dropped(0),
2344                            pending: None,
2345                        },
2346                        None => Event::StreamRead {
2347                            code: ReturnCode::Dropped(0),
2348                            pending: None,
2349                        },
2350                    },
2351                )?;
2352            }
2353
2354            // If the read state was already dropped, then we can remove the transmit state completely
2355            // (both writer and reader have been dropped)
2356            ReadState::Dropped => {
2357                log::trace!("host_drop_writer delete {transmit_id:?}");
2358                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2359            }
2360        }
2361        Ok(())
2362    }
2363
2364    pub(super) fn transmit_origin(
2365        &mut self,
2366        id: TableId<TransmitHandle>,
2367    ) -> Result<TransmitOrigin> {
2368        let state = self.concurrent_state_mut();
2369        let state_id = state.get_mut(id)?.state;
2370        Ok(state.get_mut(state_id)?.origin)
2371    }
2372}
2373
2374impl<T> StoreContextMut<'_, T> {
2375    fn new_transmit<P: StreamProducer<T>>(
2376        mut self,
2377        kind: TransmitKind,
2378        producer: P,
2379    ) -> TableId<TransmitHandle>
2380    where
2381        P::Item: func::Lower,
2382    {
2383        let token = StoreToken::new(self.as_context_mut());
2384        let state = self.0.concurrent_state_mut();
2385        let (_, read) = state.new_transmit(TransmitOrigin::Host).unwrap();
2386        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2387        let id = state.get_mut(read).unwrap().state;
2388        let mut dropped = false;
2389        let produce = Box::new({
2390            let producer = producer.clone();
2391            move || {
2392                let producer = producer.clone();
2393                async move {
2394                    let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2395
2396                    let (result, cancelled) = if buffer.remaining().is_empty() {
2397                        future::poll_fn(|cx| {
2398                            tls::get(|store| {
2399                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2400
2401                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2402                                    unreachable!();
2403                                };
2404
2405                                let mut host_buffer =
2406                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2407                                        Some(Cursor::new(mem::take(buffer)))
2408                                    } else {
2409                                        None
2410                                    };
2411
2412                                let poll = mine.as_mut().poll_produce(
2413                                    cx,
2414                                    token.as_context_mut(store),
2415                                    Destination {
2416                                        id,
2417                                        buffer: &mut buffer,
2418                                        host_buffer: host_buffer.as_mut(),
2419                                        _phantom: PhantomData,
2420                                    },
2421                                    cancel,
2422                                );
2423
2424                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2425
2426                                let host_offset = if let (
2427                                    Some(host_buffer),
2428                                    ReadState::HostToHost { buffer, limit, .. },
2429                                ) = (host_buffer, &mut transmit.read)
2430                                {
2431                                    *limit = usize::try_from(host_buffer.position()).unwrap();
2432                                    *buffer = host_buffer.into_inner();
2433                                    *limit
2434                                } else {
2435                                    0
2436                                };
2437
2438                                {
2439                                    let WriteState::HostReady {
2440                                        guest_offset,
2441                                        cancel,
2442                                        cancel_waker,
2443                                        ..
2444                                    } = &mut transmit.write
2445                                    else {
2446                                        unreachable!();
2447                                    };
2448
2449                                    if poll.is_pending() {
2450                                        if !buffer.remaining().is_empty()
2451                                            || *guest_offset > 0
2452                                            || host_offset > 0
2453                                        {
2454                                            return Poll::Ready(Err(anyhow!(
2455                                                "StreamProducer::poll_produce returned Poll::Pending \
2456                                                 after producing at least one item"
2457                                            )));
2458                                        }
2459                                        *cancel_waker = Some(cx.waker().clone());
2460                                    } else {
2461                                        *cancel_waker = None;
2462                                        *cancel = false;
2463                                    }
2464                                }
2465
2466                                poll.map(|v| v.map(|result| (result, cancel)))
2467                            })
2468                        })
2469                            .await?
2470                    } else {
2471                        (StreamResult::Completed, false)
2472                    };
2473
2474                    let (guest_offset, host_offset, count) = tls::get(|store| {
2475                        let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2476                        let (count, host_offset) = match &transmit.read {
2477                            &ReadState::GuestReady { count, .. } => (count, 0),
2478                            &ReadState::HostToHost { limit, .. } => (1, limit),
2479                            _ => unreachable!(),
2480                        };
2481                        let guest_offset = match &transmit.write {
2482                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
2483                            _ => unreachable!(),
2484                        };
2485                        (guest_offset, host_offset, count)
2486                    });
2487
2488                    match result {
2489                        StreamResult::Completed => {
2490                            if count > 1
2491                                && buffer.remaining().is_empty()
2492                                && guest_offset == 0
2493                                && host_offset == 0
2494                            {
2495                                bail!(
2496                                    "StreamProducer::poll_produce returned StreamResult::Completed \
2497                                     without producing any items"
2498                                );
2499                            }
2500                        }
2501                        StreamResult::Cancelled => {
2502                            if !cancelled {
2503                                bail!(
2504                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
2505                                     without being given a `finish` parameter value of true"
2506                                );
2507                            }
2508                        }
2509                        StreamResult::Dropped => {
2510                            dropped = true;
2511                        }
2512                    }
2513
2514                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2515
2516                    *producer.lock().unwrap() = Some((mine, buffer));
2517
2518                    if write_buffer {
2519                        write(token, id, producer.clone(), kind).await?;
2520                    }
2521
2522                    Ok(if dropped {
2523                        if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2524                        {
2525                            StreamResult::Dropped
2526                        } else {
2527                            StreamResult::Completed
2528                        }
2529                    } else {
2530                        result
2531                    })
2532                }
2533                .boxed()
2534            }
2535        });
2536        let try_into = Box::new(move |ty| {
2537            let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2538            match P::try_into(mine, ty) {
2539                Ok(value) => Some(value),
2540                Err(mine) => {
2541                    *producer.lock().unwrap() = Some((mine, buffer));
2542                    None
2543                }
2544            }
2545        });
2546        state.get_mut(id).unwrap().write = WriteState::HostReady {
2547            produce,
2548            try_into,
2549            guest_offset: 0,
2550            cancel: false,
2551            cancel_waker: None,
2552        };
2553        read
2554    }
2555
2556    fn set_consumer<C: StreamConsumer<T>>(
2557        mut self,
2558        id: TableId<TransmitHandle>,
2559        kind: TransmitKind,
2560        consumer: C,
2561    ) {
2562        let token = StoreToken::new(self.as_context_mut());
2563        let state = self.0.concurrent_state_mut();
2564        let id = state.get_mut(id).unwrap().state;
2565        let transmit = state.get_mut(id).unwrap();
2566        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2567        let consume_with_buffer = {
2568            let consumer = consumer.clone();
2569            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2570                let mut mine = consumer.lock().unwrap().take().unwrap();
2571
2572                let host_buffer_remaining_before =
2573                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2574
2575                let (result, cancelled) = future::poll_fn(|cx| {
2576                    tls::get(|store| {
2577                        let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2578                            &ReadState::HostReady { cancel, .. } => cancel,
2579                            ReadState::Open => false,
2580                            _ => unreachable!(),
2581                        };
2582
2583                        let poll = mine.as_mut().poll_consume(
2584                            cx,
2585                            token.as_context_mut(store),
2586                            Source {
2587                                id,
2588                                host_buffer: host_buffer.as_deref_mut(),
2589                            },
2590                            cancel,
2591                        );
2592
2593                        if let ReadState::HostReady {
2594                            cancel_waker,
2595                            cancel,
2596                            ..
2597                        } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2598                        {
2599                            if poll.is_pending() {
2600                                *cancel_waker = Some(cx.waker().clone());
2601                            } else {
2602                                *cancel_waker = None;
2603                                *cancel = false;
2604                            }
2605                        }
2606
2607                        poll.map(|v| v.map(|result| (result, cancel)))
2608                    })
2609                })
2610                .await?;
2611
2612                let (guest_offset, count) = tls::get(|store| {
2613                    let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2614                    (
2615                        match &transmit.read {
2616                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2617                            ReadState::Open => 0,
2618                            _ => unreachable!(),
2619                        },
2620                        match &transmit.write {
2621                            &WriteState::GuestReady { count, .. } => count,
2622                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2623                            _ => unreachable!(),
2624                        },
2625                    )
2626                });
2627
2628                match result {
2629                    StreamResult::Completed => {
2630                        if count > 0
2631                            && guest_offset == 0
2632                            && host_buffer_remaining_before
2633                                .zip(host_buffer.map(|v| v.remaining().len()))
2634                                .map(|(before, after)| before == after)
2635                                .unwrap_or(false)
2636                        {
2637                            bail!(
2638                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2639                                 without consuming any items"
2640                            );
2641                        }
2642
2643                        if let TransmitKind::Future = kind {
2644                            tls::get(|store| {
2645                                store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2646                            });
2647                        }
2648                    }
2649                    StreamResult::Cancelled => {
2650                        if !cancelled {
2651                            bail!(
2652                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2653                                 without being given a `finish` parameter value of true"
2654                            );
2655                        }
2656                    }
2657                    StreamResult::Dropped => {}
2658                }
2659
2660                *consumer.lock().unwrap() = Some(mine);
2661
2662                Ok(result)
2663            }
2664        };
2665        let consume = {
2666            let consume = consume_with_buffer.clone();
2667            Box::new(move || {
2668                let consume = consume.clone();
2669                async move { consume(None).await }.boxed()
2670            })
2671        };
2672
2673        match &transmit.write {
2674            WriteState::Open => {
2675                transmit.read = ReadState::HostReady {
2676                    consume,
2677                    guest_offset: 0,
2678                    cancel: false,
2679                    cancel_waker: None,
2680                };
2681            }
2682            &WriteState::GuestReady { .. } => {
2683                let future = consume();
2684                transmit.read = ReadState::HostReady {
2685                    consume,
2686                    guest_offset: 0,
2687                    cancel: false,
2688                    cancel_waker: None,
2689                };
2690                self.0.pipe_from_guest(kind, id, future);
2691            }
2692            WriteState::HostReady { .. } => {
2693                let WriteState::HostReady { produce, .. } = mem::replace(
2694                    &mut transmit.write,
2695                    WriteState::HostReady {
2696                        produce: Box::new(|| unreachable!()),
2697                        try_into: Box::new(|_| unreachable!()),
2698                        guest_offset: 0,
2699                        cancel: false,
2700                        cancel_waker: None,
2701                    },
2702                ) else {
2703                    unreachable!();
2704                };
2705
2706                transmit.read = ReadState::HostToHost {
2707                    accept: Box::new(move |input| {
2708                        let consume = consume_with_buffer.clone();
2709                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2710                    }),
2711                    buffer: Vec::new(),
2712                    limit: 0,
2713                };
2714
2715                let future = async move {
2716                    loop {
2717                        if tls::get(|store| {
2718                            anyhow::Ok(matches!(
2719                                store.concurrent_state_mut().get_mut(id)?.read,
2720                                ReadState::Dropped
2721                            ))
2722                        })? {
2723                            break Ok(());
2724                        }
2725
2726                        match produce().await? {
2727                            StreamResult::Completed | StreamResult::Cancelled => {}
2728                            StreamResult::Dropped => break Ok(()),
2729                        }
2730
2731                        if let TransmitKind::Future = kind {
2732                            break Ok(());
2733                        }
2734                    }
2735                }
2736                .map(move |result| {
2737                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2738                    result
2739                });
2740
2741                state.push_future(Box::pin(future));
2742            }
2743            WriteState::Dropped => {
2744                let reader = transmit.read_handle;
2745                self.0.host_drop_reader(reader, kind).unwrap();
2746            }
2747        }
2748    }
2749}
2750
2751async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2752    token: StoreToken<D>,
2753    id: TableId<TransmitState>,
2754    pair: Arc<Mutex<Option<(P, B)>>>,
2755    kind: TransmitKind,
2756) -> Result<()> {
2757    let (read, guest_offset) = tls::get(|store| {
2758        let transmit = store.concurrent_state_mut().get_mut(id)?;
2759
2760        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2761            Some(guest_offset)
2762        } else {
2763            None
2764        };
2765
2766        anyhow::Ok((
2767            mem::replace(&mut transmit.read, ReadState::Open),
2768            guest_offset,
2769        ))
2770    })?;
2771
2772    match read {
2773        ReadState::GuestReady {
2774            ty,
2775            flat_abi,
2776            options,
2777            address,
2778            count,
2779            handle,
2780            instance,
2781            caller,
2782        } => {
2783            let guest_offset = guest_offset.unwrap();
2784
2785            if let TransmitKind::Future = kind {
2786                tls::get(|store| {
2787                    store.concurrent_state_mut().get_mut(id)?.done = true;
2788                    anyhow::Ok(())
2789                })?;
2790            }
2791
2792            let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2793            let accept = {
2794                let pair = pair.clone();
2795                move |mut store: StoreContextMut<D>| {
2796                    lower::<T, B, D>(
2797                        store.as_context_mut(),
2798                        instance,
2799                        options,
2800                        ty,
2801                        address + (T::SIZE32 * guest_offset),
2802                        count - guest_offset,
2803                        &mut pair.lock().unwrap().as_mut().unwrap().1,
2804                    )?;
2805                    anyhow::Ok(())
2806                }
2807            };
2808
2809            if guest_offset < count {
2810                if T::MAY_REQUIRE_REALLOC {
2811                    // For payloads which may require a realloc call, use a
2812                    // oneshot::channel and background task.  This is
2813                    // necessary because calling the guest while there are
2814                    // host embedder frames on the stack is unsound.
2815                    let (tx, rx) = oneshot::channel();
2816                    tls::get(move |store| {
2817                        store
2818                            .concurrent_state_mut()
2819                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2820                                move |store| {
2821                                    _ = tx.send(accept(token.as_context_mut(store))?);
2822                                    Ok(())
2823                                },
2824                            ))))
2825                    });
2826                    rx.await?
2827                } else {
2828                    // Optimize flat payloads (i.e. those which do not
2829                    // require calling the guest's realloc function) by
2830                    // lowering directly instead of using a oneshot::channel
2831                    // and background task.
2832                    tls::get(|store| accept(token.as_context_mut(store)))?
2833                };
2834            }
2835
2836            tls::get(|store| {
2837                let count =
2838                    old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2839
2840                let transmit = store.concurrent_state_mut().get_mut(id)?;
2841
2842                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2843                    unreachable!();
2844                };
2845
2846                *guest_offset += count;
2847
2848                transmit.read = ReadState::GuestReady {
2849                    ty,
2850                    flat_abi,
2851                    options,
2852                    address,
2853                    count,
2854                    handle,
2855                    instance,
2856                    caller,
2857                };
2858
2859                anyhow::Ok(())
2860            })?;
2861
2862            Ok(())
2863        }
2864
2865        ReadState::HostToHost {
2866            accept,
2867            mut buffer,
2868            limit,
2869        } => {
2870            let mut state = StreamResult::Completed;
2871            let mut position = 0;
2872
2873            while !matches!(state, StreamResult::Dropped) && position < limit {
2874                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2875                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2876                (buffer, position, _) = slice_buffer.into_parts();
2877            }
2878
2879            {
2880                let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2881
2882                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2883                    state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2884                }
2885
2886                *pair.lock().unwrap() = Some((mine, buffer));
2887            }
2888
2889            tls::get(|store| {
2890                store.concurrent_state_mut().get_mut(id)?.read = match state {
2891                    StreamResult::Dropped => ReadState::Dropped,
2892                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2893                        accept,
2894                        buffer,
2895                        limit: 0,
2896                    },
2897                };
2898
2899                anyhow::Ok(())
2900            })?;
2901            Ok(())
2902        }
2903
2904        _ => unreachable!(),
2905    }
2906}
2907
2908impl Instance {
2909    /// Handle a host- or guest-initiated write by delivering the item(s) to the
2910    /// `StreamConsumer` for the specified stream or future.
2911    fn consume(
2912        self,
2913        store: &mut dyn VMStore,
2914        kind: TransmitKind,
2915        transmit_id: TableId<TransmitState>,
2916        consume: PollStream,
2917        guest_offset: usize,
2918        cancel: bool,
2919    ) -> Result<ReturnCode> {
2920        let mut future = consume();
2921        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2922            consume,
2923            guest_offset,
2924            cancel,
2925            cancel_waker: None,
2926        };
2927        let poll = tls::set(store, || {
2928            future
2929                .as_mut()
2930                .poll(&mut Context::from_waker(&Waker::noop()))
2931        });
2932
2933        Ok(match poll {
2934            Poll::Ready(state) => {
2935                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2936                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2937                    unreachable!();
2938                };
2939                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2940                transmit.write = WriteState::Open;
2941                code
2942            }
2943            Poll::Pending => {
2944                store.pipe_from_guest(kind, transmit_id, future);
2945                ReturnCode::Blocked
2946            }
2947        })
2948    }
2949
2950    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
2951    /// for the specified stream or future for items.
2952    fn produce(
2953        self,
2954        store: &mut dyn VMStore,
2955        kind: TransmitKind,
2956        transmit_id: TableId<TransmitState>,
2957        produce: PollStream,
2958        try_into: TryInto,
2959        guest_offset: usize,
2960        cancel: bool,
2961    ) -> Result<ReturnCode> {
2962        let mut future = produce();
2963        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2964            produce,
2965            try_into,
2966            guest_offset,
2967            cancel,
2968            cancel_waker: None,
2969        };
2970        let poll = tls::set(store, || {
2971            future
2972                .as_mut()
2973                .poll(&mut Context::from_waker(&Waker::noop()))
2974        });
2975
2976        Ok(match poll {
2977            Poll::Ready(state) => {
2978                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2979                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2980                    unreachable!();
2981                };
2982                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2983                transmit.read = ReadState::Open;
2984                code
2985            }
2986            Poll::Pending => {
2987                store.pipe_to_guest(kind, transmit_id, future);
2988                ReturnCode::Blocked
2989            }
2990        })
2991    }
2992
2993    /// Drop the writable end of the specified stream or future from the guest.
2994    pub(super) fn guest_drop_writable(
2995        self,
2996        store: &mut StoreOpaque,
2997        ty: TransmitIndex,
2998        writer: u32,
2999    ) -> Result<()> {
3000        let table = self.id().get_mut(store).table_for_transmit(ty);
3001        let transmit_rep = match ty {
3002            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3003            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3004        };
3005
3006        let id = TableId::<TransmitHandle>::new(transmit_rep);
3007        log::trace!("guest_drop_writable: drop writer {id:?}");
3008        match ty {
3009            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3010            TransmitIndex::Future(_) => store.host_drop_writer(
3011                id,
3012                Some(|| {
3013                    Err(anyhow!(
3014                        "cannot drop future write end without first writing a value"
3015                    ))
3016                }),
3017            ),
3018        }
3019    }
3020
3021    /// Copy `count` items from `read_address` to `write_address` for the
3022    /// specified stream or future.
3023    fn copy<T: 'static>(
3024        self,
3025        mut store: StoreContextMut<T>,
3026        flat_abi: Option<FlatAbi>,
3027        write_caller: RuntimeComponentInstanceIndex,
3028        write_ty: TransmitIndex,
3029        write_options: OptionsIndex,
3030        write_address: usize,
3031        read_caller: RuntimeComponentInstanceIndex,
3032        read_ty: TransmitIndex,
3033        read_options: OptionsIndex,
3034        read_address: usize,
3035        count: usize,
3036        rep: u32,
3037    ) -> Result<()> {
3038        let types = self.id().get(store.0).component().types();
3039        match (write_ty, read_ty) {
3040            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
3041                assert_eq!(count, 1);
3042
3043                let payload = types[types[write_ty].ty].payload;
3044
3045                if write_caller == read_caller && !allow_intra_component_read_write(payload) {
3046                    bail!(
3047                        "cannot read from and write to intra-component future with non-numeric payload"
3048                    )
3049                }
3050
3051                let val = payload
3052                    .map(|ty| {
3053                        let lift =
3054                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
3055
3056                        let abi = lift.types.canonical_abi(&ty);
3057                        // FIXME: needs to read an i64 for memory64
3058                        if write_address % usize::try_from(abi.align32)? != 0 {
3059                            bail!("write pointer not aligned");
3060                        }
3061
3062                        let bytes = lift
3063                            .memory()
3064                            .get(write_address..)
3065                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
3066                            .ok_or_else(|| {
3067                                anyhow::anyhow!("write pointer out of bounds of memory")
3068                            })?;
3069
3070                        Val::load(lift, ty, bytes)
3071                    })
3072                    .transpose()?;
3073
3074                if let Some(val) = val {
3075                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3076                    let types = lower.types;
3077                    let ty = types[types[read_ty].ty].payload.unwrap();
3078                    let ptr = func::validate_inbounds_dynamic(
3079                        types.canonical_abi(&ty),
3080                        lower.as_slice_mut(),
3081                        &ValRaw::u32(read_address.try_into().unwrap()),
3082                    )?;
3083                    val.store(lower, ty, ptr)?;
3084                }
3085            }
3086            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
3087                if write_caller == read_caller
3088                    && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
3089                {
3090                    bail!(
3091                        "cannot read from and write to intra-component stream with non-numeric payload"
3092                    )
3093                }
3094
3095                if let Some(flat_abi) = flat_abi {
3096                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
3097                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
3098                    if length_in_bytes > 0 {
3099                        if write_address % usize::try_from(flat_abi.align)? != 0 {
3100                            bail!("write pointer not aligned");
3101                        }
3102                        if read_address % usize::try_from(flat_abi.align)? != 0 {
3103                            bail!("read pointer not aligned");
3104                        }
3105
3106                        let store_opaque = store.0.store_opaque_mut();
3107
3108                        {
3109                            let src = self
3110                                .options_memory(store_opaque, write_options)
3111                                .get(write_address..)
3112                                .and_then(|b| b.get(..length_in_bytes))
3113                                .ok_or_else(|| {
3114                                    anyhow::anyhow!("write pointer out of bounds of memory")
3115                                })?
3116                                .as_ptr();
3117                            let dst = self
3118                                .options_memory_mut(store_opaque, read_options)
3119                                .get_mut(read_address..)
3120                                .and_then(|b| b.get_mut(..length_in_bytes))
3121                                .ok_or_else(|| {
3122                                    anyhow::anyhow!("read pointer out of bounds of memory")
3123                                })?
3124                                .as_mut_ptr();
3125                            // SAFETY: Both `src` and `dst` have been validated
3126                            // above.
3127                            unsafe {
3128                                if write_caller == read_caller {
3129                                    // If the same instance owns both ends of
3130                                    // the stream, the source and destination
3131                                    // buffers might overlap.
3132                                    src.copy_to(dst, length_in_bytes)
3133                                } else {
3134                                    // Since the read and write ends of the
3135                                    // stream are owned by distinct instances,
3136                                    // the buffers cannot possibly belong to the
3137                                    // same memory and thus cannot overlap.
3138                                    src.copy_to_nonoverlapping(dst, length_in_bytes)
3139                                }
3140                            }
3141                        }
3142                    }
3143                } else {
3144                    let store_opaque = store.0.store_opaque_mut();
3145                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
3146                    let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
3147                    let abi = lift.types.canonical_abi(&ty);
3148                    let size = usize::try_from(abi.size32).unwrap();
3149                    if write_address % usize::try_from(abi.align32)? != 0 {
3150                        bail!("write pointer not aligned");
3151                    }
3152                    let bytes = lift
3153                        .memory()
3154                        .get(write_address..)
3155                        .and_then(|b| b.get(..size * count))
3156                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
3157
3158                    let values = (0..count)
3159                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
3160                        .collect::<Result<Vec<_>>>()?;
3161
3162                    let id = TableId::<TransmitHandle>::new(rep);
3163                    log::trace!("copy values {values:?} for {id:?}");
3164
3165                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3166                    let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
3167                    let abi = lower.types.canonical_abi(&ty);
3168                    if read_address % usize::try_from(abi.align32)? != 0 {
3169                        bail!("read pointer not aligned");
3170                    }
3171                    let size = usize::try_from(abi.size32).unwrap();
3172                    lower
3173                        .as_slice_mut()
3174                        .get_mut(read_address..)
3175                        .and_then(|b| b.get_mut(..size * count))
3176                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
3177                    let mut ptr = read_address;
3178                    for value in values {
3179                        value.store(lower, ty, ptr)?;
3180                        ptr += size
3181                    }
3182                }
3183            }
3184            _ => unreachable!(),
3185        }
3186
3187        Ok(())
3188    }
3189
3190    fn check_bounds(
3191        self,
3192        store: &StoreOpaque,
3193        options: OptionsIndex,
3194        ty: TransmitIndex,
3195        address: usize,
3196        count: usize,
3197    ) -> Result<()> {
3198        let types = self.id().get(store).component().types();
3199        let size = usize::try_from(
3200            match ty {
3201                TransmitIndex::Future(ty) => types[types[ty].ty]
3202                    .payload
3203                    .map(|ty| types.canonical_abi(&ty).size32),
3204                TransmitIndex::Stream(ty) => types[types[ty].ty]
3205                    .payload
3206                    .map(|ty| types.canonical_abi(&ty).size32),
3207            }
3208            .unwrap_or(0),
3209        )
3210        .unwrap();
3211
3212        if count > 0 && size > 0 {
3213            self.options_memory(store, options)
3214                .get(address..)
3215                .and_then(|b| b.get(..(size * count)))
3216                .map(drop)
3217                .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
3218        } else {
3219            Ok(())
3220        }
3221    }
3222
3223    /// Write to the specified stream or future from the guest.
3224    pub(super) fn guest_write<T: 'static>(
3225        self,
3226        mut store: StoreContextMut<T>,
3227        caller: RuntimeComponentInstanceIndex,
3228        ty: TransmitIndex,
3229        options: OptionsIndex,
3230        flat_abi: Option<FlatAbi>,
3231        handle: u32,
3232        address: u32,
3233        count: u32,
3234    ) -> Result<ReturnCode> {
3235        if !self.options(store.0, options).async_ {
3236            // The caller may only sync call `{stream,future}.write` from an
3237            // async task (i.e. a task created via a call to an async export).
3238            // Otherwise, we'll trap.
3239            store.0.concurrent_state_mut().check_blocking()?;
3240        }
3241
3242        let address = usize::try_from(address).unwrap();
3243        let count = usize::try_from(count).unwrap();
3244        self.check_bounds(store.0, options, ty, address, count)?;
3245        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3246        let TransmitLocalState::Write { done } = *state else {
3247            bail!(
3248                "invalid handle {handle}; expected `Write`; got {:?}",
3249                *state
3250            );
3251        };
3252
3253        if done {
3254            bail!("cannot write to stream after being notified that the readable end dropped");
3255        }
3256
3257        *state = TransmitLocalState::Busy;
3258        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3259        let concurrent_state = store.0.concurrent_state_mut();
3260        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3261        let transmit = concurrent_state.get_mut(transmit_id)?;
3262        log::trace!(
3263            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3264            transmit.read
3265        );
3266
3267        if transmit.done {
3268            bail!("cannot write to future after previous write succeeded or readable end dropped");
3269        }
3270
3271        let new_state = if let ReadState::Dropped = &transmit.read {
3272            ReadState::Dropped
3273        } else {
3274            ReadState::Open
3275        };
3276
3277        let set_guest_ready = |me: &mut ConcurrentState| {
3278            let transmit = me.get_mut(transmit_id)?;
3279            assert!(
3280                matches!(&transmit.write, WriteState::Open),
3281                "expected `WriteState::Open`; got `{:?}`",
3282                transmit.write
3283            );
3284            transmit.write = WriteState::GuestReady {
3285                instance: self,
3286                caller,
3287                ty,
3288                flat_abi,
3289                options,
3290                address,
3291                count,
3292                handle,
3293            };
3294            Ok::<_, crate::Error>(())
3295        };
3296
3297        let mut result = match mem::replace(&mut transmit.read, new_state) {
3298            ReadState::GuestReady {
3299                ty: read_ty,
3300                flat_abi: read_flat_abi,
3301                options: read_options,
3302                address: read_address,
3303                count: read_count,
3304                handle: read_handle,
3305                instance: read_instance,
3306                caller: read_caller,
3307            } => {
3308                assert_eq!(flat_abi, read_flat_abi);
3309
3310                if let TransmitIndex::Future(_) = ty {
3311                    transmit.done = true;
3312                }
3313
3314                // Note that zero-length reads and writes are handling specially
3315                // by the spec to allow each end to signal readiness to the
3316                // other.  Quoting the spec:
3317                //
3318                // ```
3319                // The meaning of a read or write when the length is 0 is that
3320                // the caller is querying the "readiness" of the other
3321                // side. When a 0-length read/write rendezvous with a
3322                // non-0-length read/write, only the 0-length read/write
3323                // completes; the non-0-length read/write is kept pending (and
3324                // ready for a subsequent rendezvous).
3325                //
3326                // In the corner case where a 0-length read and write
3327                // rendezvous, only the writer is notified of readiness. To
3328                // avoid livelock, the Canonical ABI requires that a writer must
3329                // (eventually) follow a completed 0-length write with a
3330                // non-0-length write that is allowed to block (allowing the
3331                // reader end to run and rendezvous with its own non-0-length
3332                // read).
3333                // ```
3334
3335                let write_complete = count == 0 || read_count > 0;
3336                let read_complete = count > 0;
3337                let read_buffer_remaining = count < read_count;
3338
3339                let read_handle_rep = transmit.read_handle.rep();
3340
3341                let count = count.min(read_count);
3342
3343                self.copy(
3344                    store.as_context_mut(),
3345                    flat_abi,
3346                    caller,
3347                    ty,
3348                    options,
3349                    address,
3350                    read_caller,
3351                    read_ty,
3352                    read_options,
3353                    read_address,
3354                    count,
3355                    rep,
3356                )?;
3357
3358                let instance = self.id().get_mut(store.0);
3359                let types = instance.component().types();
3360                let item_size = payload(ty, types)
3361                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3362                    .unwrap_or(0);
3363                let concurrent_state = store.0.concurrent_state_mut();
3364                if read_complete {
3365                    let count = u32::try_from(count).unwrap();
3366                    let total = if let Some(Event::StreamRead {
3367                        code: ReturnCode::Completed(old_total),
3368                        ..
3369                    }) = concurrent_state.take_event(read_handle_rep)?
3370                    {
3371                        count + old_total
3372                    } else {
3373                        count
3374                    };
3375
3376                    let code = ReturnCode::completed(ty.kind(), total);
3377
3378                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3379                }
3380
3381                if read_buffer_remaining {
3382                    let transmit = concurrent_state.get_mut(transmit_id)?;
3383                    transmit.read = ReadState::GuestReady {
3384                        ty: read_ty,
3385                        flat_abi: read_flat_abi,
3386                        options: read_options,
3387                        address: read_address + (count * item_size),
3388                        count: read_count - count,
3389                        handle: read_handle,
3390                        instance: read_instance,
3391                        caller: read_caller,
3392                    };
3393                }
3394
3395                if write_complete {
3396                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3397                } else {
3398                    set_guest_ready(concurrent_state)?;
3399                    ReturnCode::Blocked
3400                }
3401            }
3402
3403            ReadState::HostReady {
3404                consume,
3405                guest_offset,
3406                cancel,
3407                cancel_waker,
3408            } => {
3409                assert!(cancel_waker.is_none());
3410                assert!(!cancel);
3411                assert_eq!(0, guest_offset);
3412
3413                if let TransmitIndex::Future(_) = ty {
3414                    transmit.done = true;
3415                }
3416
3417                set_guest_ready(concurrent_state)?;
3418                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3419            }
3420
3421            ReadState::HostToHost { .. } => unreachable!(),
3422
3423            ReadState::Open => {
3424                set_guest_ready(concurrent_state)?;
3425                ReturnCode::Blocked
3426            }
3427
3428            ReadState::Dropped => {
3429                if let TransmitIndex::Future(_) = ty {
3430                    transmit.done = true;
3431                }
3432
3433                ReturnCode::Dropped(0)
3434            }
3435        };
3436
3437        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3438            result = self.wait_for_write(store.0, transmit_handle)?;
3439        }
3440
3441        if result != ReturnCode::Blocked {
3442            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3443                TransmitLocalState::Write {
3444                    done: matches!(
3445                        (result, ty),
3446                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3447                    ),
3448                };
3449        }
3450
3451        log::trace!(
3452            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3453        );
3454
3455        Ok(result)
3456    }
3457
3458    /// Read from the specified stream or future from the guest.
3459    pub(super) fn guest_read<T: 'static>(
3460        self,
3461        mut store: StoreContextMut<T>,
3462        caller: RuntimeComponentInstanceIndex,
3463        ty: TransmitIndex,
3464        options: OptionsIndex,
3465        flat_abi: Option<FlatAbi>,
3466        handle: u32,
3467        address: u32,
3468        count: u32,
3469    ) -> Result<ReturnCode> {
3470        if !self.options(store.0, options).async_ {
3471            // The caller may only sync call `{stream,future}.read` from an
3472            // async task (i.e. a task created via a call to an async export).
3473            // Otherwise, we'll trap.
3474            store.0.concurrent_state_mut().check_blocking()?;
3475        }
3476
3477        let address = usize::try_from(address).unwrap();
3478        let count = usize::try_from(count).unwrap();
3479        self.check_bounds(store.0, options, ty, address, count)?;
3480        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3481        let TransmitLocalState::Read { done } = *state else {
3482            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3483        };
3484
3485        if done {
3486            bail!("cannot read from stream after being notified that the writable end dropped");
3487        }
3488
3489        *state = TransmitLocalState::Busy;
3490        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3491        let concurrent_state = store.0.concurrent_state_mut();
3492        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3493        let transmit = concurrent_state.get_mut(transmit_id)?;
3494        log::trace!(
3495            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3496            transmit.write
3497        );
3498
3499        if transmit.done {
3500            bail!("cannot read from future after previous read succeeded");
3501        }
3502
3503        let new_state = if let WriteState::Dropped = &transmit.write {
3504            WriteState::Dropped
3505        } else {
3506            WriteState::Open
3507        };
3508
3509        let set_guest_ready = |me: &mut ConcurrentState| {
3510            let transmit = me.get_mut(transmit_id)?;
3511            assert!(
3512                matches!(&transmit.read, ReadState::Open),
3513                "expected `ReadState::Open`; got `{:?}`",
3514                transmit.read
3515            );
3516            transmit.read = ReadState::GuestReady {
3517                ty,
3518                flat_abi,
3519                options,
3520                address,
3521                count,
3522                handle,
3523                instance: self,
3524                caller,
3525            };
3526            Ok::<_, crate::Error>(())
3527        };
3528
3529        let mut result = match mem::replace(&mut transmit.write, new_state) {
3530            WriteState::GuestReady {
3531                instance: _,
3532                ty: write_ty,
3533                flat_abi: write_flat_abi,
3534                options: write_options,
3535                address: write_address,
3536                count: write_count,
3537                handle: write_handle,
3538                caller: write_caller,
3539            } => {
3540                assert_eq!(flat_abi, write_flat_abi);
3541
3542                if let TransmitIndex::Future(_) = ty {
3543                    transmit.done = true;
3544                }
3545
3546                let write_handle_rep = transmit.write_handle.rep();
3547
3548                // See the comment in `guest_write` for the
3549                // `ReadState::GuestReady` case concerning zero-length reads and
3550                // writes.
3551
3552                let write_complete = write_count == 0 || count > 0;
3553                let read_complete = write_count > 0;
3554                let write_buffer_remaining = count < write_count;
3555
3556                let count = count.min(write_count);
3557
3558                self.copy(
3559                    store.as_context_mut(),
3560                    flat_abi,
3561                    write_caller,
3562                    write_ty,
3563                    write_options,
3564                    write_address,
3565                    caller,
3566                    ty,
3567                    options,
3568                    address,
3569                    count,
3570                    rep,
3571                )?;
3572
3573                let instance = self.id().get_mut(store.0);
3574                let types = instance.component().types();
3575                let item_size = payload(ty, types)
3576                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3577                    .unwrap_or(0);
3578                let concurrent_state = store.0.concurrent_state_mut();
3579
3580                if write_complete {
3581                    let count = u32::try_from(count).unwrap();
3582                    let total = if let Some(Event::StreamWrite {
3583                        code: ReturnCode::Completed(old_total),
3584                        ..
3585                    }) = concurrent_state.take_event(write_handle_rep)?
3586                    {
3587                        count + old_total
3588                    } else {
3589                        count
3590                    };
3591
3592                    let code = ReturnCode::completed(ty.kind(), total);
3593
3594                    concurrent_state.send_write_result(
3595                        write_ty,
3596                        transmit_id,
3597                        write_handle,
3598                        code,
3599                    )?;
3600                }
3601
3602                if write_buffer_remaining {
3603                    let transmit = concurrent_state.get_mut(transmit_id)?;
3604                    transmit.write = WriteState::GuestReady {
3605                        instance: self,
3606                        caller: write_caller,
3607                        ty: write_ty,
3608                        flat_abi: write_flat_abi,
3609                        options: write_options,
3610                        address: write_address + (count * item_size),
3611                        count: write_count - count,
3612                        handle: write_handle,
3613                    };
3614                }
3615
3616                if read_complete {
3617                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3618                } else {
3619                    set_guest_ready(concurrent_state)?;
3620                    ReturnCode::Blocked
3621                }
3622            }
3623
3624            WriteState::HostReady {
3625                produce,
3626                try_into,
3627                guest_offset,
3628                cancel,
3629                cancel_waker,
3630            } => {
3631                assert!(cancel_waker.is_none());
3632                assert!(!cancel);
3633                assert_eq!(0, guest_offset);
3634
3635                set_guest_ready(concurrent_state)?;
3636
3637                let code =
3638                    self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3639
3640                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3641                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3642                }
3643
3644                code
3645            }
3646
3647            WriteState::Open => {
3648                set_guest_ready(concurrent_state)?;
3649                ReturnCode::Blocked
3650            }
3651
3652            WriteState::Dropped => ReturnCode::Dropped(0),
3653        };
3654
3655        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3656            result = self.wait_for_read(store.0, transmit_handle)?;
3657        }
3658
3659        if result != ReturnCode::Blocked {
3660            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3661                TransmitLocalState::Read {
3662                    done: matches!(
3663                        (result, ty),
3664                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3665                    ),
3666                };
3667        }
3668
3669        log::trace!(
3670            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3671        );
3672
3673        Ok(result)
3674    }
3675
3676    fn wait_for_write(
3677        self,
3678        store: &mut StoreOpaque,
3679        handle: TableId<TransmitHandle>,
3680    ) -> Result<ReturnCode> {
3681        let waitable = Waitable::Transmit(handle);
3682        store.wait_for_event(waitable)?;
3683        let event = waitable.take_event(store.concurrent_state_mut())?;
3684        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3685            event
3686        {
3687            waitable.on_delivery(store, self, event);
3688            Ok(code)
3689        } else {
3690            unreachable!()
3691        }
3692    }
3693
3694    /// Cancel a pending stream or future write.
3695    fn cancel_write(
3696        self,
3697        store: &mut StoreOpaque,
3698        transmit_id: TableId<TransmitState>,
3699        async_: bool,
3700    ) -> Result<ReturnCode> {
3701        let state = store.concurrent_state_mut();
3702        let transmit = state.get_mut(transmit_id)?;
3703        log::trace!(
3704            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3705            transmit.read,
3706            transmit.write
3707        );
3708
3709        let code = if let Some(event) =
3710            Waitable::Transmit(transmit.write_handle).take_event(state)?
3711        {
3712            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3713                unreachable!();
3714            };
3715            match (code, event) {
3716                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3717                    ReturnCode::Cancelled(count)
3718                }
3719                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3720                _ => unreachable!(),
3721            }
3722        } else if let ReadState::HostReady {
3723            cancel,
3724            cancel_waker,
3725            ..
3726        } = &mut state.get_mut(transmit_id)?.read
3727        {
3728            *cancel = true;
3729            if let Some(waker) = cancel_waker.take() {
3730                waker.wake();
3731            }
3732
3733            if async_ {
3734                ReturnCode::Blocked
3735            } else {
3736                let handle = store
3737                    .concurrent_state_mut()
3738                    .get_mut(transmit_id)?
3739                    .write_handle;
3740                self.wait_for_write(store, handle)?
3741            }
3742        } else {
3743            ReturnCode::Cancelled(0)
3744        };
3745
3746        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3747
3748        match &transmit.write {
3749            WriteState::GuestReady { .. } => {
3750                transmit.write = WriteState::Open;
3751            }
3752            WriteState::HostReady { .. } => todo!("support host write cancellation"),
3753            WriteState::Open | WriteState::Dropped => {}
3754        }
3755
3756        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3757
3758        Ok(code)
3759    }
3760
3761    fn wait_for_read(
3762        self,
3763        store: &mut StoreOpaque,
3764        handle: TableId<TransmitHandle>,
3765    ) -> Result<ReturnCode> {
3766        let waitable = Waitable::Transmit(handle);
3767        store.wait_for_event(waitable)?;
3768        let event = waitable.take_event(store.concurrent_state_mut())?;
3769        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3770            event
3771        {
3772            waitable.on_delivery(store, self, event);
3773            Ok(code)
3774        } else {
3775            unreachable!()
3776        }
3777    }
3778
3779    /// Cancel a pending stream or future read.
3780    fn cancel_read(
3781        self,
3782        store: &mut StoreOpaque,
3783        transmit_id: TableId<TransmitState>,
3784        async_: bool,
3785    ) -> Result<ReturnCode> {
3786        let state = store.concurrent_state_mut();
3787        let transmit = state.get_mut(transmit_id)?;
3788        log::trace!(
3789            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3790            transmit.read,
3791            transmit.write
3792        );
3793
3794        let code = if let Some(event) =
3795            Waitable::Transmit(transmit.read_handle).take_event(state)?
3796        {
3797            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3798                unreachable!();
3799            };
3800            match (code, event) {
3801                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3802                    ReturnCode::Cancelled(count)
3803                }
3804                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3805                _ => unreachable!(),
3806            }
3807        } else if let WriteState::HostReady {
3808            cancel,
3809            cancel_waker,
3810            ..
3811        } = &mut state.get_mut(transmit_id)?.write
3812        {
3813            *cancel = true;
3814            if let Some(waker) = cancel_waker.take() {
3815                waker.wake();
3816            }
3817
3818            if async_ {
3819                ReturnCode::Blocked
3820            } else {
3821                let handle = store
3822                    .concurrent_state_mut()
3823                    .get_mut(transmit_id)?
3824                    .read_handle;
3825                self.wait_for_read(store, handle)?
3826            }
3827        } else {
3828            ReturnCode::Cancelled(0)
3829        };
3830
3831        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3832
3833        match &transmit.read {
3834            ReadState::GuestReady { .. } => {
3835                transmit.read = ReadState::Open;
3836            }
3837            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3838                todo!("support host read cancellation")
3839            }
3840            ReadState::Open | ReadState::Dropped => {}
3841        }
3842
3843        log::trace!("cancelled read {transmit_id:?}: {code:?}");
3844
3845        Ok(code)
3846    }
3847
3848    /// Cancel a pending write for the specified stream or future from the guest.
3849    fn guest_cancel_write(
3850        self,
3851        store: &mut StoreOpaque,
3852        ty: TransmitIndex,
3853        async_: bool,
3854        writer: u32,
3855    ) -> Result<ReturnCode> {
3856        if !async_ {
3857            // The caller may only sync call `{stream,future}.cancel-write` from
3858            // an async task (i.e. a task created via a call to an async
3859            // export).  Otherwise, we'll trap.
3860            store.concurrent_state_mut().check_blocking()?;
3861        }
3862
3863        let (rep, state) =
3864            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3865        let id = TableId::<TransmitHandle>::new(rep);
3866        log::trace!("guest cancel write {id:?} (handle {writer})");
3867        match state {
3868            TransmitLocalState::Write { .. } => {
3869                bail!("stream or future write cancelled when no write is pending")
3870            }
3871            TransmitLocalState::Read { .. } => {
3872                bail!("passed read end to `{{stream|future}}.cancel-write`")
3873            }
3874            TransmitLocalState::Busy => {}
3875        }
3876        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3877        let code = self.cancel_write(store, transmit_id, async_)?;
3878        if !matches!(code, ReturnCode::Blocked) {
3879            let state =
3880                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3881                    .1;
3882            if let TransmitLocalState::Busy = state {
3883                *state = TransmitLocalState::Write { done: false };
3884            }
3885        }
3886        Ok(code)
3887    }
3888
3889    /// Cancel a pending read for the specified stream or future from the guest.
3890    fn guest_cancel_read(
3891        self,
3892        store: &mut StoreOpaque,
3893        ty: TransmitIndex,
3894        async_: bool,
3895        reader: u32,
3896    ) -> Result<ReturnCode> {
3897        if !async_ {
3898            // The caller may only sync call `{stream,future}.cancel-read` from
3899            // an async task (i.e. a task created via a call to an async
3900            // export).  Otherwise, we'll trap.
3901            store.concurrent_state_mut().check_blocking()?;
3902        }
3903
3904        let (rep, state) =
3905            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3906        let id = TableId::<TransmitHandle>::new(rep);
3907        log::trace!("guest cancel read {id:?} (handle {reader})");
3908        match state {
3909            TransmitLocalState::Read { .. } => {
3910                bail!("stream or future read cancelled when no read is pending")
3911            }
3912            TransmitLocalState::Write { .. } => {
3913                bail!("passed write end to `{{stream|future}}.cancel-read`")
3914            }
3915            TransmitLocalState::Busy => {}
3916        }
3917        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3918        let code = self.cancel_read(store, transmit_id, async_)?;
3919        if !matches!(code, ReturnCode::Blocked) {
3920            let state =
3921                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3922                    .1;
3923            if let TransmitLocalState::Busy = state {
3924                *state = TransmitLocalState::Read { done: false };
3925            }
3926        }
3927        Ok(code)
3928    }
3929
3930    /// Drop the readable end of the specified stream or future from the guest.
3931    fn guest_drop_readable(
3932        self,
3933        store: &mut StoreOpaque,
3934        ty: TransmitIndex,
3935        reader: u32,
3936    ) -> Result<()> {
3937        let table = self.id().get_mut(store).table_for_transmit(ty);
3938        let (rep, _is_done) = match ty {
3939            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3940            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3941        };
3942        let kind = match ty {
3943            TransmitIndex::Stream(_) => TransmitKind::Stream,
3944            TransmitIndex::Future(_) => TransmitKind::Future,
3945        };
3946        let id = TableId::<TransmitHandle>::new(rep);
3947        log::trace!("guest_drop_readable: drop reader {id:?}");
3948        store.host_drop_reader(id, kind)
3949    }
3950
3951    /// Create a new error context for the given component.
3952    pub(crate) fn error_context_new(
3953        self,
3954        store: &mut StoreOpaque,
3955        caller: RuntimeComponentInstanceIndex,
3956        ty: TypeComponentLocalErrorContextTableIndex,
3957        options: OptionsIndex,
3958        debug_msg_address: u32,
3959        debug_msg_len: u32,
3960    ) -> Result<u32> {
3961        self.id().get(store).check_may_leave(caller)?;
3962        let lift_ctx = &mut LiftContext::new(store, options, self);
3963        let debug_msg = String::linear_lift_from_flat(
3964            lift_ctx,
3965            InterfaceType::String,
3966            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
3967        )?;
3968
3969        // Create a new ErrorContext that is tracked along with other concurrent state
3970        let err_ctx = ErrorContextState { debug_msg };
3971        let state = store.concurrent_state_mut();
3972        let table_id = state.push(err_ctx)?;
3973        let global_ref_count_idx =
3974            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3975
3976        // Add to the global error context ref counts
3977        let _ = state
3978            .global_error_context_ref_counts
3979            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3980
3981        // Error context are tracked both locally (to a single component instance) and globally
3982        // the counts for both must stay in sync.
3983        //
3984        // Here we reflect the newly created global concurrent error context state into the
3985        // component instance's locally tracked count, along with the appropriate key into the global
3986        // ref tracking data structures to enable later lookup
3987        let local_idx = self
3988            .id()
3989            .get_mut(store)
3990            .table_for_error_context(ty)
3991            .error_context_insert(table_id.rep())?;
3992
3993        Ok(local_idx)
3994    }
3995
3996    /// Retrieve the debug message from the specified error context.
3997    pub(super) fn error_context_debug_message<T>(
3998        self,
3999        store: StoreContextMut<T>,
4000        ty: TypeComponentLocalErrorContextTableIndex,
4001        options: OptionsIndex,
4002        err_ctx_handle: u32,
4003        debug_msg_address: u32,
4004    ) -> Result<()> {
4005        // Retrieve the error context and internal debug message
4006        let handle_table_id_rep = self
4007            .id()
4008            .get_mut(store.0)
4009            .table_for_error_context(ty)
4010            .error_context_rep(err_ctx_handle)?;
4011
4012        let state = store.0.concurrent_state_mut();
4013        // Get the state associated with the error context
4014        let ErrorContextState { debug_msg } =
4015            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4016        let debug_msg = debug_msg.clone();
4017
4018        let lower_cx = &mut LowerContext::new(store, options, self);
4019        let debug_msg_address = usize::try_from(debug_msg_address)?;
4020        // Lower the string into the component's memory
4021        let offset = lower_cx
4022            .as_slice_mut()
4023            .get(debug_msg_address..)
4024            .and_then(|b| b.get(..debug_msg.bytes().len()))
4025            .map(|_| debug_msg_address)
4026            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
4027        debug_msg
4028            .as_str()
4029            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4030
4031        Ok(())
4032    }
4033
4034    /// Implements the `future.cancel-read` intrinsic.
4035    pub(crate) fn future_cancel_read(
4036        self,
4037        store: &mut StoreOpaque,
4038        caller: RuntimeComponentInstanceIndex,
4039        ty: TypeFutureTableIndex,
4040        async_: bool,
4041        reader: u32,
4042    ) -> Result<u32> {
4043        self.id().get(store).check_may_leave(caller)?;
4044        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4045            .map(|v| v.encode())
4046    }
4047
4048    /// Implements the `future.cancel-write` intrinsic.
4049    pub(crate) fn future_cancel_write(
4050        self,
4051        store: &mut StoreOpaque,
4052        caller: RuntimeComponentInstanceIndex,
4053        ty: TypeFutureTableIndex,
4054        async_: bool,
4055        writer: u32,
4056    ) -> Result<u32> {
4057        self.id().get(store).check_may_leave(caller)?;
4058        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4059            .map(|v| v.encode())
4060    }
4061
4062    /// Implements the `stream.cancel-read` intrinsic.
4063    pub(crate) fn stream_cancel_read(
4064        self,
4065        store: &mut StoreOpaque,
4066        caller: RuntimeComponentInstanceIndex,
4067        ty: TypeStreamTableIndex,
4068        async_: bool,
4069        reader: u32,
4070    ) -> Result<u32> {
4071        self.id().get(store).check_may_leave(caller)?;
4072        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4073            .map(|v| v.encode())
4074    }
4075
4076    /// Implements the `stream.cancel-write` intrinsic.
4077    pub(crate) fn stream_cancel_write(
4078        self,
4079        store: &mut StoreOpaque,
4080        caller: RuntimeComponentInstanceIndex,
4081        ty: TypeStreamTableIndex,
4082        async_: bool,
4083        writer: u32,
4084    ) -> Result<u32> {
4085        self.id().get(store).check_may_leave(caller)?;
4086        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4087            .map(|v| v.encode())
4088    }
4089
4090    /// Implements the `future.drop-readable` intrinsic.
4091    pub(crate) fn future_drop_readable(
4092        self,
4093        store: &mut StoreOpaque,
4094        caller: RuntimeComponentInstanceIndex,
4095        ty: TypeFutureTableIndex,
4096        reader: u32,
4097    ) -> Result<()> {
4098        self.id().get(store).check_may_leave(caller)?;
4099        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4100    }
4101
4102    /// Implements the `stream.drop-readable` intrinsic.
4103    pub(crate) fn stream_drop_readable(
4104        self,
4105        store: &mut StoreOpaque,
4106        caller: RuntimeComponentInstanceIndex,
4107        ty: TypeStreamTableIndex,
4108        reader: u32,
4109    ) -> Result<()> {
4110        self.id().get(store).check_may_leave(caller)?;
4111        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4112    }
4113
4114    /// Allocate a new future or stream and grant ownership of both the read and
4115    /// write ends to the (sub-)component instance to which the specified
4116    /// `TransmitIndex` belongs.
4117    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4118        let (write, read) = store
4119            .concurrent_state_mut()
4120            .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4121
4122        let table = self.id().get_mut(store).table_for_transmit(ty);
4123        let (read_handle, write_handle) = match ty {
4124            TransmitIndex::Future(ty) => (
4125                table.future_insert_read(ty, read.rep())?,
4126                table.future_insert_write(ty, write.rep())?,
4127            ),
4128            TransmitIndex::Stream(ty) => (
4129                table.stream_insert_read(ty, read.rep())?,
4130                table.stream_insert_write(ty, write.rep())?,
4131            ),
4132        };
4133
4134        let state = store.concurrent_state_mut();
4135        state.get_mut(read)?.common.handle = Some(read_handle);
4136        state.get_mut(write)?.common.handle = Some(write_handle);
4137
4138        Ok(ResourcePair {
4139            write: write_handle,
4140            read: read_handle,
4141        })
4142    }
4143
4144    /// Drop the specified error context.
4145    pub(crate) fn error_context_drop(
4146        self,
4147        store: &mut StoreOpaque,
4148        caller: RuntimeComponentInstanceIndex,
4149        ty: TypeComponentLocalErrorContextTableIndex,
4150        error_context: u32,
4151    ) -> Result<()> {
4152        let instance = self.id().get_mut(store);
4153        instance.check_may_leave(caller)?;
4154
4155        let local_handle_table = instance.table_for_error_context(ty);
4156
4157        let rep = local_handle_table.error_context_drop(error_context)?;
4158
4159        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4160
4161        let state = store.concurrent_state_mut();
4162        let GlobalErrorContextRefCount(global_ref_count) = state
4163            .global_error_context_ref_counts
4164            .get_mut(&global_ref_count_idx)
4165            .expect("retrieve concurrent state for error context during drop");
4166
4167        // Reduce the component-global ref count, removing tracking if necessary
4168        assert!(*global_ref_count >= 1);
4169        *global_ref_count -= 1;
4170        if *global_ref_count == 0 {
4171            state
4172                .global_error_context_ref_counts
4173                .remove(&global_ref_count_idx);
4174
4175            state
4176                .delete(TableId::<ErrorContextState>::new(rep))
4177                .context("deleting component-global error context data")?;
4178        }
4179
4180        Ok(())
4181    }
4182
4183    /// Transfer ownership of the specified stream or future read end from one
4184    /// guest to another.
4185    fn guest_transfer(
4186        self,
4187        store: &mut StoreOpaque,
4188        src_idx: u32,
4189        src: TransmitIndex,
4190        dst: TransmitIndex,
4191    ) -> Result<u32> {
4192        let mut instance = self.id().get_mut(store);
4193        let src_table = instance.as_mut().table_for_transmit(src);
4194        let (rep, is_done) = match src {
4195            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4196            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4197        };
4198        if is_done {
4199            bail!("cannot lift after being notified that the writable end dropped");
4200        }
4201        let dst_table = instance.table_for_transmit(dst);
4202        let handle = match dst {
4203            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4204            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4205        }?;
4206        store
4207            .concurrent_state_mut()
4208            .get_mut(TableId::<TransmitHandle>::new(rep))?
4209            .common
4210            .handle = Some(handle);
4211        Ok(handle)
4212    }
4213
4214    /// Implements the `future.new` intrinsic.
4215    pub(crate) fn future_new(
4216        self,
4217        store: &mut StoreOpaque,
4218        caller: RuntimeComponentInstanceIndex,
4219        ty: TypeFutureTableIndex,
4220    ) -> Result<ResourcePair> {
4221        self.id().get(store).check_may_leave(caller)?;
4222        self.guest_new(store, TransmitIndex::Future(ty))
4223    }
4224
4225    /// Implements the `stream.new` intrinsic.
4226    pub(crate) fn stream_new(
4227        self,
4228        store: &mut StoreOpaque,
4229        caller: RuntimeComponentInstanceIndex,
4230        ty: TypeStreamTableIndex,
4231    ) -> Result<ResourcePair> {
4232        self.id().get(store).check_may_leave(caller)?;
4233        self.guest_new(store, TransmitIndex::Stream(ty))
4234    }
4235
4236    /// Transfer ownership of the specified future read end from one guest to
4237    /// another.
4238    pub(crate) fn future_transfer(
4239        self,
4240        store: &mut StoreOpaque,
4241        src_idx: u32,
4242        src: TypeFutureTableIndex,
4243        dst: TypeFutureTableIndex,
4244    ) -> Result<u32> {
4245        self.guest_transfer(
4246            store,
4247            src_idx,
4248            TransmitIndex::Future(src),
4249            TransmitIndex::Future(dst),
4250        )
4251    }
4252
4253    /// Transfer ownership of the specified stream read end from one guest to
4254    /// another.
4255    pub(crate) fn stream_transfer(
4256        self,
4257        store: &mut StoreOpaque,
4258        src_idx: u32,
4259        src: TypeStreamTableIndex,
4260        dst: TypeStreamTableIndex,
4261    ) -> Result<u32> {
4262        self.guest_transfer(
4263            store,
4264            src_idx,
4265            TransmitIndex::Stream(src),
4266            TransmitIndex::Stream(dst),
4267        )
4268    }
4269
4270    /// Copy the specified error context from one component to another.
4271    pub(crate) fn error_context_transfer(
4272        self,
4273        store: &mut StoreOpaque,
4274        src_idx: u32,
4275        src: TypeComponentLocalErrorContextTableIndex,
4276        dst: TypeComponentLocalErrorContextTableIndex,
4277    ) -> Result<u32> {
4278        let mut instance = self.id().get_mut(store);
4279        let rep = instance
4280            .as_mut()
4281            .table_for_error_context(src)
4282            .error_context_rep(src_idx)?;
4283        let dst_idx = instance
4284            .table_for_error_context(dst)
4285            .error_context_insert(rep)?;
4286
4287        // Update the global (cross-subcomponent) count for error contexts
4288        // as the new component has essentially created a new reference that will
4289        // be dropped/handled independently
4290        let global_ref_count = store
4291            .concurrent_state_mut()
4292            .global_error_context_ref_counts
4293            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4294            .context("global ref count present for existing (sub)component error context")?;
4295        global_ref_count.0 += 1;
4296
4297        Ok(dst_idx)
4298    }
4299}
4300
4301impl ComponentInstance {
4302    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4303        let (states, types) = self.instance_states();
4304        let runtime_instance = match ty {
4305            TransmitIndex::Stream(ty) => types[ty].instance,
4306            TransmitIndex::Future(ty) => types[ty].instance,
4307        };
4308        states[runtime_instance].handle_table()
4309    }
4310
4311    fn table_for_error_context(
4312        self: Pin<&mut Self>,
4313        ty: TypeComponentLocalErrorContextTableIndex,
4314    ) -> &mut HandleTable {
4315        let (states, types) = self.instance_states();
4316        let runtime_instance = types[ty].instance;
4317        states[runtime_instance].handle_table()
4318    }
4319
4320    fn get_mut_by_index(
4321        self: Pin<&mut Self>,
4322        ty: TransmitIndex,
4323        index: u32,
4324    ) -> Result<(u32, &mut TransmitLocalState)> {
4325        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4326    }
4327}
4328
4329impl ConcurrentState {
4330    fn send_write_result(
4331        &mut self,
4332        ty: TransmitIndex,
4333        id: TableId<TransmitState>,
4334        handle: u32,
4335        code: ReturnCode,
4336    ) -> Result<()> {
4337        let write_handle = self.get_mut(id)?.write_handle.rep();
4338        self.set_event(
4339            write_handle,
4340            match ty {
4341                TransmitIndex::Future(ty) => Event::FutureWrite {
4342                    code,
4343                    pending: Some((ty, handle)),
4344                },
4345                TransmitIndex::Stream(ty) => Event::StreamWrite {
4346                    code,
4347                    pending: Some((ty, handle)),
4348                },
4349            },
4350        )
4351    }
4352
4353    fn send_read_result(
4354        &mut self,
4355        ty: TransmitIndex,
4356        id: TableId<TransmitState>,
4357        handle: u32,
4358        code: ReturnCode,
4359    ) -> Result<()> {
4360        let read_handle = self.get_mut(id)?.read_handle.rep();
4361        self.set_event(
4362            read_handle,
4363            match ty {
4364                TransmitIndex::Future(ty) => Event::FutureRead {
4365                    code,
4366                    pending: Some((ty, handle)),
4367                },
4368                TransmitIndex::Stream(ty) => Event::StreamRead {
4369                    code,
4370                    pending: Some((ty, handle)),
4371                },
4372            },
4373        )
4374    }
4375
4376    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4377        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4378    }
4379
4380    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4381        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4382    }
4383
4384    /// Set or update the event for the specified waitable.
4385    ///
4386    /// If there is already an event set for this waitable, we assert that it is
4387    /// of the same variant as the new one and reuse the `ReturnCode` count and
4388    /// the `pending` field if applicable.
4389    // TODO: This is a bit awkward due to how
4390    // `Event::{Stream,Future}{Write,Read}` and
4391    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4392    // Consider updating those representations in a way that allows this
4393    // function to be simplified.
4394    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4395        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4396
4397        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4398            let (ReturnCode::Completed(count)
4399            | ReturnCode::Dropped(count)
4400            | ReturnCode::Cancelled(count)) = old
4401            else {
4402                unreachable!()
4403            };
4404
4405            match new {
4406                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4407                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4408                _ => unreachable!(),
4409            }
4410        }
4411
4412        let event = match (waitable.take_event(self)?, event) {
4413            (None, _) => event,
4414            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4415            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4416            (
4417                Some(Event::StreamWrite {
4418                    code: old_code,
4419                    pending: old_pending,
4420                }),
4421                Event::StreamWrite { code, pending },
4422            ) => Event::StreamWrite {
4423                code: update_code(old_code, code),
4424                pending: old_pending.or(pending),
4425            },
4426            (
4427                Some(Event::StreamRead {
4428                    code: old_code,
4429                    pending: old_pending,
4430                }),
4431                Event::StreamRead { code, pending },
4432            ) => Event::StreamRead {
4433                code: update_code(old_code, code),
4434                pending: old_pending.or(pending),
4435            },
4436            _ => unreachable!(),
4437        };
4438
4439        waitable.set_event(self, Some(event))
4440    }
4441
4442    /// Allocate a new future or stream, including the `TransmitState` and the
4443    /// `TransmitHandle`s corresponding to the read and write ends.
4444    fn new_transmit(
4445        &mut self,
4446        origin: TransmitOrigin,
4447    ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4448        let state_id = self.push(TransmitState::new(origin))?;
4449
4450        let write = self.push(TransmitHandle::new(state_id))?;
4451        let read = self.push(TransmitHandle::new(state_id))?;
4452
4453        let state = self.get_mut(state_id)?;
4454        state.write_handle = write;
4455        state.read_handle = read;
4456
4457        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4458
4459        Ok((write, read))
4460    }
4461
4462    /// Delete the specified future or stream, including the read and write ends.
4463    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4464        let state = self.delete(state_id)?;
4465        self.delete(state.write_handle)?;
4466        self.delete(state.read_handle)?;
4467
4468        log::trace!(
4469            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4470            state.write_handle,
4471            state.read_handle,
4472        );
4473
4474        Ok(())
4475    }
4476}
4477
4478pub(crate) struct ResourcePair {
4479    pub(crate) write: u32,
4480    pub(crate) read: u32,
4481}
4482
4483impl Waitable {
4484    /// Handle the imminent delivery of the specified event, e.g. by updating
4485    /// the state of the stream or future.
4486    pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4487        match event {
4488            Event::FutureRead {
4489                pending: Some((ty, handle)),
4490                ..
4491            }
4492            | Event::FutureWrite {
4493                pending: Some((ty, handle)),
4494                ..
4495            } => {
4496                let instance = instance.id().get_mut(store);
4497                let runtime_instance = instance.component().types()[ty].instance;
4498                let (rep, state) = instance.instance_states().0[runtime_instance]
4499                    .handle_table()
4500                    .future_rep(ty, handle)
4501                    .unwrap();
4502                assert_eq!(rep, self.rep());
4503                assert_eq!(*state, TransmitLocalState::Busy);
4504                *state = match event {
4505                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4506                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4507                    _ => unreachable!(),
4508                };
4509            }
4510            Event::StreamRead {
4511                pending: Some((ty, handle)),
4512                code,
4513            }
4514            | Event::StreamWrite {
4515                pending: Some((ty, handle)),
4516                code,
4517            } => {
4518                let instance = instance.id().get_mut(store);
4519                let runtime_instance = instance.component().types()[ty].instance;
4520                let (rep, state) = instance.instance_states().0[runtime_instance]
4521                    .handle_table()
4522                    .stream_rep(ty, handle)
4523                    .unwrap();
4524                assert_eq!(rep, self.rep());
4525                assert_eq!(*state, TransmitLocalState::Busy);
4526                let done = matches!(code, ReturnCode::Dropped(_));
4527                *state = match event {
4528                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
4529                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4530                    _ => unreachable!(),
4531                };
4532
4533                let transmit_handle = TableId::<TransmitHandle>::new(rep);
4534                let state = store.concurrent_state_mut();
4535                let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4536                let transmit = state.get_mut(transmit_id).unwrap();
4537
4538                match event {
4539                    Event::StreamRead { .. } => {
4540                        transmit.read = ReadState::Open;
4541                    }
4542                    Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4543                    _ => unreachable!(),
4544                };
4545            }
4546            _ => {}
4547        }
4548    }
4549}
4550
4551/// Determine whether an intra-component read/write is allowed for the specified
4552/// `stream` or `future` payload type according to the component model
4553/// specification.
4554fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4555    matches!(
4556        ty,
4557        None | Some(
4558            InterfaceType::S8
4559                | InterfaceType::U8
4560                | InterfaceType::S16
4561                | InterfaceType::U16
4562                | InterfaceType::S32
4563                | InterfaceType::U32
4564                | InterfaceType::S64
4565                | InterfaceType::U64
4566                | InterfaceType::Float32
4567                | InterfaceType::Float64
4568        )
4569    )
4570}
4571
4572#[cfg(test)]
4573mod tests {
4574    use super::*;
4575    use crate::{Engine, Store};
4576    use core::future::pending;
4577    use core::pin::pin;
4578    use std::sync::LazyLock;
4579
4580    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4581
4582    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4583    where
4584        T: FutureProducer<()>,
4585    {
4586        rx.poll_produce(
4587            &mut Context::from_waker(Waker::noop()),
4588            Store::new(&ENGINE, ()).as_context_mut(),
4589            finish,
4590        )
4591    }
4592
4593    #[test]
4594    fn future_producer() {
4595        let mut fut = pin!(async { anyhow::Ok(()) });
4596        assert!(matches!(
4597            poll_future_producer(fut.as_mut(), false),
4598            Poll::Ready(Ok(Some(()))),
4599        ));
4600
4601        let mut fut = pin!(async { anyhow::Ok(()) });
4602        assert!(matches!(
4603            poll_future_producer(fut.as_mut(), true),
4604            Poll::Ready(Ok(Some(()))),
4605        ));
4606
4607        let mut fut = pin!(pending::<Result<()>>());
4608        assert!(matches!(
4609            poll_future_producer(fut.as_mut(), false),
4610            Poll::Pending,
4611        ));
4612        assert!(matches!(
4613            poll_future_producer(fut.as_mut(), true),
4614            Poll::Ready(Ok(None)),
4615        ));
4616
4617        let (tx, rx) = oneshot::channel();
4618        let mut rx = pin!(rx);
4619        assert!(matches!(
4620            poll_future_producer(rx.as_mut(), false),
4621            Poll::Pending,
4622        ));
4623        assert!(matches!(
4624            poll_future_producer(rx.as_mut(), true),
4625            Poll::Ready(Ok(None)),
4626        ));
4627        tx.send(()).unwrap();
4628        assert!(matches!(
4629            poll_future_producer(rx.as_mut(), true),
4630            Poll::Ready(Ok(Some(()))),
4631        ));
4632
4633        let (tx, rx) = oneshot::channel();
4634        let mut rx = pin!(rx);
4635        tx.send(()).unwrap();
4636        assert!(matches!(
4637            poll_future_producer(rx.as_mut(), false),
4638            Poll::Ready(Ok(Some(()))),
4639        ));
4640
4641        let (tx, rx) = oneshot::channel::<()>();
4642        let mut rx = pin!(rx);
4643        drop(tx);
4644        assert!(matches!(
4645            poll_future_producer(rx.as_mut(), false),
4646            Poll::Ready(Err(..)),
4647        ));
4648
4649        let (tx, rx) = oneshot::channel::<()>();
4650        let mut rx = pin!(rx);
4651        drop(tx);
4652        assert!(matches!(
4653            poll_future_producer(rx.as_mut(), true),
4654            Poll::Ready(Err(..)),
4655        ));
4656    }
4657}