wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9    AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10    Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17    Error, Result, bail,
18    error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, MaybeUninit};
26use core::pin::Pin;
27use core::task::{Context, Poll, Waker, ready};
28use futures::channel::oneshot;
29use futures::{FutureExt as _, stream};
30use std::any::{Any, TypeId};
31use std::boxed::Box;
32use std::io::Cursor;
33use std::string::String;
34use std::sync::{Arc, Mutex};
35use std::vec::Vec;
36use wasmtime_environ::component::{
37    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
38    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
39    TypeFutureTableIndex, TypeStreamTableIndex,
40};
41
42pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
43
44mod buffers;
45
/// Discriminates between the two kinds of transmit handle -- streams and
/// futures -- in functions that are written to handle both.
#[derive(Clone, Copy, Debug)]
pub enum TransmitKind {
    /// The operation concerns a stream.
    Stream,
    /// The operation concerns a future.
    Future,
}
53
/// Result of a `{stream,future}.{read,write}` operation.
///
/// Every variant other than `Blocked` carries a `u32` payload which
/// `ReturnCode::encode` packs into the upper 28 bits of the guest-visible
/// value.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ReturnCode {
    /// Encoded as the all-ones sentinel (`0xffff_ffff`).
    Blocked,
    /// Encoded with tag `0x0` in the low four bits.
    Completed(u32),
    /// Encoded with tag `0x1` in the low four bits.
    Dropped(u32),
    /// Encoded with tag `0x2` in the low four bits.
    Cancelled(u32),
}
62
63impl ReturnCode {
64    /// Pack `self` into a single 32-bit integer that may be returned to the
65    /// guest.
66    ///
67    /// This corresponds to `pack_copy_result` in the Component Model spec.
68    pub fn encode(&self) -> u32 {
69        const BLOCKED: u32 = 0xffff_ffff;
70        const COMPLETED: u32 = 0x0;
71        const DROPPED: u32 = 0x1;
72        const CANCELLED: u32 = 0x2;
73        match self {
74            ReturnCode::Blocked => BLOCKED,
75            ReturnCode::Completed(n) => {
76                debug_assert!(*n < (1 << 28));
77                (n << 4) | COMPLETED
78            }
79            ReturnCode::Dropped(n) => {
80                debug_assert!(*n < (1 << 28));
81                (n << 4) | DROPPED
82            }
83            ReturnCode::Cancelled(n) => {
84                debug_assert!(*n < (1 << 28));
85                (n << 4) | CANCELLED
86            }
87        }
88    }
89
90    /// Returns `Self::Completed` with the specified count (or zero if
91    /// `matches!(kind, TransmitKind::Future)`)
92    fn completed(kind: TransmitKind, count: u32) -> Self {
93        Self::Completed(if let TransmitKind::Future = kind {
94            0
95        } else {
96            count
97        })
98    }
99}
100
/// Represents a stream or future type index.
///
/// This is useful as a parameter type for functions which operate on either a
/// future or a stream.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
    /// Type table index of a stream.
    Stream(TypeStreamTableIndex),
    /// Type table index of a future.
    Future(TypeFutureTableIndex),
}
110
111impl TransmitIndex {
112    pub fn kind(&self) -> TransmitKind {
113        match self {
114            TransmitIndex::Stream(_) => TransmitKind::Stream,
115            TransmitIndex::Future(_) => TransmitKind::Future,
116        }
117    }
118}
119
120/// Retrieve the payload type of the specified stream or future, or `None` if it
121/// has no payload type.
122fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
123    match ty {
124        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
125        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
126    }
127}
128
129/// Retrieve the host rep and state for the specified guest-visible waitable
130/// handle.
131fn get_mut_by_index_from(
132    handle_table: &mut HandleTable,
133    ty: TransmitIndex,
134    index: u32,
135) -> Result<(u32, &mut TransmitLocalState)> {
136    match ty {
137        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
138        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
139    }
140}
141
142fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
143    mut store: StoreContextMut<U>,
144    instance: Instance,
145    options: OptionsIndex,
146    ty: TransmitIndex,
147    address: usize,
148    count: usize,
149    buffer: &mut B,
150) -> Result<()> {
151    let count = buffer.remaining().len().min(count);
152
153    let lower = &mut if T::MAY_REQUIRE_REALLOC {
154        LowerContext::new
155    } else {
156        LowerContext::new_without_realloc
157    }(store.as_context_mut(), options, instance);
158
159    if address % usize::try_from(T::ALIGN32)? != 0 {
160        bail!("read pointer not aligned");
161    }
162    lower
163        .as_slice_mut()
164        .get_mut(address..)
165        .and_then(|b| b.get_mut(..T::SIZE32 * count))
166        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
167
168    if let Some(ty) = payload(ty, lower.types) {
169        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
170    }
171
172    buffer.skip(count);
173
174    Ok(())
175}
176
177fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
178    lift: &mut LiftContext<'_>,
179    ty: Option<InterfaceType>,
180    buffer: &mut B,
181    address: usize,
182    count: usize,
183) -> Result<()> {
184    let count = count.min(buffer.remaining_capacity());
185    if T::IS_RUST_UNIT_TYPE {
186        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
187        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
188        // is a valid way to populate the zero-sized buffer.
189        buffer.extend(
190            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
191        )
192    } else {
193        let ty = ty.unwrap();
194        if address % usize::try_from(T::ALIGN32)? != 0 {
195            bail!("write pointer not aligned");
196        }
197        lift.memory()
198            .get(address..)
199            .and_then(|b| b.get(..T::SIZE32 * count))
200            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
201
202        let list = &WasmList::new(address, count, lift, ty)?;
203        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
204    }
205    Ok(())
206}
207
/// Represents the state associated with an error context.
///
/// Currently this is only the debug message supplied for the error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
    /// Debug message associated with the error context
    pub(crate) debug_msg: String,
}
214
/// Represents the size and alignment for a "flat" Component Model type,
/// i.e. one containing no pointers or handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
    /// Size of the type (presumably in bytes -- TODO confirm against users).
    pub(super) size: u32,
    /// Alignment of the type (presumably in bytes -- TODO confirm against users).
    pub(super) align: u32,
}
222
/// Represents the buffer for a host- or guest-initiated stream read.
///
/// `T` is the item type of the stream and `B` is the buffer type used to
/// hold items handed over via `Destination::set_buffer`.
pub struct Destination<'a, T, B> {
    /// Key used to look up the associated `TransmitState` in the store's
    /// concurrent state.
    id: TableId<TransmitState>,
    /// Buffer read and replaced by `take_buffer` and `set_buffer`.
    buffer: &'a mut B,
    /// Byte buffer exposed through `as_direct`; when `None`, the direct view
    /// targets the guest's read buffer instead.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Ties the item type `T` to this value without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
230
231impl<'a, T, B> Destination<'a, T, B> {
232    /// Reborrow `self` so it can be used again later.
233    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
234        Destination {
235            id: self.id,
236            buffer: &mut *self.buffer,
237            host_buffer: self.host_buffer.as_deref_mut(),
238            _phantom: PhantomData,
239        }
240    }
241
242    /// Take the buffer out of `self`, leaving a default-initialized one in its
243    /// place.
244    ///
245    /// This can be useful for reusing the previously-stored buffer's capacity
246    /// instead of allocating a fresh one.
247    pub fn take_buffer(&mut self) -> B
248    where
249        B: Default,
250    {
251        mem::take(self.buffer)
252    }
253
254    /// Store the specified buffer in `self`.
255    ///
256    /// Any items contained in the buffer will be delivered to the reader after
257    /// the `StreamProducer::poll_produce` call to which this `Destination` was
258    /// passed returns (unless overwritten by another call to `set_buffer`).
259    ///
260    /// If items are stored via this buffer _and_ written via a
261    /// `DirectDestination` view of `self`, then the items in the buffer will be
262    /// delivered after the ones written using `DirectDestination`.
263    pub fn set_buffer(&mut self, buffer: B) {
264        *self.buffer = buffer;
265    }
266
267    /// Return the remaining number of items the current read has capacity to
268    /// accept, if known.
269    ///
270    /// This will return `Some(_)` if the reader is a guest; it will return
271    /// `None` if the reader is the host.
272    ///
273    /// Note that this can return `Some(0)`. This means that the guest is
274    /// attempting to perform a zero-length read which typically means that it's
275    /// trying to wait for this stream to be ready-to-read but is not actually
276    /// ready to receive the items yet. The host in this case is allowed to
277    /// either block waiting for readiness or immediately complete the
278    /// operation. The guest is expected to handle both cases. Some more
279    /// discussion about this case can be found in the discussion of ["Stream
280    /// Readiness" in the component-model repo][docs].
281    ///
282    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
283    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
284        let transmit = store
285            .as_context_mut()
286            .0
287            .concurrent_state_mut()
288            .get_mut(self.id)
289            .unwrap();
290
291        if let &ReadState::GuestReady { count, .. } = &transmit.read {
292            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
293                unreachable!()
294            };
295
296            Some(count - guest_offset)
297        } else {
298            None
299        }
300    }
301}
302
303impl<'a, B> Destination<'a, u8, B> {
304    /// Return a `DirectDestination` view of `self`.
305    ///
306    /// If the reader is a guest, this will provide direct access to the guest's
307    /// read buffer.  If the reader is a host, this will provide access to a
308    /// buffer which will be delivered to the host before any items stored using
309    /// `Destination::set_buffer`.
310    ///
311    /// `capacity` will only be used if the reader is a host, in which case it
312    /// will update the length of the buffer, possibly zero-initializing the new
313    /// elements if the new length is larger than the old length.
314    pub fn as_direct<D>(
315        mut self,
316        store: StoreContextMut<'a, D>,
317        capacity: usize,
318    ) -> DirectDestination<'a, D> {
319        if let Some(buffer) = self.host_buffer.as_deref_mut() {
320            buffer.set_position(0);
321            if buffer.get_mut().is_empty() {
322                buffer.get_mut().resize(capacity, 0);
323            }
324        }
325
326        DirectDestination {
327            id: self.id,
328            host_buffer: self.host_buffer,
329            store,
330        }
331    }
332}
333
/// Represents a read from a `stream<u8>`, providing direct access to the
/// reader's buffer (the guest's read buffer, or a host-side byte buffer when
/// the reader is the host).
pub struct DirectDestination<'a, D: 'static> {
    /// Key used to look up the associated `TransmitState` in the store's
    /// concurrent state.
    id: TableId<TransmitState>,
    /// Host-side byte buffer; when `None`, bytes are written straight into the
    /// guest's memory.
    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
    /// Store used to reach the transmit state and the guest's memory.
    store: StoreContextMut<'a, D>,
}
341
342impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
343    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
344        let rem = self.remaining();
345        let n = rem.len().min(buf.len());
346        rem[..n].copy_from_slice(&buf[..n]);
347        self.mark_written(n);
348        Ok(n)
349    }
350
351    fn flush(&mut self) -> std::io::Result<()> {
352        Ok(())
353    }
354}
355
356impl<D: 'static> DirectDestination<'_, D> {
357    /// Provide direct access to the writer's buffer.
358    pub fn remaining(&mut self) -> &mut [u8] {
359        if let Some(buffer) = self.host_buffer.as_deref_mut() {
360            buffer.get_mut()
361        } else {
362            let transmit = self
363                .store
364                .as_context_mut()
365                .0
366                .concurrent_state_mut()
367                .get_mut(self.id)
368                .unwrap();
369
370            let &ReadState::GuestReady {
371                address,
372                count,
373                options,
374                instance,
375                ..
376            } = &transmit.read
377            else {
378                unreachable!();
379            };
380
381            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
382                unreachable!()
383            };
384
385            instance
386                .options_memory_mut(self.store.0, options)
387                .get_mut((address + guest_offset)..)
388                .and_then(|b| b.get_mut(..(count - guest_offset)))
389                .unwrap()
390        }
391    }
392
393    /// Mark the specified number of bytes as written to the writer's buffer.
394    ///
395    /// This will panic if the count is larger than the size of the
396    /// buffer returned by `Self::remaining`.
397    pub fn mark_written(&mut self, count: usize) {
398        if let Some(buffer) = self.host_buffer.as_deref_mut() {
399            buffer.set_position(
400                buffer
401                    .position()
402                    .checked_add(u64::try_from(count).unwrap())
403                    .unwrap(),
404            );
405        } else {
406            let transmit = self
407                .store
408                .as_context_mut()
409                .0
410                .concurrent_state_mut()
411                .get_mut(self.id)
412                .unwrap();
413
414            let ReadState::GuestReady {
415                count: read_count, ..
416            } = &transmit.read
417            else {
418                unreachable!();
419            };
420
421            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
422                unreachable!()
423            };
424
425            if *guest_offset + count > *read_count {
426                panic!(
427                    "write count ({count}) must be less than or equal to read count ({read_count})"
428                )
429            } else {
430                *guest_offset += count;
431            }
432        }
433    }
434}
435
/// Outcome reported by a `Stream{Producer,Consumer}` after handling an
/// operation.
#[derive(Clone, Copy, Debug)]
pub enum StreamResult {
    /// The operation finished normally; the producer or consumer may still be
    /// able to produce or consume further items, respectively.
    Completed,
    /// The operation was interrupted, i.e. it wrapped up early because
    /// `poll_produce` or `poll_consume` was called with a `finish` value of
    /// true; the producer or consumer may still be able to produce or consume
    /// further items, respectively.
    Cancelled,
    /// The operation finished normally, but the producer or consumer will
    /// _not_ be able to produce or consume any further items, respectively.
    Dropped,
}
451
/// Represents the host-owned write end of a stream.
pub trait StreamProducer<D>: Send + 'static {
    /// The payload type of this stream.
    type Item;

    /// The `WriteBuffer` type to use when delivering items.
    type Buffer: WriteBuffer<Self::Item> + Default;

    /// Handle a host- or guest-initiated read by delivering zero or more items
    /// to the specified destination.
    ///
    /// This will be called whenever the reader starts a read.
    ///
    /// # Arguments
    ///
    /// * `self` - a `Pin`'d version of self to perform Rust-level
    ///   future-related operations on.
    /// * `cx` - a Rust-related [`Context`] which is passed to other
    ///   future-related operations or used to acquire a waker.
    /// * `store` - the Wasmtime store that this operation is happening within.
    ///   Used, for example, to consult the state `D` associated with the store.
    /// * `destination` - the location that items are to be written to.
    /// * `finish` - a flag indicating whether the host should strive to
    ///   immediately complete/cancel any pending operation. See below for more
    ///   details.
    ///
    /// # Behavior
    ///
    /// If the implementation is able to produce one or more items immediately,
    /// it should write them to `destination` and return either
    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
    /// any more items.
    ///
    /// If the implementation is unable to produce any items immediately, but
    /// expects to do so later, and `finish` is _false_, it should store the
    /// waker from `cx` for later and return `Poll::Pending` without writing
    /// anything to `destination`.  Later, it should alert the waker when either
    /// the items arrive, the stream has ended, or an error occurs.
    ///
    /// If more items are written to `destination` than the reader has immediate
    /// capacity to accept, they will be retained in memory by the caller and
    /// used to satisfy future reads, in which case `poll_produce` will only be
    /// called again once all those items have been delivered.
    ///
    /// # Zero-length reads
    ///
    /// This function may be called with a zero-length capacity buffer
    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
    /// the guest wants to wait to see if an item is ready without actually
    /// reading the item. For example think of a UNIX `poll` function run on a
    /// TCP stream, seeing if it's readable without actually reading it.
    ///
    /// In this situation the host is allowed to either return immediately or
    /// wait for readiness. Note that waiting for readiness is not always
    /// possible. For example it's impossible to test if a Rust-native `Future`
    /// is ready without actually reading the item. Stream-specific
    /// optimizations, such as testing if a TCP stream is readable, may be
    /// possible however.
    ///
    /// For a zero-length read, the host is allowed to:
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
    ///   anything if it expects to be able to produce items immediately (i.e.
    ///   without first returning `Poll::Pending`) the next time `poll_produce`
    ///   is called with non-zero capacity. This is the best-case scenario of
    ///   fulfilling the guest's desire -- items aren't read/buffered but the
    ///   host is saying it's ready when the guest is.
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
    ///   testing for readiness. The guest doesn't know this yet, but the guest
    ///   will realize that zero-length reads won't work on this stream when a
    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
    ///   here.
    ///
    /// - Return `Poll::Pending` if the host has performed necessary async work
    ///   to wait for this stream to be readable without actually reading
    ///   anything. This is also a best-case scenario where the host is letting
    ///   the guest know that nothing is ready yet. Later the zero-length read
    ///   will complete and then the guest will attempt a nonzero-length read to
    ///   actually read some bytes.
    ///
    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
    ///   `Destination::set_buffer` with one or more items. Note, however,
    ///   that this creates the hazard that the items will never be received by
    ///   the guest if it decides not to do another non-zero-length read before
    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
    ///   recommended to do this and it's better to return
    ///   `StreamResult::Completed` without buffering anything instead.
    ///
    /// For more discussion on zero-length reads see the [documentation in the
    /// component-model repo itself][docs].
    ///
    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
    ///
    /// # Return
    ///
    /// This function can return one of the following:
    ///
    /// * `Poll::Pending` - this operation cannot complete at this time. The
    ///   Rust-level `Future::poll` contract applies here where a waker should
    ///   be stored from the `cx` argument and be arranged to receive a
    ///   notification when this implementation can make progress. For example
    ///   if you call `Future::poll` on a sub-future, that's enough. If items
    ///   were written to `destination` then a trap in the guest will be raised.
    ///
    ///   Note that implementations should strive to avoid this return value
    ///   when `finish` is `true`. In such a situation the guest is attempting
    ///   to, for example, cancel a previous operation. By returning
    ///   `Poll::Pending` the guest will be blocked during the cancellation
    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
    ///   favored to indicate that no items were read. If a short read happened,
    ///   however, it's ok to return `StreamResult::Completed` indicating some
    ///   items were read.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Completed))` - items, if applicable,
    ///   were written to the `destination`.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Cancelled))` - used when `finish` is
    ///   `true` and the implementation was able to successfully cancel any
    ///   async work that a previous read kicked off, if any. The host should
    ///   not buffer values received after returning `Cancelled` because the
    ///   guest will not be aware of these values and the guest could close the
    ///   stream after cancelling a read. Hosts should only return `Cancelled`
    ///   when there are no more async operations in flight for a previous
    ///   read.
    ///
    ///   If items were written to `destination` then a trap in the guest will
    ///   be raised. If `finish` is `false` then this return value will raise a
    ///   trap in the guest.
    ///
    /// * `Poll::Ready(Ok(StreamResult::Dropped))` - end-of-stream marker,
    ///   indicating that this producer should not be polled again. Note that
    ///   items may still be written to `destination`.
    ///
    /// # Errors
    ///
    /// The implementation may alternatively choose to return `Err(_)` to
    /// indicate an unrecoverable error. This will cause the guest (if any) to
    /// trap and render the component instance (if any) unusable. The
    /// implementation should report errors that _are_ recoverable by other
    /// means (e.g. by writing to a `future`) and return
    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
    /// be downcast to the specified type.
    ///
    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
    /// to the specified type is guaranteed to succeed.
    ///
    /// The default implementation performs no conversion and hands the
    /// producer back unchanged as `Err(me)`.
    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
        Err(me)
    }
}
612
613impl<T, D> StreamProducer<D> for iter::Empty<T>
614where
615    T: Send + Sync + 'static,
616{
617    type Item = T;
618    type Buffer = Option<Self::Item>;
619
620    fn poll_produce<'a>(
621        self: Pin<&mut Self>,
622        _: &mut Context<'_>,
623        _: StoreContextMut<'a, D>,
624        _: Destination<'a, Self::Item, Self::Buffer>,
625        _: bool,
626    ) -> Poll<Result<StreamResult>> {
627        Poll::Ready(Ok(StreamResult::Dropped))
628    }
629}
630
631impl<T, D> StreamProducer<D> for stream::Empty<T>
632where
633    T: Send + Sync + 'static,
634{
635    type Item = T;
636    type Buffer = Option<Self::Item>;
637
638    fn poll_produce<'a>(
639        self: Pin<&mut Self>,
640        _: &mut Context<'_>,
641        _: StoreContextMut<'a, D>,
642        _: Destination<'a, Self::Item, Self::Buffer>,
643        _: bool,
644    ) -> Poll<Result<StreamResult>> {
645        Poll::Ready(Ok(StreamResult::Dropped))
646    }
647}
648
649impl<T, D> StreamProducer<D> for Vec<T>
650where
651    T: Unpin + Send + Sync + 'static,
652{
653    type Item = T;
654    type Buffer = VecBuffer<T>;
655
656    fn poll_produce<'a>(
657        self: Pin<&mut Self>,
658        _: &mut Context<'_>,
659        _: StoreContextMut<'a, D>,
660        mut dst: Destination<'a, Self::Item, Self::Buffer>,
661        _: bool,
662    ) -> Poll<Result<StreamResult>> {
663        dst.set_buffer(mem::take(self.get_mut()).into());
664        Poll::Ready(Ok(StreamResult::Dropped))
665    }
666}
667
668impl<T, D> StreamProducer<D> for Box<[T]>
669where
670    T: Unpin + Send + Sync + 'static,
671{
672    type Item = T;
673    type Buffer = VecBuffer<T>;
674
675    fn poll_produce<'a>(
676        self: Pin<&mut Self>,
677        _: &mut Context<'_>,
678        _: StoreContextMut<'a, D>,
679        mut dst: Destination<'a, Self::Item, Self::Buffer>,
680        _: bool,
681    ) -> Poll<Result<StreamResult>> {
682        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
683        Poll::Ready(Ok(StreamResult::Dropped))
684    }
685}
686
/// A producer that delivers the whole byte string and then ends the stream.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Deliver the bytes of `self` and report `StreamResult::Dropped`.
    ///
    /// For a host reader or a zero-length guest read, all bytes are handed
    /// over via the destination's buffer. Otherwise as many bytes as the
    /// guest's read has capacity for are copied directly into the guest's
    /// buffer, and any excess is handed over via the destination's buffer.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // on 0-length or host reads, buffer the bytes
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        // Only the bytes we actually have can be written directly: clamp to
        // the data length so that a guest read larger than the data neither
        // makes `split_off` panic nor trips the length check in
        // `copy_from_slice`.
        let len = usize::from(cap).min(self.len());
        // Whatever does not fit in the destination (possibly nothing) is
        // buffered; `self` keeps exactly the first `len` bytes.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
714
/// A producer that delivers the whole byte buffer and then ends the stream.
#[cfg(feature = "component-model-async-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Cursor<Self>;

    /// Deliver the bytes of `self` and report `StreamResult::Dropped`.
    ///
    /// For a host reader or a zero-length guest read, all bytes are handed
    /// over via the destination's buffer. Otherwise as many bytes as the
    /// guest's read has capacity for are copied directly into the guest's
    /// buffer, and any excess is handed over via the destination's buffer.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        mut store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _: bool,
    ) -> Poll<Result<StreamResult>> {
        let cap = dst.remaining(&mut store);
        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
            // on 0-length or host reads, buffer the bytes
            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        // Only the bytes we actually have can be written directly: clamp to
        // the data length so that a guest read larger than the data neither
        // makes `split_off` panic nor trips the length check in
        // `copy_from_slice`.
        let len = usize::from(cap).min(self.len());
        // Whatever does not fit in the destination (possibly nothing) is
        // buffered; `self` keeps exactly the first `len` bytes.
        dst.set_buffer(Cursor::new(self.split_off(len)));
        let mut dst = dst.as_direct(store, len);
        dst.remaining()[..len].copy_from_slice(&self);
        dst.mark_written(len);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
742
/// Represents the buffer for a host- or guest-initiated stream write.
pub struct Source<'a, T> {
    /// Key used to look up the associated `TransmitState` in the store's
    /// concurrent state.
    id: TableId<TransmitState>,
    /// Buffer that items are taken from when present; when `None`, items are
    /// lifted from the guest's pending write instead.
    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
748
749impl<'a, T> Source<'a, T> {
750    /// Reborrow `self` so it can be used again later.
751    pub fn reborrow(&mut self) -> Source<'_, T> {
752        Source {
753            id: self.id,
754            host_buffer: self.host_buffer.as_deref_mut(),
755        }
756    }
757
758    /// Accept zero or more items from the writer.
759    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
760    where
761        T: func::Lift + 'static,
762        B: ReadBuffer<T>,
763    {
764        if let Some(input) = &mut self.host_buffer {
765            let count = input.remaining().len().min(buffer.remaining_capacity());
766            buffer.move_from(*input, count);
767        } else {
768            let store = store.as_context_mut();
769            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
770
771            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
772                unreachable!();
773            };
774
775            let &WriteState::GuestReady {
776                ty,
777                address,
778                count,
779                options,
780                instance,
781                ..
782            } = &transmit.write
783            else {
784                unreachable!()
785            };
786
787            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
788            let ty = payload(ty, cx.types);
789            let old_remaining = buffer.remaining_capacity();
790            lift::<T, B>(
791                cx,
792                ty,
793                buffer,
794                address + (T::SIZE32 * guest_offset),
795                count - guest_offset,
796            )?;
797
798            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
799
800            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
801                unreachable!();
802            };
803
804            *guest_offset += old_remaining - buffer.remaining_capacity();
805        }
806
807        Ok(())
808    }
809
810    /// Return the number of items remaining to be read from the current write
811    /// operation.
812    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
813    where
814        T: 'static,
815    {
816        let transmit = store
817            .as_context_mut()
818            .0
819            .concurrent_state_mut()
820            .get_mut(self.id)
821            .unwrap();
822
823        if let &WriteState::GuestReady { count, .. } = &transmit.write {
824            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
825                unreachable!()
826            };
827
828            count - guest_offset
829        } else if let Some(host_buffer) = &self.host_buffer {
830            host_buffer.remaining().len()
831        } else {
832            unreachable!()
833        }
834    }
835}
836
837impl<'a> Source<'a, u8> {
838    /// Return a `DirectSource` view of `self`.
839    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
840        DirectSource {
841            id: self.id,
842            host_buffer: self.host_buffer,
843            store,
844        }
845    }
846}
847
/// Represents a write to a `stream<u8>`, providing direct access to the
/// writer's buffer.
///
/// Created via `Source::as_direct`.  Also implements [`std::io::Read`].
pub struct DirectSource<'a, D: 'static> {
    // Table id of the transmit state this source reads from.
    id: TableId<TransmitState>,
    // Host-side write buffer, if there is one; when `None`, `remaining` reads
    // the bytes from the guest's memory instead.
    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
    // Store context used to reach the transmit state and guest memory.
    store: StoreContextMut<'a, D>,
}
855
856impl<D: 'static> std::io::Read for DirectSource<'_, D> {
857    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
858        let rem = self.remaining();
859        let n = rem.len().min(buf.len());
860        buf[..n].copy_from_slice(&rem[..n]);
861        self.mark_read(n);
862        Ok(n)
863    }
864}
865
866impl<D: 'static> DirectSource<'_, D> {
867    /// Provide direct access to the writer's buffer.
868    pub fn remaining(&mut self) -> &[u8] {
869        if let Some(buffer) = self.host_buffer.as_deref_mut() {
870            buffer.remaining()
871        } else {
872            let transmit = self
873                .store
874                .as_context_mut()
875                .0
876                .concurrent_state_mut()
877                .get_mut(self.id)
878                .unwrap();
879
880            let &WriteState::GuestReady {
881                address,
882                count,
883                options,
884                instance,
885                ..
886            } = &transmit.write
887            else {
888                unreachable!()
889            };
890
891            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
892                unreachable!()
893            };
894
895            instance
896                .options_memory(self.store.0, options)
897                .get((address + guest_offset)..)
898                .and_then(|b| b.get(..(count - guest_offset)))
899                .unwrap()
900        }
901    }
902
903    /// Mark the specified number of bytes as read from the writer's buffer.
904    ///
905    /// This will panic if the count is larger than the size of the buffer
906    /// returned by `Self::remaining`.
907    pub fn mark_read(&mut self, count: usize) {
908        if let Some(buffer) = self.host_buffer.as_deref_mut() {
909            buffer.skip(count);
910        } else {
911            let transmit = self
912                .store
913                .as_context_mut()
914                .0
915                .concurrent_state_mut()
916                .get_mut(self.id)
917                .unwrap();
918
919            let WriteState::GuestReady {
920                count: write_count, ..
921            } = &transmit.write
922            else {
923                unreachable!()
924            };
925
926            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
927                unreachable!()
928            };
929
930            if *guest_offset + count > *write_count {
931                panic!(
932                    "read count ({count}) must be less than or equal to write count ({write_count})"
933                )
934            } else {
935                *guest_offset += count;
936            }
937        }
938    }
939}
940
/// Represents the host-owned read end of a stream.
pub trait StreamConsumer<D>: Send + 'static {
    /// The payload type of this stream.
    type Item;

    /// Handle a host- or guest-initiated write by accepting zero or more items
    /// from the specified source.
    ///
    /// This will be called whenever the writer starts a write.
    ///
    /// If the implementation is able to consume one or more items immediately,
    /// it should take them from `source` and return either
    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
    /// indicate that the caller should delay sending a `COMPLETED` event to the
    /// writer until a later call to this function returns `Poll::Ready(_)`.
    /// For more about that, see the `Backpressure` section below.
    ///
    /// If the implementation cannot consume any items immediately and `finish`
    /// is _false_, it should store the waker from `cx` for later and return
    /// `Poll::Pending` without taking anything from `source`.  Later, it
    /// should alert the waker when either (1) it is able to accept items,
    /// (2) the stream has ended, or (3) an error occurs.
    ///
    /// If the implementation cannot consume any items immediately and `finish`
    /// is _true_, it should, if possible, return
    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
    /// anything from `source`.  However, that might not be possible if an
    /// earlier call to `poll_consume` kicked off an asynchronous operation
    /// which needs to be completed (and possibly interrupted) gracefully, in
    /// which case the implementation may return `Poll::Pending` and later alert
    /// the waker as described above.  In other words, when `finish` is true,
    /// the implementation should prioritize returning a result to the writer
    /// (even if no items can be consumed) rather than wait indefinitely for
    /// capacity to free up.
    ///
    /// In all of the above cases, the implementation may alternatively choose
    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
    /// the guest (if any) to trap and render the component instance (if any)
    /// unusable.  The implementation should report errors that _are_
    /// recoverable by other means (e.g. by writing to a `future`) and return
    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
    ///
    /// Note that the implementation should only return
    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
    /// items from `source` if called with `finish` set to true.  If it does so
    /// when `finish` is false, the caller will trap.  Additionally, it should
    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
    /// least one item from `source` if there is an item available; otherwise,
    /// the caller will trap.  If `poll_consume` is called with no items in
    /// `source`, it should only return `Poll::Ready(_)` once it is able to
    /// accept at least one item during the next call to `poll_consume`.
    ///
    /// Note that any items which the implementation of this trait takes from
    /// `source` become the responsibility of that implementation.  For that
    /// reason, an implementation which forwards items to an upstream sink
    /// should reserve capacity in that sink before taking items out of
    /// `source`, if possible.  Alternatively, it might buffer items which can't
    /// be forwarded immediately and send them once capacity is freed up.
    ///
    /// ## Backpressure
    ///
    /// As mentioned above, an implementation might choose to return
    /// `Poll::Pending` after taking items from `source`, which tells the caller
    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
    /// a form of backpressure when the items are forwarded to an upstream sink
    /// asynchronously.  Note, however, that it's not possible to "put back"
    /// items into `source` once they've been taken out, so if the upstream sink
    /// is unable to accept all the items, that cannot be communicated to the
    /// writer at this level of abstraction.  Just as with application-specific,
    /// recoverable errors, information about which items could be forwarded and
    /// which could not must be communicated out-of-band, e.g. by writing to an
    /// application-specific `future`.
    ///
    /// Similarly, if the writer cancels the write after items have been taken
    /// from `source` but before the items have all been forwarded to an
    /// upstream sink, `poll_consume` will be called with `finish` set to true,
    /// and the implementation may either:
    ///
    /// - Interrupt the forwarding process gracefully.  This may be preferable
    ///   if there is an out-of-band channel for communicating to the writer how
    ///   many items were forwarded before being interrupted.
    ///
    /// - Allow the forwarding to complete without interrupting it.  This is
    ///   usually preferable if there's no out-of-band channel for reporting back
    ///   to the writer how many items were forwarded.
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
1036
/// Represents a host-owned write end of a future.
pub trait FutureProducer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated read by producing a value.
    ///
    /// This is equivalent to `StreamProducer::poll_produce`, but with a
    /// simplified interface for futures.
    ///
    /// If `finish` is true, the implementation may return
    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
    /// could produce a value.  Otherwise, it must either return
    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
    ///
    /// In summary, the possible results are:
    ///
    /// - `Poll::Ready(Ok(Some(value)))`: the value was produced.
    /// - `Poll::Ready(Ok(None))`: canceled without a value (only when
    ///   `finish` is true).
    /// - `Poll::Ready(Err(_))`: the operation failed.
    /// - `Poll::Pending`: no result is available yet.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        finish: bool,
    ) -> Poll<Result<Option<Self::Item>>>;
}
1058
1059impl<T, E, D, Fut> FutureProducer<D> for Fut
1060where
1061    E: Into<Error>,
1062    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1063{
1064    type Item = T;
1065
1066    fn poll_produce<'a>(
1067        self: Pin<&mut Self>,
1068        cx: &mut Context<'_>,
1069        _: StoreContextMut<'a, D>,
1070        finish: bool,
1071    ) -> Poll<Result<Option<T>>> {
1072        match self.poll(cx) {
1073            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1074            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1075            Poll::Pending if finish => Poll::Ready(Ok(None)),
1076            Poll::Pending => Poll::Pending,
1077        }
1078    }
1079}
1080
/// Represents a host-owned read end of a future.
pub trait FutureConsumer<D>: Send + 'static {
    /// The payload type of this future.
    type Item;

    /// Handle a host- or guest-initiated write by consuming a value.
    ///
    /// This is equivalent to `StreamConsumer::poll_consume`, but with a
    /// simplified interface for futures.
    ///
    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
    /// without taking the item from `source`, which indicates the operation was
    /// canceled before it could consume the value.  Otherwise, it must either
    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
    /// the item).
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<()>>;
}
1105
/// Represents the readable end of a Component Model `future`.
///
/// Note that `FutureReader` instances must be disposed of using either `pipe`
/// or `close`; otherwise the in-store representation will leak and the writer
/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
/// ensure that disposal happens automatically.
pub struct FutureReader<T> {
    // Table id of the `TransmitHandle` backing this end of the future.
    id: TableId<TransmitHandle>,
    // Ties the payload type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1116
1117impl<T> FutureReader<T> {
1118    /// Create a new future with the specified producer.
1119    pub fn new<S: AsContextMut>(
1120        mut store: S,
1121        producer: impl FutureProducer<S::Data, Item = T>,
1122    ) -> Self
1123    where
1124        T: func::Lower + func::Lift + Send + Sync + 'static,
1125    {
1126        struct Producer<P>(P);
1127
1128        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1129            for Producer<P>
1130        {
1131            type Item = P::Item;
1132            type Buffer = Option<P::Item>;
1133
1134            fn poll_produce<'a>(
1135                self: Pin<&mut Self>,
1136                cx: &mut Context<'_>,
1137                store: StoreContextMut<D>,
1138                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1139                finish: bool,
1140            ) -> Poll<Result<StreamResult>> {
1141                // SAFETY: This is a standard pin-projection, and we never move
1142                // out of `self`.
1143                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1144
1145                Poll::Ready(Ok(
1146                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1147                        destination.set_buffer(Some(value));
1148
1149                        // Here we return `StreamResult::Completed` even though
1150                        // we've produced the last item we'll ever produce.
1151                        // That's because the ABI expects
1152                        // `ReturnCode::Completed(1)` rather than
1153                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1154                        // called again since the future will have resolved.
1155                        StreamResult::Completed
1156                    } else {
1157                        StreamResult::Cancelled
1158                    },
1159                ))
1160            }
1161        }
1162
1163        Self::new_(
1164            store
1165                .as_context_mut()
1166                .new_transmit(TransmitKind::Future, Producer(producer)),
1167        )
1168    }
1169
1170    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1171        Self {
1172            id,
1173            _phantom: PhantomData,
1174        }
1175    }
1176
1177    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1178        self.id
1179    }
1180
1181    /// Set the consumer that accepts the result of this future.
1182    pub fn pipe<S: AsContextMut>(
1183        self,
1184        mut store: S,
1185        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1186    ) where
1187        T: func::Lift + 'static,
1188    {
1189        struct Consumer<C>(C);
1190
1191        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1192            for Consumer<C>
1193        {
1194            type Item = T;
1195
1196            fn poll_consume(
1197                self: Pin<&mut Self>,
1198                cx: &mut Context<'_>,
1199                mut store: StoreContextMut<D>,
1200                mut source: Source<Self::Item>,
1201                finish: bool,
1202            ) -> Poll<Result<StreamResult>> {
1203                // SAFETY: This is a standard pin-projection, and we never move
1204                // out of `self`.
1205                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1206
1207                ready!(consumer.poll_consume(
1208                    cx,
1209                    store.as_context_mut(),
1210                    source.reborrow(),
1211                    finish
1212                ))?;
1213
1214                Poll::Ready(Ok(if source.remaining(store) == 0 {
1215                    // Here we return `StreamResult::Completed` even though
1216                    // we've consumed the last item we'll ever consume.  That's
1217                    // because the ABI expects `ReturnCode::Completed(1)` rather
1218                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1219                    // called again since the future will have resolved.
1220                    StreamResult::Completed
1221                } else {
1222                    StreamResult::Cancelled
1223                }))
1224            }
1225        }
1226
1227        store
1228            .as_context_mut()
1229            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1230    }
1231
1232    /// Transfer ownership of the read end of a future from a guest to the host.
1233    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1234        let id = lift_index_to_future(cx, ty, index)?;
1235        Ok(Self::new_(id))
1236    }
1237
1238    /// Close this `FutureReader`.
1239    ///
1240    /// This will close this half of the future which will signal to a pending
1241    /// write, if any, that the reader side is dropped. If the writer half has
1242    /// not yet written a value then when it attempts to write a value it will
1243    /// see that this end is closed.
1244    ///
1245    /// # Panics
1246    ///
1247    /// Panics if the store that the [`Accessor`] is derived from does not own
1248    /// this future. Usage of this future after calling `close` will also cause
1249    /// a panic.
1250    pub fn close(&mut self, mut store: impl AsContextMut) {
1251        future_close(store.as_context_mut().0, &mut self.id)
1252    }
1253
1254    /// Convenience method around [`Self::close`].
1255    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1256        accessor.as_accessor().with(|access| self.close(access))
1257    }
1258
1259    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1260    /// drop and clean it up from the store.
1261    ///
1262    /// Note that the `accessor` provided must own this future and is
1263    /// additionally transferred to the `GuardedFutureReader` return value.
1264    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1265    where
1266        A: AsAccessor,
1267    {
1268        GuardedFutureReader::new(accessor, self)
1269    }
1270
1271    /// Attempts to convert this [`FutureReader<T>`] to a [`FutureAny`].
1272    ///
1273    /// # Errors
1274    ///
1275    /// This function will return an error if `self` does not belong to
1276    /// `store`.
1277    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1278    where
1279        T: ComponentType + 'static,
1280    {
1281        FutureAny::try_from_future_reader(store, self)
1282    }
1283
1284    /// Attempts to convert a [`FutureAny`] into a [`FutureReader<T>`].
1285    ///
1286    /// # Errors
1287    ///
1288    /// This function will fail if `T` doesn't match the type of the value that
1289    /// `future` is sending.
1290    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1291    where
1292        T: ComponentType + 'static,
1293    {
1294        future.try_into_future_reader()
1295    }
1296}
1297
1298impl<T> fmt::Debug for FutureReader<T> {
1299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1300        f.debug_struct("FutureReader")
1301            .field("id", &self.id)
1302            .finish()
1303    }
1304}
1305
1306pub(super) fn future_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1307    let id = mem::replace(id, TableId::new(u32::MAX));
1308    store.host_drop_reader(id, TransmitKind::Future).unwrap();
1309}
1310
1311/// Transfer ownership of the read end of a future from the host to a guest.
1312pub(super) fn lift_index_to_future(
1313    cx: &mut LiftContext<'_>,
1314    ty: InterfaceType,
1315    index: u32,
1316) -> Result<TableId<TransmitHandle>> {
1317    match ty {
1318        InterfaceType::Future(src) => {
1319            let handle_table = cx
1320                .instance_mut()
1321                .table_for_transmit(TransmitIndex::Future(src));
1322            let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1323            if is_done {
1324                bail!("cannot lift future after being notified that the writable end dropped");
1325            }
1326            let id = TableId::<TransmitHandle>::new(rep);
1327            let concurrent_state = cx.concurrent_state_mut();
1328            let future = concurrent_state.get_mut(id)?;
1329            future.common.handle = None;
1330            let state = future.state;
1331
1332            if concurrent_state.get_mut(state)?.done {
1333                bail!("cannot lift future after previous read succeeded");
1334            }
1335
1336            Ok(id)
1337        }
1338        _ => func::bad_type_info(),
1339    }
1340}
1341
1342/// Transfer ownership of the read end of a future from the host to a guest.
1343pub(super) fn lower_future_to_index<U>(
1344    id: TableId<TransmitHandle>,
1345    cx: &mut LowerContext<'_, U>,
1346    ty: InterfaceType,
1347) -> Result<u32> {
1348    match ty {
1349        InterfaceType::Future(dst) => {
1350            let concurrent_state = cx.store.0.concurrent_state_mut();
1351            let state = concurrent_state.get_mut(id)?.state;
1352            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1353
1354            let handle = cx
1355                .instance_mut()
1356                .table_for_transmit(TransmitIndex::Future(dst))
1357                .future_insert_read(dst, rep)?;
1358
1359            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1360
1361            Ok(handle)
1362        }
1363        _ => func::bad_type_info(),
1364    }
1365}
1366
1367// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1368// safe and correct since we lift and lower future handles as `u32`s.
1369unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1370    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1371
1372    type Lower = <u32 as func::ComponentType>::Lower;
1373
1374    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1375        match ty {
1376            InterfaceType::Future(ty) => {
1377                let ty = types.types[*ty].ty;
1378                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1379            }
1380            other => bail!("expected `future`, found `{}`", func::desc(other)),
1381        }
1382    }
1383}
1384
1385// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1386unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1387    fn linear_lower_to_flat<U>(
1388        &self,
1389        cx: &mut LowerContext<'_, U>,
1390        ty: InterfaceType,
1391        dst: &mut MaybeUninit<Self::Lower>,
1392    ) -> Result<()> {
1393        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1394    }
1395
1396    fn linear_lower_to_memory<U>(
1397        &self,
1398        cx: &mut LowerContext<'_, U>,
1399        ty: InterfaceType,
1400        offset: usize,
1401    ) -> Result<()> {
1402        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1403            cx,
1404            InterfaceType::U32,
1405            offset,
1406        )
1407    }
1408}
1409
1410// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1411unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1412    fn linear_lift_from_flat(
1413        cx: &mut LiftContext<'_>,
1414        ty: InterfaceType,
1415        src: &Self::Lower,
1416    ) -> Result<Self> {
1417        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1418        Self::lift_from_index(cx, ty, index)
1419    }
1420
1421    fn linear_lift_from_memory(
1422        cx: &mut LiftContext<'_>,
1423        ty: InterfaceType,
1424        bytes: &[u8],
1425    ) -> Result<Self> {
1426        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1427        Self::lift_from_index(cx, ty, index)
1428    }
1429}
1430
/// A [`FutureReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedFutureReader::new`] or
/// [`FutureReader::guard`].
pub struct GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `FutureReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<FutureReader<T>>,
    // Used by the destructor to close `reader`.
    accessor: A,
}
1446
1447impl<T, A> GuardedFutureReader<T, A>
1448where
1449    A: AsAccessor,
1450{
1451    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1452    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1453        Self {
1454            reader: Some(reader),
1455            accessor,
1456        }
1457    }
1458
1459    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1460    /// back.
1461    pub fn into_future(self) -> FutureReader<T> {
1462        self.into()
1463    }
1464}
1465
1466impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1467where
1468    A: AsAccessor,
1469{
1470    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1471        guard.reader.take().unwrap()
1472    }
1473}
1474
1475impl<T, A> Drop for GuardedFutureReader<T, A>
1476where
1477    A: AsAccessor,
1478{
1479    fn drop(&mut self) {
1480        if let Some(reader) = &mut self.reader {
1481            reader.close_with(&self.accessor)
1482        }
1483    }
1484}
1485
/// Represents the readable end of a Component Model `stream`.
///
/// Note that `StreamReader` instances must be disposed of using `close`;
/// otherwise the in-store representation will leak and the writer end will hang
/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
/// disposal happens automatically.
pub struct StreamReader<T> {
    // Table id of the `TransmitHandle` backing this end of the stream.
    id: TableId<TransmitHandle>,
    // Ties the payload type `T` to this handle without storing a `T`.
    _phantom: PhantomData<T>,
}
1496
1497impl<T> StreamReader<T> {
1498    /// Create a new stream with the specified producer.
1499    pub fn new<S: AsContextMut>(
1500        mut store: S,
1501        producer: impl StreamProducer<S::Data, Item = T>,
1502    ) -> Self
1503    where
1504        T: func::Lower + func::Lift + Send + Sync + 'static,
1505    {
1506        Self::new_(
1507            store
1508                .as_context_mut()
1509                .new_transmit(TransmitKind::Stream, producer),
1510        )
1511    }
1512
1513    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1514        Self {
1515            id,
1516            _phantom: PhantomData,
1517        }
1518    }
1519
1520    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1521        self.id
1522    }
1523
1524    /// Attempt to consume this object by converting it into the specified type.
1525    ///
1526    /// This can be useful for "short-circuiting" host-to-host streams,
1527    /// bypassing the guest entirely.  For example, if a guest task returns a
1528    /// host-created stream and then exits, this function may be used to
1529    /// retrieve the write end, after which the guest instance and store may be
1530    /// disposed of if no longer needed.
1531    ///
1532    /// This will return `Ok(_)` if and only if the following conditions are
1533    /// met:
1534    ///
1535    /// - The stream was created by the host (i.e. not by the guest).
1536    ///
1537    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1538    /// producer provided to `StreamReader::new` when the stream was created,
1539    /// along with `TypeId::of::<V>()`.
1540    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1541        let store = store.as_context_mut();
1542        let state = store.0.concurrent_state_mut();
1543        let id = state.get_mut(self.id).unwrap().state;
1544        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1545            match try_into(TypeId::of::<V>()) {
1546                Some(result) => {
1547                    self.close(store);
1548                    Ok(*result.downcast::<V>().unwrap())
1549                }
1550                None => Err(self),
1551            }
1552        } else {
1553            Err(self)
1554        }
1555    }
1556
1557    /// Set the consumer that accepts the items delivered to this stream.
1558    pub fn pipe<S: AsContextMut>(
1559        self,
1560        mut store: S,
1561        consumer: impl StreamConsumer<S::Data, Item = T>,
1562    ) where
1563        T: 'static,
1564    {
1565        store
1566            .as_context_mut()
1567            .set_consumer(self.id, TransmitKind::Stream, consumer);
1568    }
1569
1570    /// Transfer ownership of the read end of a stream from a guest to the host.
1571    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1572        let id = lift_index_to_stream(cx, ty, index)?;
1573        Ok(Self::new_(id))
1574    }
1575
1576    /// Close this `StreamReader`.
1577    ///
1578    /// This will signal that this portion of the stream is closed causing all
1579    /// future writes to return immediately with "DROPPED".
1580    ///
1581    /// # Panics
1582    ///
1583    /// Panics if the `store` does not own this future. Usage of this future
1584    /// after calling `close` will also cause a panic.
1585    pub fn close(&mut self, mut store: impl AsContextMut) {
1586        stream_close(store.as_context_mut().0, &mut self.id);
1587    }
1588
1589    /// Convenience method around [`Self::close`].
1590    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1591        accessor.as_accessor().with(|access| self.close(access))
1592    }
1593
1594    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1595    /// drop and clean it up from the store.
1596    ///
1597    /// Note that the `accessor` provided must own this future and is
1598    /// additionally transferred to the `GuardedStreamReader` return value.
1599    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1600    where
1601        A: AsAccessor,
1602    {
1603        GuardedStreamReader::new(accessor, self)
1604    }
1605
1606    /// Attempts to convert this [`StreamReader<T>`] to a [`StreamAny`].
1607    ///
1608    /// # Errors
1609    ///
1610    /// This function will return an error if `self` does not belong to
1611    /// `store`.
1612    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1613    where
1614        T: ComponentType + 'static,
1615    {
1616        StreamAny::try_from_stream_reader(store, self)
1617    }
1618
1619    /// Attempts to convert a [`StreamAny`] into a [`StreamReader<T>`].
1620    ///
1621    /// # Errors
1622    ///
1623    /// This function will fail if `T` doesn't match the type of the value that
1624    /// `stream` is sending.
1625    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1626    where
1627        T: ComponentType + 'static,
1628    {
1629        stream.try_into_stream_reader()
1630    }
1631}
1632
1633impl<T> fmt::Debug for StreamReader<T> {
1634    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1635        f.debug_struct("StreamReader")
1636            .field("id", &self.id)
1637            .finish()
1638    }
1639}
1640
1641pub(super) fn stream_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1642    let id = mem::replace(id, TableId::new(u32::MAX));
1643    store.host_drop_reader(id, TransmitKind::Stream).unwrap();
1644}
1645
1646/// Transfer ownership of the read end of a stream from a guest to the host.
1647pub(super) fn lift_index_to_stream(
1648    cx: &mut LiftContext<'_>,
1649    ty: InterfaceType,
1650    index: u32,
1651) -> Result<TableId<TransmitHandle>> {
1652    match ty {
1653        InterfaceType::Stream(src) => {
1654            let handle_table = cx
1655                .instance_mut()
1656                .table_for_transmit(TransmitIndex::Stream(src));
1657            let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1658            if is_done {
1659                bail!("cannot lift stream after being notified that the writable end dropped");
1660            }
1661            let id = TableId::<TransmitHandle>::new(rep);
1662            cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1663            Ok(id)
1664        }
1665        _ => func::bad_type_info(),
1666    }
1667}
1668
1669/// Transfer ownership of the read end of a stream from the host to a guest.
1670pub(super) fn lower_stream_to_index<U>(
1671    id: TableId<TransmitHandle>,
1672    cx: &mut LowerContext<'_, U>,
1673    ty: InterfaceType,
1674) -> Result<u32> {
1675    match ty {
1676        InterfaceType::Stream(dst) => {
1677            let concurrent_state = cx.store.0.concurrent_state_mut();
1678            let state = concurrent_state.get_mut(id)?.state;
1679            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1680
1681            let handle = cx
1682                .instance_mut()
1683                .table_for_transmit(TransmitIndex::Stream(dst))
1684                .stream_insert_read(dst, rep)?;
1685
1686            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1687
1688            Ok(handle)
1689        }
1690        _ => func::bad_type_info(),
1691    }
1692}
1693
1694// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1695// safe and correct since we lift and lower stream handles as `u32`s.
1696unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1697    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1698
1699    type Lower = <u32 as func::ComponentType>::Lower;
1700
1701    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1702        match ty {
1703            InterfaceType::Stream(ty) => {
1704                let ty = types.types[*ty].ty;
1705                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1706            }
1707            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1708        }
1709    }
1710}
1711
1712// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1713unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1714    fn linear_lower_to_flat<U>(
1715        &self,
1716        cx: &mut LowerContext<'_, U>,
1717        ty: InterfaceType,
1718        dst: &mut MaybeUninit<Self::Lower>,
1719    ) -> Result<()> {
1720        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1721    }
1722
1723    fn linear_lower_to_memory<U>(
1724        &self,
1725        cx: &mut LowerContext<'_, U>,
1726        ty: InterfaceType,
1727        offset: usize,
1728    ) -> Result<()> {
1729        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1730            cx,
1731            InterfaceType::U32,
1732            offset,
1733        )
1734    }
1735}
1736
1737// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1738unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1739    fn linear_lift_from_flat(
1740        cx: &mut LiftContext<'_>,
1741        ty: InterfaceType,
1742        src: &Self::Lower,
1743    ) -> Result<Self> {
1744        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1745        Self::lift_from_index(cx, ty, index)
1746    }
1747
1748    fn linear_lift_from_memory(
1749        cx: &mut LiftContext<'_>,
1750        ty: InterfaceType,
1751        bytes: &[u8],
1752    ) -> Result<Self> {
1753        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1754        Self::lift_from_index(cx, ty, index)
1755    }
1756}
1757
/// A [`StreamReader`] paired with an [`Accessor`].
///
/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
/// when dropped. This can be created through [`GuardedStreamReader::new`] or
/// [`StreamReader::guard`]. The wrapped reader can be taken back out, without
/// closing it, through [`GuardedStreamReader::into_stream`] or the `From`
/// conversion into [`StreamReader`].
pub struct GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    // This field is `None` to implement the conversion from this guard back to
    // `StreamReader`. When `None` is seen in the destructor it will cause the
    // destructor to do nothing.
    reader: Option<StreamReader<T>>,
    // Accessor the destructor uses to close `reader`.
    accessor: A,
}
1773
1774impl<T, A> GuardedStreamReader<T, A>
1775where
1776    A: AsAccessor,
1777{
1778    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1779    /// `reader`.
1780    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1781        Self {
1782            reader: Some(reader),
1783            accessor,
1784        }
1785    }
1786
1787    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1788    /// back.
1789    pub fn into_stream(self) -> StreamReader<T> {
1790        self.into()
1791    }
1792}
1793
1794impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1795where
1796    A: AsAccessor,
1797{
1798    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1799        guard.reader.take().unwrap()
1800    }
1801}
1802
1803impl<T, A> Drop for GuardedStreamReader<T, A>
1804where
1805    A: AsAccessor,
1806{
1807    fn drop(&mut self) {
1808        if let Some(reader) = &mut self.reader {
1809            reader.close_with(&self.accessor)
1810        }
1811    }
1812}
1813
/// Represents a Component Model `error-context`.
pub struct ErrorContext {
    // Representation of the error context: wrapped in `ErrorContextAny` when
    // converting to a `Val`, inserted into an instance's error-context table
    // when lowering, and read back from that table when lifting.
    rep: u32,
}
1818
1819impl ErrorContext {
1820    pub(crate) fn new(rep: u32) -> Self {
1821        Self { rep }
1822    }
1823
1824    /// Convert this `ErrorContext` into a [`Val`].
1825    pub fn into_val(self) -> Val {
1826        Val::ErrorContext(ErrorContextAny(self.rep))
1827    }
1828
1829    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1830    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1831        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1832            bail!("expected `error-context`; got `{}`", value.desc());
1833        };
1834        Ok(Self::new(*rep))
1835    }
1836
1837    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1838        match ty {
1839            InterfaceType::ErrorContext(src) => {
1840                let rep = cx
1841                    .instance_mut()
1842                    .table_for_error_context(src)
1843                    .error_context_rep(index)?;
1844
1845                Ok(Self { rep })
1846            }
1847            _ => func::bad_type_info(),
1848        }
1849    }
1850}
1851
1852pub(crate) fn lower_error_context_to_index<U>(
1853    rep: u32,
1854    cx: &mut LowerContext<'_, U>,
1855    ty: InterfaceType,
1856) -> Result<u32> {
1857    match ty {
1858        InterfaceType::ErrorContext(dst) => {
1859            let tbl = cx.instance_mut().table_for_error_context(dst);
1860            tbl.error_context_insert(rep)
1861        }
1862        _ => func::bad_type_info(),
1863    }
1864}
1865// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1866// safe and correct since we lift and lower future handles as `u32`s.
1867unsafe impl func::ComponentType for ErrorContext {
1868    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1869
1870    type Lower = <u32 as func::ComponentType>::Lower;
1871
1872    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1873        match ty {
1874            InterfaceType::ErrorContext(_) => Ok(()),
1875            other => bail!("expected `error`, found `{}`", func::desc(other)),
1876        }
1877    }
1878}
1879
1880// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1881unsafe impl func::Lower for ErrorContext {
1882    fn linear_lower_to_flat<T>(
1883        &self,
1884        cx: &mut LowerContext<'_, T>,
1885        ty: InterfaceType,
1886        dst: &mut MaybeUninit<Self::Lower>,
1887    ) -> Result<()> {
1888        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1889            cx,
1890            InterfaceType::U32,
1891            dst,
1892        )
1893    }
1894
1895    fn linear_lower_to_memory<T>(
1896        &self,
1897        cx: &mut LowerContext<'_, T>,
1898        ty: InterfaceType,
1899        offset: usize,
1900    ) -> Result<()> {
1901        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1902            cx,
1903            InterfaceType::U32,
1904            offset,
1905        )
1906    }
1907}
1908
1909// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1910unsafe impl func::Lift for ErrorContext {
1911    fn linear_lift_from_flat(
1912        cx: &mut LiftContext<'_>,
1913        ty: InterfaceType,
1914        src: &Self::Lower,
1915    ) -> Result<Self> {
1916        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1917        Self::lift_from_index(cx, ty, index)
1918    }
1919
1920    fn linear_lift_from_memory(
1921        cx: &mut LiftContext<'_>,
1922        ty: InterfaceType,
1923        bytes: &[u8],
1924    ) -> Result<Self> {
1925        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1926        Self::lift_from_index(cx, ty, index)
1927    }
1928}
1929
/// Represents the read or write end of a stream or future.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping shared with other waitables. Its `handle` field
    /// is set to the guest handle when a stream's read end is lowered to a
    /// guest and cleared again when it is lifted back to the host.
    pub(super) common: WaitableCommon,
    /// See `TransmitState`. This is the table id of the state shared by both
    /// ends of the stream or future.
    state: TableId<TransmitState>,
}
1936
1937impl TransmitHandle {
1938    fn new(state: TableId<TransmitState>) -> Self {
1939        Self {
1940            common: WaitableCommon::default(),
1941            state,
1942        }
1943    }
1944}
1945
1946impl TableDebug for TransmitHandle {
1947    fn type_name() -> &'static str {
1948        "TransmitHandle"
1949    }
1950}
1951
/// Represents the state of a stream or future.
///
/// One `TransmitState` is shared by the read and write `TransmitHandle`s of
/// a stream or future; each handle stores this state's table id.
struct TransmitState {
    /// The write end of the stream or future.
    ///
    /// `TransmitState::new` initializes this to a `u32::MAX` placeholder id.
    write_handle: TableId<TransmitHandle>,
    /// The read end of the stream or future.
    ///
    /// `TransmitState::new` initializes this to a `u32::MAX` placeholder id.
    read_handle: TableId<TransmitHandle>,
    /// See `WriteState`
    write: WriteState,
    /// See `ReadState`
    read: ReadState,
    /// Whether further values may be transmitted via this stream or future.
    done: bool,
    /// The original creator of this stream, used for type-checking with
    /// `{Future,Stream}Any`.
    pub(super) origin: TransmitOrigin,
}
1968
/// Identifies who originally created a stream or future.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host.
    Host,
    /// Created as a future by the guest component instance with the given
    /// id, using the given future table type.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created as a stream by the guest component instance with the given
    /// id, using the given stream table type.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
1975
1976impl TransmitState {
1977    fn new(origin: TransmitOrigin) -> Self {
1978        Self {
1979            write_handle: TableId::new(u32::MAX),
1980            read_handle: TableId::new(u32::MAX),
1981            read: ReadState::Open,
1982            write: WriteState::Open,
1983            done: false,
1984            origin,
1985        }
1986    }
1987}
1988
1989impl TableDebug for TransmitState {
1990    fn type_name() -> &'static str {
1991        "TransmitState"
1992    }
1993}
1994
1995impl TransmitOrigin {
1996    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
1997        match index {
1998            TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
1999            TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2000        }
2001    }
2002}
2003
/// Boxed `Send + Sync` closure which, each time it is called, returns a boxed
/// `Send + 'static` future resolving to a `Result<StreamResult>`.
///
/// Stored as `produce` in `WriteState::HostReady` and as `consume` in
/// `ReadState::HostReady`.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
2007
/// Boxed `Send + Sync` closure mapping a `TypeId` to an optional type-erased
/// value. Stored as `try_into` in `WriteState::HostReady`.
///
/// NOTE(review): presumably used to recover the host producer as a requested
/// concrete type; confirm where the closure is constructed. This alias shares
/// its name with the standard `TryInto` trait.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2009
/// Represents the state of the write end of a stream or future.
enum WriteState {
    /// The write end is open, but no write is pending.
    Open,
    /// The write end is owned by a guest task and a write is pending.
    GuestReady {
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        // Which future/stream table type the pending write belongs to; used
        // together with `handle` when reporting the write's result.
        ty: TransmitIndex,
        flat_abi: Option<FlatAbi>,
        options: OptionsIndex,
        // NOTE(review): `address` and `count` presumably describe the guest
        // buffer being written from; confirm where this variant is built.
        address: usize,
        count: usize,
        // Guest handle of the write end that issued the pending write.
        handle: u32,
    },
    /// The write end is owned by the host, which is ready to produce items.
    HostReady {
        // Starts one production attempt; see `PollStream`.
        produce: PollStream,
        // See `TryInto`.
        try_into: TryInto,
        // Reported as the item count when a guest read completes, then reset
        // to zero for the next read.
        guest_offset: usize,
        // Passed to the producer's `poll_produce` as its final argument and
        // cleared once that poll returns ready.
        cancel: bool,
        // Waker saved while the producer's poll is pending; cleared once the
        // poll returns ready.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
2036
2037impl fmt::Debug for WriteState {
2038    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2039        match self {
2040            Self::Open => f.debug_tuple("Open").finish(),
2041            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2042            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2043            Self::Dropped => f.debug_tuple("Dropped").finish(),
2044        }
2045    }
2046}
2047
/// Represents the state of the read end of a stream or future.
enum ReadState {
    /// The read end is open, but no read is pending.
    Open,
    /// The read end is owned by a guest task and a read is pending.
    GuestReady {
        // Which future/stream table type the pending read belongs to; used
        // together with `handle` when reporting the read's result.
        ty: TransmitIndex,
        caller: RuntimeComponentInstanceIndex,
        flat_abi: Option<FlatAbi>,
        instance: Instance,
        options: OptionsIndex,
        // NOTE(review): `address` and `count` presumably describe the guest
        // buffer being read into; confirm where this variant is built.
        address: usize,
        count: usize,
        // Guest handle of the read end that issued the pending read.
        handle: u32,
    },
    /// The read end is owned by a host task, and it is ready to consume items.
    HostReady {
        // Starts one consumption attempt; see `PollStream`.
        consume: PollStream,
        // Reported as the item count when a guest write completes, then reset
        // to zero for the next write.
        guest_offset: usize,
        // NOTE(review): `cancel` and `cancel_waker` presumably mirror the
        // fields of the same name on `WriteState::HostReady`; confirm at the
        // consumer's poll site.
        cancel: bool,
        cancel_waker: Option<Waker>,
    },
    /// Both the read and write ends are owned by the host.
    HostToHost {
        // Callback handed an `UntypedWriteBuffer`; returns a future resolving
        // to a `Result<StreamResult>`.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        // Byte buffer lent to the host producer (wrapped in a `Cursor`) while
        // it is polled, and stored back afterwards.
        buffer: Vec<u8>,
        // Cursor position in `buffer` after the producer was last polled.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
2086
2087impl fmt::Debug for ReadState {
2088    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2089        match self {
2090            Self::Open => f.debug_tuple("Open").finish(),
2091            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2092            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2093            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2094            Self::Dropped => f.debug_tuple("Dropped").finish(),
2095        }
2096    }
2097}
2098
2099fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2100    let count = guest_offset.try_into().unwrap();
2101    match state {
2102        StreamResult::Dropped => ReturnCode::Dropped(count),
2103        StreamResult::Completed => ReturnCode::completed(kind, count),
2104        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2105    }
2106}
2107
2108impl StoreOpaque {
2109    fn pipe_from_guest(
2110        &mut self,
2111        kind: TransmitKind,
2112        id: TableId<TransmitState>,
2113        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2114    ) {
2115        let future = async move {
2116            let stream_state = future.await?;
2117            tls::get(|store| {
2118                let state = store.concurrent_state_mut();
2119                let transmit = state.get_mut(id)?;
2120                let ReadState::HostReady {
2121                    consume,
2122                    guest_offset,
2123                    ..
2124                } = mem::replace(&mut transmit.read, ReadState::Open)
2125                else {
2126                    unreachable!();
2127                };
2128                let code = return_code(kind, stream_state, guest_offset);
2129                transmit.read = match stream_state {
2130                    StreamResult::Dropped => ReadState::Dropped,
2131                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2132                        consume,
2133                        guest_offset: 0,
2134                        cancel: false,
2135                        cancel_waker: None,
2136                    },
2137                };
2138                let WriteState::GuestReady { ty, handle, .. } =
2139                    mem::replace(&mut transmit.write, WriteState::Open)
2140                else {
2141                    unreachable!();
2142                };
2143                state.send_write_result(ty, id, handle, code)?;
2144                Ok(())
2145            })
2146        };
2147
2148        self.concurrent_state_mut().push_future(future.boxed());
2149    }
2150
2151    fn pipe_to_guest(
2152        &mut self,
2153        kind: TransmitKind,
2154        id: TableId<TransmitState>,
2155        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2156    ) {
2157        let future = async move {
2158            let stream_state = future.await?;
2159            tls::get(|store| {
2160                let state = store.concurrent_state_mut();
2161                let transmit = state.get_mut(id)?;
2162                let WriteState::HostReady {
2163                    produce,
2164                    try_into,
2165                    guest_offset,
2166                    ..
2167                } = mem::replace(&mut transmit.write, WriteState::Open)
2168                else {
2169                    unreachable!();
2170                };
2171                let code = return_code(kind, stream_state, guest_offset);
2172                transmit.write = match stream_state {
2173                    StreamResult::Dropped => WriteState::Dropped,
2174                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2175                        produce,
2176                        try_into,
2177                        guest_offset: 0,
2178                        cancel: false,
2179                        cancel_waker: None,
2180                    },
2181                };
2182                let ReadState::GuestReady { ty, handle, .. } =
2183                    mem::replace(&mut transmit.read, ReadState::Open)
2184                else {
2185                    unreachable!();
2186                };
2187                state.send_read_result(ty, id, handle, code)?;
2188                Ok(())
2189            })
2190        };
2191
2192        self.concurrent_state_mut().push_future(future.boxed());
2193    }
2194
2195    /// Drop the read end of a stream or future read from the host.
2196    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2197        let state = self.concurrent_state_mut();
2198        let transmit_id = state.get_mut(id)?.state;
2199        let transmit = state
2200            .get_mut(transmit_id)
2201            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2202        log::trace!(
2203            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2204            transmit.read,
2205            transmit.write
2206        );
2207
2208        transmit.read = ReadState::Dropped;
2209
2210        // If the write end is already dropped, it should stay dropped,
2211        // otherwise, it should be opened.
2212        let new_state = if let WriteState::Dropped = &transmit.write {
2213            WriteState::Dropped
2214        } else {
2215            WriteState::Open
2216        };
2217
2218        let write_handle = transmit.write_handle;
2219
2220        match mem::replace(&mut transmit.write, new_state) {
2221            // If a guest is waiting to write, notify it that the read end has
2222            // been dropped.
2223            WriteState::GuestReady { ty, handle, .. } => {
2224                state.update_event(
2225                    write_handle.rep(),
2226                    match ty {
2227                        TransmitIndex::Future(ty) => Event::FutureWrite {
2228                            code: ReturnCode::Dropped(0),
2229                            pending: Some((ty, handle)),
2230                        },
2231                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2232                            code: ReturnCode::Dropped(0),
2233                            pending: Some((ty, handle)),
2234                        },
2235                    },
2236                )?;
2237            }
2238
2239            WriteState::HostReady { .. } => {}
2240
2241            WriteState::Open => {
2242                state.update_event(
2243                    write_handle.rep(),
2244                    match kind {
2245                        TransmitKind::Future => Event::FutureWrite {
2246                            code: ReturnCode::Dropped(0),
2247                            pending: None,
2248                        },
2249                        TransmitKind::Stream => Event::StreamWrite {
2250                            code: ReturnCode::Dropped(0),
2251                            pending: None,
2252                        },
2253                    },
2254                )?;
2255            }
2256
2257            WriteState::Dropped => {
2258                log::trace!("host_drop_reader delete {transmit_id:?}");
2259                state.delete_transmit(transmit_id)?;
2260            }
2261        }
2262        Ok(())
2263    }
2264
2265    /// Drop the write end of a stream or future read from the host.
2266    fn host_drop_writer(
2267        &mut self,
2268        id: TableId<TransmitHandle>,
2269        on_drop_open: Option<fn() -> Result<()>>,
2270    ) -> Result<()> {
2271        let state = self.concurrent_state_mut();
2272        let transmit_id = state.get_mut(id)?.state;
2273        let transmit = state
2274            .get_mut(transmit_id)
2275            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2276        log::trace!(
2277            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2278            transmit.read,
2279            transmit.write
2280        );
2281
2282        // Existing queued transmits must be updated with information for the impending writer closure
2283        match &mut transmit.write {
2284            WriteState::GuestReady { .. } => {
2285                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2286            }
2287            WriteState::HostReady { .. } => {}
2288            v @ WriteState::Open => {
2289                if let (Some(on_drop_open), false) = (
2290                    on_drop_open,
2291                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2292                ) {
2293                    on_drop_open()?;
2294                } else {
2295                    *v = WriteState::Dropped;
2296                }
2297            }
2298            WriteState::Dropped => unreachable!("write state is already dropped"),
2299        }
2300
2301        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2302
2303        // If the existing read state is dropped, then there's nothing to read
2304        // and we can keep it that way.
2305        //
2306        // If the read state was any other state, then we must set the new state to open
2307        // to indicate that there *is* data to be read
2308        let new_state = if let ReadState::Dropped = &transmit.read {
2309            ReadState::Dropped
2310        } else {
2311            ReadState::Open
2312        };
2313
2314        let read_handle = transmit.read_handle;
2315
2316        // Swap in the new read state
2317        match mem::replace(&mut transmit.read, new_state) {
2318            // If the guest was ready to read, then we cannot drop the reader (or writer);
2319            // we must deliver the event, and update the state associated with the handle to
2320            // represent that a read must be performed
2321            ReadState::GuestReady { ty, handle, .. } => {
2322                // Ensure the final read of the guest is queued, with appropriate closure indicator
2323                self.concurrent_state_mut().update_event(
2324                    read_handle.rep(),
2325                    match ty {
2326                        TransmitIndex::Future(ty) => Event::FutureRead {
2327                            code: ReturnCode::Dropped(0),
2328                            pending: Some((ty, handle)),
2329                        },
2330                        TransmitIndex::Stream(ty) => Event::StreamRead {
2331                            code: ReturnCode::Dropped(0),
2332                            pending: Some((ty, handle)),
2333                        },
2334                    },
2335                )?;
2336            }
2337
2338            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2339
2340            // If the read state is open, then there are no registered readers of the stream/future
2341            ReadState::Open => {
2342                self.concurrent_state_mut().update_event(
2343                    read_handle.rep(),
2344                    match on_drop_open {
2345                        Some(_) => Event::FutureRead {
2346                            code: ReturnCode::Dropped(0),
2347                            pending: None,
2348                        },
2349                        None => Event::StreamRead {
2350                            code: ReturnCode::Dropped(0),
2351                            pending: None,
2352                        },
2353                    },
2354                )?;
2355            }
2356
2357            // If the read state was already dropped, then we can remove the transmit state completely
2358            // (both writer and reader have been dropped)
2359            ReadState::Dropped => {
2360                log::trace!("host_drop_writer delete {transmit_id:?}");
2361                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2362            }
2363        }
2364        Ok(())
2365    }
2366
2367    pub(super) fn transmit_origin(
2368        &mut self,
2369        id: TableId<TransmitHandle>,
2370    ) -> Result<TransmitOrigin> {
2371        let state = self.concurrent_state_mut();
2372        let state_id = state.get_mut(id)?.state;
2373        Ok(state.get_mut(state_id)?.origin)
2374    }
2375}
2376
2377impl<T> StoreContextMut<'_, T> {
2378    fn new_transmit<P: StreamProducer<T>>(
2379        mut self,
2380        kind: TransmitKind,
2381        producer: P,
2382    ) -> TableId<TransmitHandle>
2383    where
2384        P::Item: func::Lower,
2385    {
2386        let token = StoreToken::new(self.as_context_mut());
2387        let state = self.0.concurrent_state_mut();
2388        let (_, read) = state.new_transmit(TransmitOrigin::Host).unwrap();
2389        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2390        let id = state.get_mut(read).unwrap().state;
2391        let mut dropped = false;
2392        let produce = Box::new({
2393            let producer = producer.clone();
2394            move || {
2395                let producer = producer.clone();
2396                async move {
2397                    let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2398
2399                    let (result, cancelled) = if buffer.remaining().is_empty() {
2400                        future::poll_fn(|cx| {
2401                            tls::get(|store| {
2402                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2403
2404                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2405                                    unreachable!();
2406                                };
2407
2408                                let mut host_buffer =
2409                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2410                                        Some(Cursor::new(mem::take(buffer)))
2411                                    } else {
2412                                        None
2413                                    };
2414
2415                                let poll = mine.as_mut().poll_produce(
2416                                    cx,
2417                                    token.as_context_mut(store),
2418                                    Destination {
2419                                        id,
2420                                        buffer: &mut buffer,
2421                                        host_buffer: host_buffer.as_mut(),
2422                                        _phantom: PhantomData,
2423                                    },
2424                                    cancel,
2425                                );
2426
2427                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2428
2429                                let host_offset = if let (
2430                                    Some(host_buffer),
2431                                    ReadState::HostToHost { buffer, limit, .. },
2432                                ) = (host_buffer, &mut transmit.read)
2433                                {
2434                                    *limit = usize::try_from(host_buffer.position()).unwrap();
2435                                    *buffer = host_buffer.into_inner();
2436                                    *limit
2437                                } else {
2438                                    0
2439                                };
2440
2441                                {
2442                                    let WriteState::HostReady {
2443                                        guest_offset,
2444                                        cancel,
2445                                        cancel_waker,
2446                                        ..
2447                                    } = &mut transmit.write
2448                                    else {
2449                                        unreachable!();
2450                                    };
2451
2452                                    if poll.is_pending() {
2453                                        if !buffer.remaining().is_empty()
2454                                            || *guest_offset > 0
2455                                            || host_offset > 0
2456                                        {
2457                                            return Poll::Ready(Err(format_err!(
2458                                                "StreamProducer::poll_produce returned Poll::Pending \
2459                                                 after producing at least one item"
2460                                            )));
2461                                        }
2462                                        *cancel_waker = Some(cx.waker().clone());
2463                                    } else {
2464                                        *cancel_waker = None;
2465                                        *cancel = false;
2466                                    }
2467                                }
2468
2469                                poll.map(|v| v.map(|result| (result, cancel)))
2470                            })
2471                        })
2472                            .await?
2473                    } else {
2474                        (StreamResult::Completed, false)
2475                    };
2476
2477                    let (guest_offset, host_offset, count) = tls::get(|store| {
2478                        let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2479                        let (count, host_offset) = match &transmit.read {
2480                            &ReadState::GuestReady { count, .. } => (count, 0),
2481                            &ReadState::HostToHost { limit, .. } => (1, limit),
2482                            _ => unreachable!(),
2483                        };
2484                        let guest_offset = match &transmit.write {
2485                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
2486                            _ => unreachable!(),
2487                        };
2488                        (guest_offset, host_offset, count)
2489                    });
2490
2491                    match result {
2492                        StreamResult::Completed => {
2493                            if count > 1
2494                                && buffer.remaining().is_empty()
2495                                && guest_offset == 0
2496                                && host_offset == 0
2497                            {
2498                                bail!(
2499                                    "StreamProducer::poll_produce returned StreamResult::Completed \
2500                                     without producing any items"
2501                                );
2502                            }
2503                        }
2504                        StreamResult::Cancelled => {
2505                            if !cancelled {
2506                                bail!(
2507                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
2508                                     without being given a `finish` parameter value of true"
2509                                );
2510                            }
2511                        }
2512                        StreamResult::Dropped => {
2513                            dropped = true;
2514                        }
2515                    }
2516
2517                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2518
2519                    *producer.lock().unwrap() = Some((mine, buffer));
2520
2521                    if write_buffer {
2522                        write(token, id, producer.clone(), kind).await?;
2523                    }
2524
2525                    Ok(if dropped {
2526                        if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2527                        {
2528                            StreamResult::Dropped
2529                        } else {
2530                            StreamResult::Completed
2531                        }
2532                    } else {
2533                        result
2534                    })
2535                }
2536                .boxed()
2537            }
2538        });
2539        let try_into = Box::new(move |ty| {
2540            let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2541            match P::try_into(mine, ty) {
2542                Ok(value) => Some(value),
2543                Err(mine) => {
2544                    *producer.lock().unwrap() = Some((mine, buffer));
2545                    None
2546                }
2547            }
2548        });
2549        state.get_mut(id).unwrap().write = WriteState::HostReady {
2550            produce,
2551            try_into,
2552            guest_offset: 0,
2553            cancel: false,
2554            cancel_waker: None,
2555        };
2556        read
2557    }
2558
2559    fn set_consumer<C: StreamConsumer<T>>(
2560        mut self,
2561        id: TableId<TransmitHandle>,
2562        kind: TransmitKind,
2563        consumer: C,
2564    ) {
2565        let token = StoreToken::new(self.as_context_mut());
2566        let state = self.0.concurrent_state_mut();
2567        let id = state.get_mut(id).unwrap().state;
2568        let transmit = state.get_mut(id).unwrap();
2569        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2570        let consume_with_buffer = {
2571            let consumer = consumer.clone();
2572            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2573                let mut mine = consumer.lock().unwrap().take().unwrap();
2574
2575                let host_buffer_remaining_before =
2576                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2577
2578                let (result, cancelled) = future::poll_fn(|cx| {
2579                    tls::get(|store| {
2580                        let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2581                            &ReadState::HostReady { cancel, .. } => cancel,
2582                            ReadState::Open => false,
2583                            _ => unreachable!(),
2584                        };
2585
2586                        let poll = mine.as_mut().poll_consume(
2587                            cx,
2588                            token.as_context_mut(store),
2589                            Source {
2590                                id,
2591                                host_buffer: host_buffer.as_deref_mut(),
2592                            },
2593                            cancel,
2594                        );
2595
2596                        if let ReadState::HostReady {
2597                            cancel_waker,
2598                            cancel,
2599                            ..
2600                        } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2601                        {
2602                            if poll.is_pending() {
2603                                *cancel_waker = Some(cx.waker().clone());
2604                            } else {
2605                                *cancel_waker = None;
2606                                *cancel = false;
2607                            }
2608                        }
2609
2610                        poll.map(|v| v.map(|result| (result, cancel)))
2611                    })
2612                })
2613                .await?;
2614
2615                let (guest_offset, count) = tls::get(|store| {
2616                    let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2617                    (
2618                        match &transmit.read {
2619                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2620                            ReadState::Open => 0,
2621                            _ => unreachable!(),
2622                        },
2623                        match &transmit.write {
2624                            &WriteState::GuestReady { count, .. } => count,
2625                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2626                            _ => unreachable!(),
2627                        },
2628                    )
2629                });
2630
2631                match result {
2632                    StreamResult::Completed => {
2633                        if count > 0
2634                            && guest_offset == 0
2635                            && host_buffer_remaining_before
2636                                .zip(host_buffer.map(|v| v.remaining().len()))
2637                                .map(|(before, after)| before == after)
2638                                .unwrap_or(false)
2639                        {
2640                            bail!(
2641                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2642                                 without consuming any items"
2643                            );
2644                        }
2645
2646                        if let TransmitKind::Future = kind {
2647                            tls::get(|store| {
2648                                store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2649                            });
2650                        }
2651                    }
2652                    StreamResult::Cancelled => {
2653                        if !cancelled {
2654                            bail!(
2655                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2656                                 without being given a `finish` parameter value of true"
2657                            );
2658                        }
2659                    }
2660                    StreamResult::Dropped => {}
2661                }
2662
2663                *consumer.lock().unwrap() = Some(mine);
2664
2665                Ok(result)
2666            }
2667        };
2668        let consume = {
2669            let consume = consume_with_buffer.clone();
2670            Box::new(move || {
2671                let consume = consume.clone();
2672                async move { consume(None).await }.boxed()
2673            })
2674        };
2675
2676        match &transmit.write {
2677            WriteState::Open => {
2678                transmit.read = ReadState::HostReady {
2679                    consume,
2680                    guest_offset: 0,
2681                    cancel: false,
2682                    cancel_waker: None,
2683                };
2684            }
2685            &WriteState::GuestReady { .. } => {
2686                let future = consume();
2687                transmit.read = ReadState::HostReady {
2688                    consume,
2689                    guest_offset: 0,
2690                    cancel: false,
2691                    cancel_waker: None,
2692                };
2693                self.0.pipe_from_guest(kind, id, future);
2694            }
2695            WriteState::HostReady { .. } => {
2696                let WriteState::HostReady { produce, .. } = mem::replace(
2697                    &mut transmit.write,
2698                    WriteState::HostReady {
2699                        produce: Box::new(|| unreachable!()),
2700                        try_into: Box::new(|_| unreachable!()),
2701                        guest_offset: 0,
2702                        cancel: false,
2703                        cancel_waker: None,
2704                    },
2705                ) else {
2706                    unreachable!();
2707                };
2708
2709                transmit.read = ReadState::HostToHost {
2710                    accept: Box::new(move |input| {
2711                        let consume = consume_with_buffer.clone();
2712                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2713                    }),
2714                    buffer: Vec::new(),
2715                    limit: 0,
2716                };
2717
2718                let future = async move {
2719                    loop {
2720                        if tls::get(|store| {
2721                            crate::error::Ok(matches!(
2722                                store.concurrent_state_mut().get_mut(id)?.read,
2723                                ReadState::Dropped
2724                            ))
2725                        })? {
2726                            break Ok(());
2727                        }
2728
2729                        match produce().await? {
2730                            StreamResult::Completed | StreamResult::Cancelled => {}
2731                            StreamResult::Dropped => break Ok(()),
2732                        }
2733
2734                        if let TransmitKind::Future = kind {
2735                            break Ok(());
2736                        }
2737                    }
2738                }
2739                .map(move |result| {
2740                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2741                    result
2742                });
2743
2744                state.push_future(Box::pin(future));
2745            }
2746            WriteState::Dropped => {
2747                let reader = transmit.read_handle;
2748                self.0.host_drop_reader(reader, kind).unwrap();
2749            }
2750        }
2751    }
2752}
2753
/// Deliver the items buffered in `pair` to the current reader of the transmit
/// state `id`.
///
/// `pair` holds a producer together with its write buffer; only the buffer
/// (`.1`) is touched here.  The read state is temporarily replaced with
/// `ReadState::Open` while the transfer runs and is written back afterwards.
///
/// Two reader states are handled:
///
/// * `ReadState::GuestReady`: items are lowered into the guest buffer
///   starting at `address + T::SIZE32 * guest_offset`, and the
///   `guest_offset` stored in `WriteState::HostReady` is advanced by the
///   number of items that left the buffer.  For a future, the transmit is
///   also marked `done`.
/// * `ReadState::HostToHost`: first the range `position..limit` of the
///   stashed `buffer` and then the buffer in `pair` are offered to `accept`
///   until the buffers are exhausted or `accept` reports
///   `StreamResult::Dropped`.
///
/// # Errors
///
/// Propagates errors from looking up `id`, from lowering, from `accept`, and
/// from the oneshot receiver.
///
/// # Panics
///
/// Panics if the read state is anything other than the two above, if the
/// write state is not `WriteState::HostReady` in the guest case, or if `pair`
/// is empty.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<Mutex<Option<(P, B)>>>,
    kind: TransmitKind,
) -> Result<()> {
    // Take the read state out (leaving `Open` behind) and note how far into
    // the guest buffer earlier writes have already progressed.
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;

        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };

        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;

    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller,
        } => {
            let guest_offset = guest_offset.unwrap();

            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }

            // Used below to work out how many items were actually lowered.
            let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
            // Lowers from the shared buffer into the unfilled tail of the
            // guest buffer.
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset),
                        count - guest_offset,
                        &mut pair.lock().unwrap().as_mut().unwrap().1,
                    )?;
                    crate::error::Ok(())
                }
            };

            // Only lower if the guest buffer still has room.
            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // For payloads which may require a realloc call, use a
                    // oneshot::channel and background task.  This is
                    // necessary because calling the guest while there are
                    // host embedder frames on the stack is unsound.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    rx.await?
                } else {
                    // Optimize flat payloads (i.e. those which do not
                    // require calling the guest's realloc function) by
                    // lowering directly instead of using a oneshot::channel
                    // and background task.
                    tls::get(|store| accept(token.as_context_mut(store)))?
                };
            }

            tls::get(|store| {
                // Number of items that left the buffer during this call; note
                // that this shadows the guest buffer capacity `count`.
                let count =
                    old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();

                let transmit = store.concurrent_state_mut().get_mut(id)?;

                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    unreachable!();
                };

                *guest_offset += count;

                // Put the guest read state back; the field init shorthand
                // stores the shadowing `count` computed above.
                // NOTE(review): confirm that storing the lowered-item count
                // here, rather than the original capacity, is intended.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count,
                    handle,
                    instance,
                    caller,
                };

                crate::error::Ok(())
            })?;

            Ok(())
        }

        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;

            // First offer the stashed `buffer` up to `limit`.  `buffer` is
            // moved into the `SliceBuffer` and recovered, together with the
            // updated position, via `into_parts`.
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }

            {
                // Then offer the buffer from `pair`.  The pair is taken out of
                // the mutex so that no lock guard is held across the `.await`.
                let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();

                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
                }

                *pair.lock().unwrap() = Some((mine, buffer));
            }

            // Restore the read state, resetting `limit` to 0, or record that
            // the reader was dropped.
            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };

                crate::error::Ok(())
            })?;
            Ok(())
        }

        _ => unreachable!(),
    }
}
2910
2911impl Instance {
2912    /// Handle a host- or guest-initiated write by delivering the item(s) to the
2913    /// `StreamConsumer` for the specified stream or future.
2914    fn consume(
2915        self,
2916        store: &mut dyn VMStore,
2917        kind: TransmitKind,
2918        transmit_id: TableId<TransmitState>,
2919        consume: PollStream,
2920        guest_offset: usize,
2921        cancel: bool,
2922    ) -> Result<ReturnCode> {
2923        let mut future = consume();
2924        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2925            consume,
2926            guest_offset,
2927            cancel,
2928            cancel_waker: None,
2929        };
2930        let poll = tls::set(store, || {
2931            future
2932                .as_mut()
2933                .poll(&mut Context::from_waker(&Waker::noop()))
2934        });
2935
2936        Ok(match poll {
2937            Poll::Ready(state) => {
2938                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2939                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2940                    unreachable!();
2941                };
2942                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2943                transmit.write = WriteState::Open;
2944                code
2945            }
2946            Poll::Pending => {
2947                store.pipe_from_guest(kind, transmit_id, future);
2948                ReturnCode::Blocked
2949            }
2950        })
2951    }
2952
2953    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
2954    /// for the specified stream or future for items.
2955    fn produce(
2956        self,
2957        store: &mut dyn VMStore,
2958        kind: TransmitKind,
2959        transmit_id: TableId<TransmitState>,
2960        produce: PollStream,
2961        try_into: TryInto,
2962        guest_offset: usize,
2963        cancel: bool,
2964    ) -> Result<ReturnCode> {
2965        let mut future = produce();
2966        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2967            produce,
2968            try_into,
2969            guest_offset,
2970            cancel,
2971            cancel_waker: None,
2972        };
2973        let poll = tls::set(store, || {
2974            future
2975                .as_mut()
2976                .poll(&mut Context::from_waker(&Waker::noop()))
2977        });
2978
2979        Ok(match poll {
2980            Poll::Ready(state) => {
2981                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2982                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2983                    unreachable!();
2984                };
2985                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2986                transmit.read = ReadState::Open;
2987                code
2988            }
2989            Poll::Pending => {
2990                store.pipe_to_guest(kind, transmit_id, future);
2991                ReturnCode::Blocked
2992            }
2993        })
2994    }
2995
2996    /// Drop the writable end of the specified stream or future from the guest.
2997    pub(super) fn guest_drop_writable(
2998        self,
2999        store: &mut StoreOpaque,
3000        ty: TransmitIndex,
3001        writer: u32,
3002    ) -> Result<()> {
3003        let table = self.id().get_mut(store).table_for_transmit(ty);
3004        let transmit_rep = match ty {
3005            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3006            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3007        };
3008
3009        let id = TableId::<TransmitHandle>::new(transmit_rep);
3010        log::trace!("guest_drop_writable: drop writer {id:?}");
3011        match ty {
3012            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3013            TransmitIndex::Future(_) => store.host_drop_writer(
3014                id,
3015                Some(|| {
3016                    Err(format_err!(
3017                        "cannot drop future write end without first writing a value"
3018                    ))
3019                }),
3020            ),
3021        }
3022    }
3023
    /// Copy `count` items from the writer's buffer at `write_address` to the
    /// reader's buffer at `read_address` for the specified stream or future.
    ///
    /// The source is read through `write_options` and the destination is
    /// written through `read_options`.  For futures `count` must be 1.  For
    /// streams, a `flat_abi` of `Some` selects a direct byte copy; otherwise
    /// each item is loaded as a `Val` and stored again.
    ///
    /// # Errors
    ///
    /// Fails if both ends belong to the same component instance and
    /// `allow_intra_component_read_write` rejects the payload, if either
    /// pointer is misaligned or out of bounds, or if loading or storing a
    /// value fails.
    ///
    /// # Panics
    ///
    /// Panics if `write_ty` and `read_ty` are not of the same kind, or if a
    /// future is copied with `count != 1`.
    fn copy<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        flat_abi: Option<FlatAbi>,
        write_caller: RuntimeComponentInstanceIndex,
        write_ty: TransmitIndex,
        write_options: OptionsIndex,
        write_address: usize,
        read_caller: RuntimeComponentInstanceIndex,
        read_ty: TransmitIndex,
        read_options: OptionsIndex,
        read_address: usize,
        count: usize,
        rep: u32,
    ) -> Result<()> {
        let types = self.id().get(store.0).component().types();
        match (write_ty, read_ty) {
            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
                assert_eq!(count, 1);

                let payload = types[types[write_ty].ty].payload;

                if write_caller == read_caller && !allow_intra_component_read_write(payload) {
                    bail!(
                        "cannot read from and write to intra-component future with non-numeric payload"
                    )
                }

                // Lift the value out of the writer's memory; a future without
                // a payload type has nothing to copy.
                let val = payload
                    .map(|ty| {
                        let lift =
                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);

                        let abi = lift.types.canonical_abi(&ty);
                        // FIXME: needs to read an i64 for memory64
                        if write_address % usize::try_from(abi.align32)? != 0 {
                            bail!("write pointer not aligned");
                        }

                        let bytes = lift
                            .memory()
                            .get(write_address..)
                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
                            .ok_or_else(|| {
                                crate::format_err!("write pointer out of bounds of memory")
                            })?;

                        Val::load(lift, ty, bytes)
                    })
                    .transpose()?;

                // Lower the lifted value into the reader's memory.
                if let Some(val) = val {
                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let types = lower.types;
                    let ty = types[types[read_ty].ty].payload.unwrap();
                    let ptr = func::validate_inbounds_dynamic(
                        types.canonical_abi(&ty),
                        lower.as_slice_mut(),
                        &ValRaw::u32(read_address.try_into().unwrap()),
                    )?;
                    val.store(lower, ty, ptr)?;
                }
            }
            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
                if write_caller == read_caller
                    && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
                {
                    bail!(
                        "cannot read from and write to intra-component stream with non-numeric payload"
                    )
                }

                if let Some(flat_abi) = flat_abi {
                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
                    if length_in_bytes > 0 {
                        if write_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("write pointer not aligned");
                        }
                        if read_address % usize::try_from(flat_abi.align)? != 0 {
                            bail!("read pointer not aligned");
                        }

                        let store_opaque = store.0.store_opaque_mut();

                        {
                            // Bounds-check both ranges via slice `get` before
                            // taking raw pointers to them.
                            let src = self
                                .options_memory(store_opaque, write_options)
                                .get(write_address..)
                                .and_then(|b| b.get(..length_in_bytes))
                                .ok_or_else(|| {
                                    crate::format_err!("write pointer out of bounds of memory")
                                })?
                                .as_ptr();
                            let dst = self
                                .options_memory_mut(store_opaque, read_options)
                                .get_mut(read_address..)
                                .and_then(|b| b.get_mut(..length_in_bytes))
                                .ok_or_else(|| {
                                    crate::format_err!("read pointer out of bounds of memory")
                                })?
                                .as_mut_ptr();
                            // SAFETY: Both `src` and `dst` have been validated
                            // above.
                            unsafe {
                                if write_caller == read_caller {
                                    // If the same instance owns both ends of
                                    // the stream, the source and destination
                                    // buffers might overlap.
                                    src.copy_to(dst, length_in_bytes)
                                } else {
                                    // Since the read and write ends of the
                                    // stream are owned by distinct instances,
                                    // the buffers cannot possibly belong to the
                                    // same memory and thus cannot overlap.
                                    src.copy_to_nonoverlapping(dst, length_in_bytes)
                                }
                            }
                        }
                    }
                } else {
                    // Slow path: lift every item from the writer's memory
                    // first, then lower them one by one into the reader's.
                    let store_opaque = store.0.store_opaque_mut();
                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
                    let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
                    let abi = lift.types.canonical_abi(&ty);
                    let size = usize::try_from(abi.size32).unwrap();
                    if write_address % usize::try_from(abi.align32)? != 0 {
                        bail!("write pointer not aligned");
                    }
                    let bytes = lift
                        .memory()
                        .get(write_address..)
                        .and_then(|b| b.get(..size * count))
                        .ok_or_else(|| {
                            crate::format_err!("write pointer out of bounds of memory")
                        })?;

                    let values = (0..count)
                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
                        .collect::<Result<Vec<_>>>()?;

                    let id = TableId::<TransmitHandle>::new(rep);
                    log::trace!("copy values {values:?} for {id:?}");

                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
                    let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
                    let abi = lower.types.canonical_abi(&ty);
                    if read_address % usize::try_from(abi.align32)? != 0 {
                        bail!("read pointer not aligned");
                    }
                    let size = usize::try_from(abi.size32).unwrap();
                    // Bounds check only: the resulting slice is discarded and
                    // the values are stored by offset below.
                    lower
                        .as_slice_mut()
                        .get_mut(read_address..)
                        .and_then(|b| b.get_mut(..size * count))
                        .ok_or_else(|| {
                            crate::format_err!("read pointer out of bounds of memory")
                        })?;
                    let mut ptr = read_address;
                    for value in values {
                        value.store(lower, ty, ptr)?;
                        ptr += size
                    }
                }
            }
            // The write and read types must be of the same kind.
            _ => unreachable!(),
        }

        Ok(())
    }
3196
3197    fn check_bounds(
3198        self,
3199        store: &StoreOpaque,
3200        options: OptionsIndex,
3201        ty: TransmitIndex,
3202        address: usize,
3203        count: usize,
3204    ) -> Result<()> {
3205        let types = self.id().get(store).component().types();
3206        let size = usize::try_from(
3207            match ty {
3208                TransmitIndex::Future(ty) => types[types[ty].ty]
3209                    .payload
3210                    .map(|ty| types.canonical_abi(&ty).size32),
3211                TransmitIndex::Stream(ty) => types[types[ty].ty]
3212                    .payload
3213                    .map(|ty| types.canonical_abi(&ty).size32),
3214            }
3215            .unwrap_or(0),
3216        )
3217        .unwrap();
3218
3219        if count > 0 && size > 0 {
3220            self.options_memory(store, options)
3221                .get(address..)
3222                .and_then(|b| b.get(..(size * count)))
3223                .map(drop)
3224                .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3225        } else {
3226            Ok(())
3227        }
3228    }
3229
3230    /// Write to the specified stream or future from the guest.
3231    pub(super) fn guest_write<T: 'static>(
3232        self,
3233        mut store: StoreContextMut<T>,
3234        caller: RuntimeComponentInstanceIndex,
3235        ty: TransmitIndex,
3236        options: OptionsIndex,
3237        flat_abi: Option<FlatAbi>,
3238        handle: u32,
3239        address: u32,
3240        count: u32,
3241    ) -> Result<ReturnCode> {
3242        if !self.options(store.0, options).async_ {
3243            // The caller may only sync call `{stream,future}.write` from an
3244            // async task (i.e. a task created via a call to an async export).
3245            // Otherwise, we'll trap.
3246            store.0.check_blocking()?;
3247        }
3248
3249        let address = usize::try_from(address).unwrap();
3250        let count = usize::try_from(count).unwrap();
3251        self.check_bounds(store.0, options, ty, address, count)?;
3252        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3253        let TransmitLocalState::Write { done } = *state else {
3254            bail!(
3255                "invalid handle {handle}; expected `Write`; got {:?}",
3256                *state
3257            );
3258        };
3259
3260        if done {
3261            bail!("cannot write to stream after being notified that the readable end dropped");
3262        }
3263
3264        *state = TransmitLocalState::Busy;
3265        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3266        let concurrent_state = store.0.concurrent_state_mut();
3267        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3268        let transmit = concurrent_state.get_mut(transmit_id)?;
3269        log::trace!(
3270            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3271            transmit.read
3272        );
3273
3274        if transmit.done {
3275            bail!("cannot write to future after previous write succeeded or readable end dropped");
3276        }
3277
3278        let new_state = if let ReadState::Dropped = &transmit.read {
3279            ReadState::Dropped
3280        } else {
3281            ReadState::Open
3282        };
3283
3284        let set_guest_ready = |me: &mut ConcurrentState| {
3285            let transmit = me.get_mut(transmit_id)?;
3286            assert!(
3287                matches!(&transmit.write, WriteState::Open),
3288                "expected `WriteState::Open`; got `{:?}`",
3289                transmit.write
3290            );
3291            transmit.write = WriteState::GuestReady {
3292                instance: self,
3293                caller,
3294                ty,
3295                flat_abi,
3296                options,
3297                address,
3298                count,
3299                handle,
3300            };
3301            Ok::<_, crate::Error>(())
3302        };
3303
3304        let mut result = match mem::replace(&mut transmit.read, new_state) {
3305            ReadState::GuestReady {
3306                ty: read_ty,
3307                flat_abi: read_flat_abi,
3308                options: read_options,
3309                address: read_address,
3310                count: read_count,
3311                handle: read_handle,
3312                instance: read_instance,
3313                caller: read_caller,
3314            } => {
3315                assert_eq!(flat_abi, read_flat_abi);
3316
3317                if let TransmitIndex::Future(_) = ty {
3318                    transmit.done = true;
3319                }
3320
3321                // Note that zero-length reads and writes are handling specially
3322                // by the spec to allow each end to signal readiness to the
3323                // other.  Quoting the spec:
3324                //
3325                // ```
3326                // The meaning of a read or write when the length is 0 is that
3327                // the caller is querying the "readiness" of the other
3328                // side. When a 0-length read/write rendezvous with a
3329                // non-0-length read/write, only the 0-length read/write
3330                // completes; the non-0-length read/write is kept pending (and
3331                // ready for a subsequent rendezvous).
3332                //
3333                // In the corner case where a 0-length read and write
3334                // rendezvous, only the writer is notified of readiness. To
3335                // avoid livelock, the Canonical ABI requires that a writer must
3336                // (eventually) follow a completed 0-length write with a
3337                // non-0-length write that is allowed to block (allowing the
3338                // reader end to run and rendezvous with its own non-0-length
3339                // read).
3340                // ```
3341
3342                let write_complete = count == 0 || read_count > 0;
3343                let read_complete = count > 0;
3344                let read_buffer_remaining = count < read_count;
3345
3346                let read_handle_rep = transmit.read_handle.rep();
3347
3348                let count = count.min(read_count);
3349
3350                self.copy(
3351                    store.as_context_mut(),
3352                    flat_abi,
3353                    caller,
3354                    ty,
3355                    options,
3356                    address,
3357                    read_caller,
3358                    read_ty,
3359                    read_options,
3360                    read_address,
3361                    count,
3362                    rep,
3363                )?;
3364
3365                let instance = self.id().get_mut(store.0);
3366                let types = instance.component().types();
3367                let item_size = payload(ty, types)
3368                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3369                    .unwrap_or(0);
3370                let concurrent_state = store.0.concurrent_state_mut();
3371                if read_complete {
3372                    let count = u32::try_from(count).unwrap();
3373                    let total = if let Some(Event::StreamRead {
3374                        code: ReturnCode::Completed(old_total),
3375                        ..
3376                    }) = concurrent_state.take_event(read_handle_rep)?
3377                    {
3378                        count + old_total
3379                    } else {
3380                        count
3381                    };
3382
3383                    let code = ReturnCode::completed(ty.kind(), total);
3384
3385                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3386                }
3387
3388                if read_buffer_remaining {
3389                    let transmit = concurrent_state.get_mut(transmit_id)?;
3390                    transmit.read = ReadState::GuestReady {
3391                        ty: read_ty,
3392                        flat_abi: read_flat_abi,
3393                        options: read_options,
3394                        address: read_address + (count * item_size),
3395                        count: read_count - count,
3396                        handle: read_handle,
3397                        instance: read_instance,
3398                        caller: read_caller,
3399                    };
3400                }
3401
3402                if write_complete {
3403                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3404                } else {
3405                    set_guest_ready(concurrent_state)?;
3406                    ReturnCode::Blocked
3407                }
3408            }
3409
3410            ReadState::HostReady {
3411                consume,
3412                guest_offset,
3413                cancel,
3414                cancel_waker,
3415            } => {
3416                assert!(cancel_waker.is_none());
3417                assert!(!cancel);
3418                assert_eq!(0, guest_offset);
3419
3420                if let TransmitIndex::Future(_) = ty {
3421                    transmit.done = true;
3422                }
3423
3424                set_guest_ready(concurrent_state)?;
3425                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3426            }
3427
3428            ReadState::HostToHost { .. } => unreachable!(),
3429
3430            ReadState::Open => {
3431                set_guest_ready(concurrent_state)?;
3432                ReturnCode::Blocked
3433            }
3434
3435            ReadState::Dropped => {
3436                if let TransmitIndex::Future(_) = ty {
3437                    transmit.done = true;
3438                }
3439
3440                ReturnCode::Dropped(0)
3441            }
3442        };
3443
3444        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3445            result = self.wait_for_write(store.0, transmit_handle)?;
3446        }
3447
3448        if result != ReturnCode::Blocked {
3449            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3450                TransmitLocalState::Write {
3451                    done: matches!(
3452                        (result, ty),
3453                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3454                    ),
3455                };
3456        }
3457
3458        log::trace!(
3459            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3460        );
3461
3462        Ok(result)
3463    }
3464
3465    /// Read from the specified stream or future from the guest.
3466    pub(super) fn guest_read<T: 'static>(
3467        self,
3468        mut store: StoreContextMut<T>,
3469        caller: RuntimeComponentInstanceIndex,
3470        ty: TransmitIndex,
3471        options: OptionsIndex,
3472        flat_abi: Option<FlatAbi>,
3473        handle: u32,
3474        address: u32,
3475        count: u32,
3476    ) -> Result<ReturnCode> {
3477        if !self.options(store.0, options).async_ {
3478            // The caller may only sync call `{stream,future}.read` from an
3479            // async task (i.e. a task created via a call to an async export).
3480            // Otherwise, we'll trap.
3481            store.0.check_blocking()?;
3482        }
3483
3484        let address = usize::try_from(address).unwrap();
3485        let count = usize::try_from(count).unwrap();
3486        self.check_bounds(store.0, options, ty, address, count)?;
3487        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3488        let TransmitLocalState::Read { done } = *state else {
3489            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3490        };
3491
3492        if done {
3493            bail!("cannot read from stream after being notified that the writable end dropped");
3494        }
3495
3496        *state = TransmitLocalState::Busy;
3497        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3498        let concurrent_state = store.0.concurrent_state_mut();
3499        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3500        let transmit = concurrent_state.get_mut(transmit_id)?;
3501        log::trace!(
3502            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3503            transmit.write
3504        );
3505
3506        if transmit.done {
3507            bail!("cannot read from future after previous read succeeded");
3508        }
3509
3510        let new_state = if let WriteState::Dropped = &transmit.write {
3511            WriteState::Dropped
3512        } else {
3513            WriteState::Open
3514        };
3515
3516        let set_guest_ready = |me: &mut ConcurrentState| {
3517            let transmit = me.get_mut(transmit_id)?;
3518            assert!(
3519                matches!(&transmit.read, ReadState::Open),
3520                "expected `ReadState::Open`; got `{:?}`",
3521                transmit.read
3522            );
3523            transmit.read = ReadState::GuestReady {
3524                ty,
3525                flat_abi,
3526                options,
3527                address,
3528                count,
3529                handle,
3530                instance: self,
3531                caller,
3532            };
3533            Ok::<_, crate::Error>(())
3534        };
3535
3536        let mut result = match mem::replace(&mut transmit.write, new_state) {
3537            WriteState::GuestReady {
3538                instance: _,
3539                ty: write_ty,
3540                flat_abi: write_flat_abi,
3541                options: write_options,
3542                address: write_address,
3543                count: write_count,
3544                handle: write_handle,
3545                caller: write_caller,
3546            } => {
3547                assert_eq!(flat_abi, write_flat_abi);
3548
3549                if let TransmitIndex::Future(_) = ty {
3550                    transmit.done = true;
3551                }
3552
3553                let write_handle_rep = transmit.write_handle.rep();
3554
3555                // See the comment in `guest_write` for the
3556                // `ReadState::GuestReady` case concerning zero-length reads and
3557                // writes.
3558
3559                let write_complete = write_count == 0 || count > 0;
3560                let read_complete = write_count > 0;
3561                let write_buffer_remaining = count < write_count;
3562
3563                let count = count.min(write_count);
3564
3565                self.copy(
3566                    store.as_context_mut(),
3567                    flat_abi,
3568                    write_caller,
3569                    write_ty,
3570                    write_options,
3571                    write_address,
3572                    caller,
3573                    ty,
3574                    options,
3575                    address,
3576                    count,
3577                    rep,
3578                )?;
3579
3580                let instance = self.id().get_mut(store.0);
3581                let types = instance.component().types();
3582                let item_size = payload(ty, types)
3583                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3584                    .unwrap_or(0);
3585                let concurrent_state = store.0.concurrent_state_mut();
3586
3587                if write_complete {
3588                    let count = u32::try_from(count).unwrap();
3589                    let total = if let Some(Event::StreamWrite {
3590                        code: ReturnCode::Completed(old_total),
3591                        ..
3592                    }) = concurrent_state.take_event(write_handle_rep)?
3593                    {
3594                        count + old_total
3595                    } else {
3596                        count
3597                    };
3598
3599                    let code = ReturnCode::completed(ty.kind(), total);
3600
3601                    concurrent_state.send_write_result(
3602                        write_ty,
3603                        transmit_id,
3604                        write_handle,
3605                        code,
3606                    )?;
3607                }
3608
3609                if write_buffer_remaining {
3610                    let transmit = concurrent_state.get_mut(transmit_id)?;
3611                    transmit.write = WriteState::GuestReady {
3612                        instance: self,
3613                        caller: write_caller,
3614                        ty: write_ty,
3615                        flat_abi: write_flat_abi,
3616                        options: write_options,
3617                        address: write_address + (count * item_size),
3618                        count: write_count - count,
3619                        handle: write_handle,
3620                    };
3621                }
3622
3623                if read_complete {
3624                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3625                } else {
3626                    set_guest_ready(concurrent_state)?;
3627                    ReturnCode::Blocked
3628                }
3629            }
3630
3631            WriteState::HostReady {
3632                produce,
3633                try_into,
3634                guest_offset,
3635                cancel,
3636                cancel_waker,
3637            } => {
3638                assert!(cancel_waker.is_none());
3639                assert!(!cancel);
3640                assert_eq!(0, guest_offset);
3641
3642                set_guest_ready(concurrent_state)?;
3643
3644                let code =
3645                    self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3646
3647                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3648                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3649                }
3650
3651                code
3652            }
3653
3654            WriteState::Open => {
3655                set_guest_ready(concurrent_state)?;
3656                ReturnCode::Blocked
3657            }
3658
3659            WriteState::Dropped => ReturnCode::Dropped(0),
3660        };
3661
3662        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3663            result = self.wait_for_read(store.0, transmit_handle)?;
3664        }
3665
3666        if result != ReturnCode::Blocked {
3667            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3668                TransmitLocalState::Read {
3669                    done: matches!(
3670                        (result, ty),
3671                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3672                    ),
3673                };
3674        }
3675
3676        log::trace!(
3677            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3678        );
3679
3680        Ok(result)
3681    }
3682
3683    fn wait_for_write(
3684        self,
3685        store: &mut StoreOpaque,
3686        handle: TableId<TransmitHandle>,
3687    ) -> Result<ReturnCode> {
3688        let waitable = Waitable::Transmit(handle);
3689        store.wait_for_event(waitable)?;
3690        let event = waitable.take_event(store.concurrent_state_mut())?;
3691        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3692            event
3693        {
3694            waitable.on_delivery(store, self, event);
3695            Ok(code)
3696        } else {
3697            unreachable!()
3698        }
3699    }
3700
3701    /// Cancel a pending stream or future write.
3702    fn cancel_write(
3703        self,
3704        store: &mut StoreOpaque,
3705        transmit_id: TableId<TransmitState>,
3706        async_: bool,
3707    ) -> Result<ReturnCode> {
3708        let state = store.concurrent_state_mut();
3709        let transmit = state.get_mut(transmit_id)?;
3710        log::trace!(
3711            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3712            transmit.read,
3713            transmit.write
3714        );
3715
3716        let code = if let Some(event) =
3717            Waitable::Transmit(transmit.write_handle).take_event(state)?
3718        {
3719            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3720                unreachable!();
3721            };
3722            match (code, event) {
3723                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3724                    ReturnCode::Cancelled(count)
3725                }
3726                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3727                _ => unreachable!(),
3728            }
3729        } else if let ReadState::HostReady {
3730            cancel,
3731            cancel_waker,
3732            ..
3733        } = &mut state.get_mut(transmit_id)?.read
3734        {
3735            *cancel = true;
3736            if let Some(waker) = cancel_waker.take() {
3737                waker.wake();
3738            }
3739
3740            if async_ {
3741                ReturnCode::Blocked
3742            } else {
3743                let handle = store
3744                    .concurrent_state_mut()
3745                    .get_mut(transmit_id)?
3746                    .write_handle;
3747                self.wait_for_write(store, handle)?
3748            }
3749        } else {
3750            ReturnCode::Cancelled(0)
3751        };
3752
3753        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3754
3755        match &transmit.write {
3756            WriteState::GuestReady { .. } => {
3757                transmit.write = WriteState::Open;
3758            }
3759            WriteState::HostReady { .. } => todo!("support host write cancellation"),
3760            WriteState::Open | WriteState::Dropped => {}
3761        }
3762
3763        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3764
3765        Ok(code)
3766    }
3767
3768    fn wait_for_read(
3769        self,
3770        store: &mut StoreOpaque,
3771        handle: TableId<TransmitHandle>,
3772    ) -> Result<ReturnCode> {
3773        let waitable = Waitable::Transmit(handle);
3774        store.wait_for_event(waitable)?;
3775        let event = waitable.take_event(store.concurrent_state_mut())?;
3776        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3777            event
3778        {
3779            waitable.on_delivery(store, self, event);
3780            Ok(code)
3781        } else {
3782            unreachable!()
3783        }
3784    }
3785
3786    /// Cancel a pending stream or future read.
3787    fn cancel_read(
3788        self,
3789        store: &mut StoreOpaque,
3790        transmit_id: TableId<TransmitState>,
3791        async_: bool,
3792    ) -> Result<ReturnCode> {
3793        let state = store.concurrent_state_mut();
3794        let transmit = state.get_mut(transmit_id)?;
3795        log::trace!(
3796            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3797            transmit.read,
3798            transmit.write
3799        );
3800
3801        let code = if let Some(event) =
3802            Waitable::Transmit(transmit.read_handle).take_event(state)?
3803        {
3804            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3805                unreachable!();
3806            };
3807            match (code, event) {
3808                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3809                    ReturnCode::Cancelled(count)
3810                }
3811                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3812                _ => unreachable!(),
3813            }
3814        } else if let WriteState::HostReady {
3815            cancel,
3816            cancel_waker,
3817            ..
3818        } = &mut state.get_mut(transmit_id)?.write
3819        {
3820            *cancel = true;
3821            if let Some(waker) = cancel_waker.take() {
3822                waker.wake();
3823            }
3824
3825            if async_ {
3826                ReturnCode::Blocked
3827            } else {
3828                let handle = store
3829                    .concurrent_state_mut()
3830                    .get_mut(transmit_id)?
3831                    .read_handle;
3832                self.wait_for_read(store, handle)?
3833            }
3834        } else {
3835            ReturnCode::Cancelled(0)
3836        };
3837
3838        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3839
3840        match &transmit.read {
3841            ReadState::GuestReady { .. } => {
3842                transmit.read = ReadState::Open;
3843            }
3844            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3845                todo!("support host read cancellation")
3846            }
3847            ReadState::Open | ReadState::Dropped => {}
3848        }
3849
3850        log::trace!("cancelled read {transmit_id:?}: {code:?}");
3851
3852        Ok(code)
3853    }
3854
3855    /// Cancel a pending write for the specified stream or future from the guest.
3856    fn guest_cancel_write(
3857        self,
3858        store: &mut StoreOpaque,
3859        ty: TransmitIndex,
3860        async_: bool,
3861        writer: u32,
3862    ) -> Result<ReturnCode> {
3863        if !async_ {
3864            // The caller may only sync call `{stream,future}.cancel-write` from
3865            // an async task (i.e. a task created via a call to an async
3866            // export).  Otherwise, we'll trap.
3867            store.check_blocking()?;
3868        }
3869
3870        let (rep, state) =
3871            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3872        let id = TableId::<TransmitHandle>::new(rep);
3873        log::trace!("guest cancel write {id:?} (handle {writer})");
3874        match state {
3875            TransmitLocalState::Write { .. } => {
3876                bail!("stream or future write cancelled when no write is pending")
3877            }
3878            TransmitLocalState::Read { .. } => {
3879                bail!("passed read end to `{{stream|future}}.cancel-write`")
3880            }
3881            TransmitLocalState::Busy => {}
3882        }
3883        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3884        let code = self.cancel_write(store, transmit_id, async_)?;
3885        if !matches!(code, ReturnCode::Blocked) {
3886            let state =
3887                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3888                    .1;
3889            if let TransmitLocalState::Busy = state {
3890                *state = TransmitLocalState::Write { done: false };
3891            }
3892        }
3893        Ok(code)
3894    }
3895
3896    /// Cancel a pending read for the specified stream or future from the guest.
3897    fn guest_cancel_read(
3898        self,
3899        store: &mut StoreOpaque,
3900        ty: TransmitIndex,
3901        async_: bool,
3902        reader: u32,
3903    ) -> Result<ReturnCode> {
3904        if !async_ {
3905            // The caller may only sync call `{stream,future}.cancel-read` from
3906            // an async task (i.e. a task created via a call to an async
3907            // export).  Otherwise, we'll trap.
3908            store.check_blocking()?;
3909        }
3910
3911        let (rep, state) =
3912            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3913        let id = TableId::<TransmitHandle>::new(rep);
3914        log::trace!("guest cancel read {id:?} (handle {reader})");
3915        match state {
3916            TransmitLocalState::Read { .. } => {
3917                bail!("stream or future read cancelled when no read is pending")
3918            }
3919            TransmitLocalState::Write { .. } => {
3920                bail!("passed write end to `{{stream|future}}.cancel-read`")
3921            }
3922            TransmitLocalState::Busy => {}
3923        }
3924        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3925        let code = self.cancel_read(store, transmit_id, async_)?;
3926        if !matches!(code, ReturnCode::Blocked) {
3927            let state =
3928                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3929                    .1;
3930            if let TransmitLocalState::Busy = state {
3931                *state = TransmitLocalState::Read { done: false };
3932            }
3933        }
3934        Ok(code)
3935    }
3936
3937    /// Drop the readable end of the specified stream or future from the guest.
3938    fn guest_drop_readable(
3939        self,
3940        store: &mut StoreOpaque,
3941        ty: TransmitIndex,
3942        reader: u32,
3943    ) -> Result<()> {
3944        let table = self.id().get_mut(store).table_for_transmit(ty);
3945        let (rep, _is_done) = match ty {
3946            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3947            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3948        };
3949        let kind = match ty {
3950            TransmitIndex::Stream(_) => TransmitKind::Stream,
3951            TransmitIndex::Future(_) => TransmitKind::Future,
3952        };
3953        let id = TableId::<TransmitHandle>::new(rep);
3954        log::trace!("guest_drop_readable: drop reader {id:?}");
3955        store.host_drop_reader(id, kind)
3956    }
3957
3958    /// Create a new error context for the given component.
3959    pub(crate) fn error_context_new(
3960        self,
3961        store: &mut StoreOpaque,
3962        caller: RuntimeComponentInstanceIndex,
3963        ty: TypeComponentLocalErrorContextTableIndex,
3964        options: OptionsIndex,
3965        debug_msg_address: u32,
3966        debug_msg_len: u32,
3967    ) -> Result<u32> {
3968        self.id().get(store).check_may_leave(caller)?;
3969        let lift_ctx = &mut LiftContext::new(store, options, self);
3970        let debug_msg = String::linear_lift_from_flat(
3971            lift_ctx,
3972            InterfaceType::String,
3973            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
3974        )?;
3975
3976        // Create a new ErrorContext that is tracked along with other concurrent state
3977        let err_ctx = ErrorContextState { debug_msg };
3978        let state = store.concurrent_state_mut();
3979        let table_id = state.push(err_ctx)?;
3980        let global_ref_count_idx =
3981            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3982
3983        // Add to the global error context ref counts
3984        let _ = state
3985            .global_error_context_ref_counts
3986            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3987
3988        // Error context are tracked both locally (to a single component instance) and globally
3989        // the counts for both must stay in sync.
3990        //
3991        // Here we reflect the newly created global concurrent error context state into the
3992        // component instance's locally tracked count, along with the appropriate key into the global
3993        // ref tracking data structures to enable later lookup
3994        let local_idx = self
3995            .id()
3996            .get_mut(store)
3997            .table_for_error_context(ty)
3998            .error_context_insert(table_id.rep())?;
3999
4000        Ok(local_idx)
4001    }
4002
4003    /// Retrieve the debug message from the specified error context.
4004    pub(super) fn error_context_debug_message<T>(
4005        self,
4006        store: StoreContextMut<T>,
4007        ty: TypeComponentLocalErrorContextTableIndex,
4008        options: OptionsIndex,
4009        err_ctx_handle: u32,
4010        debug_msg_address: u32,
4011    ) -> Result<()> {
4012        // Retrieve the error context and internal debug message
4013        let handle_table_id_rep = self
4014            .id()
4015            .get_mut(store.0)
4016            .table_for_error_context(ty)
4017            .error_context_rep(err_ctx_handle)?;
4018
4019        let state = store.0.concurrent_state_mut();
4020        // Get the state associated with the error context
4021        let ErrorContextState { debug_msg } =
4022            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4023        let debug_msg = debug_msg.clone();
4024
4025        let lower_cx = &mut LowerContext::new(store, options, self);
4026        let debug_msg_address = usize::try_from(debug_msg_address)?;
4027        // Lower the string into the component's memory
4028        let offset = lower_cx
4029            .as_slice_mut()
4030            .get(debug_msg_address..)
4031            .and_then(|b| b.get(..debug_msg.bytes().len()))
4032            .map(|_| debug_msg_address)
4033            .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4034        debug_msg
4035            .as_str()
4036            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4037
4038        Ok(())
4039    }
4040
4041    /// Implements the `future.cancel-read` intrinsic.
4042    pub(crate) fn future_cancel_read(
4043        self,
4044        store: &mut StoreOpaque,
4045        caller: RuntimeComponentInstanceIndex,
4046        ty: TypeFutureTableIndex,
4047        async_: bool,
4048        reader: u32,
4049    ) -> Result<u32> {
4050        self.id().get(store).check_may_leave(caller)?;
4051        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4052            .map(|v| v.encode())
4053    }
4054
4055    /// Implements the `future.cancel-write` intrinsic.
4056    pub(crate) fn future_cancel_write(
4057        self,
4058        store: &mut StoreOpaque,
4059        caller: RuntimeComponentInstanceIndex,
4060        ty: TypeFutureTableIndex,
4061        async_: bool,
4062        writer: u32,
4063    ) -> Result<u32> {
4064        self.id().get(store).check_may_leave(caller)?;
4065        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4066            .map(|v| v.encode())
4067    }
4068
4069    /// Implements the `stream.cancel-read` intrinsic.
4070    pub(crate) fn stream_cancel_read(
4071        self,
4072        store: &mut StoreOpaque,
4073        caller: RuntimeComponentInstanceIndex,
4074        ty: TypeStreamTableIndex,
4075        async_: bool,
4076        reader: u32,
4077    ) -> Result<u32> {
4078        self.id().get(store).check_may_leave(caller)?;
4079        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4080            .map(|v| v.encode())
4081    }
4082
4083    /// Implements the `stream.cancel-write` intrinsic.
4084    pub(crate) fn stream_cancel_write(
4085        self,
4086        store: &mut StoreOpaque,
4087        caller: RuntimeComponentInstanceIndex,
4088        ty: TypeStreamTableIndex,
4089        async_: bool,
4090        writer: u32,
4091    ) -> Result<u32> {
4092        self.id().get(store).check_may_leave(caller)?;
4093        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4094            .map(|v| v.encode())
4095    }
4096
4097    /// Implements the `future.drop-readable` intrinsic.
4098    pub(crate) fn future_drop_readable(
4099        self,
4100        store: &mut StoreOpaque,
4101        caller: RuntimeComponentInstanceIndex,
4102        ty: TypeFutureTableIndex,
4103        reader: u32,
4104    ) -> Result<()> {
4105        self.id().get(store).check_may_leave(caller)?;
4106        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4107    }
4108
4109    /// Implements the `stream.drop-readable` intrinsic.
4110    pub(crate) fn stream_drop_readable(
4111        self,
4112        store: &mut StoreOpaque,
4113        caller: RuntimeComponentInstanceIndex,
4114        ty: TypeStreamTableIndex,
4115        reader: u32,
4116    ) -> Result<()> {
4117        self.id().get(store).check_may_leave(caller)?;
4118        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4119    }
4120
4121    /// Allocate a new future or stream and grant ownership of both the read and
4122    /// write ends to the (sub-)component instance to which the specified
4123    /// `TransmitIndex` belongs.
4124    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4125        let (write, read) = store
4126            .concurrent_state_mut()
4127            .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4128
4129        let table = self.id().get_mut(store).table_for_transmit(ty);
4130        let (read_handle, write_handle) = match ty {
4131            TransmitIndex::Future(ty) => (
4132                table.future_insert_read(ty, read.rep())?,
4133                table.future_insert_write(ty, write.rep())?,
4134            ),
4135            TransmitIndex::Stream(ty) => (
4136                table.stream_insert_read(ty, read.rep())?,
4137                table.stream_insert_write(ty, write.rep())?,
4138            ),
4139        };
4140
4141        let state = store.concurrent_state_mut();
4142        state.get_mut(read)?.common.handle = Some(read_handle);
4143        state.get_mut(write)?.common.handle = Some(write_handle);
4144
4145        Ok(ResourcePair {
4146            write: write_handle,
4147            read: read_handle,
4148        })
4149    }
4150
4151    /// Drop the specified error context.
4152    pub(crate) fn error_context_drop(
4153        self,
4154        store: &mut StoreOpaque,
4155        caller: RuntimeComponentInstanceIndex,
4156        ty: TypeComponentLocalErrorContextTableIndex,
4157        error_context: u32,
4158    ) -> Result<()> {
4159        let instance = self.id().get_mut(store);
4160        instance.check_may_leave(caller)?;
4161
4162        let local_handle_table = instance.table_for_error_context(ty);
4163
4164        let rep = local_handle_table.error_context_drop(error_context)?;
4165
4166        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4167
4168        let state = store.concurrent_state_mut();
4169        let GlobalErrorContextRefCount(global_ref_count) = state
4170            .global_error_context_ref_counts
4171            .get_mut(&global_ref_count_idx)
4172            .expect("retrieve concurrent state for error context during drop");
4173
4174        // Reduce the component-global ref count, removing tracking if necessary
4175        assert!(*global_ref_count >= 1);
4176        *global_ref_count -= 1;
4177        if *global_ref_count == 0 {
4178            state
4179                .global_error_context_ref_counts
4180                .remove(&global_ref_count_idx);
4181
4182            state
4183                .delete(TableId::<ErrorContextState>::new(rep))
4184                .context("deleting component-global error context data")?;
4185        }
4186
4187        Ok(())
4188    }
4189
4190    /// Transfer ownership of the specified stream or future read end from one
4191    /// guest to another.
4192    fn guest_transfer(
4193        self,
4194        store: &mut StoreOpaque,
4195        src_idx: u32,
4196        src: TransmitIndex,
4197        dst: TransmitIndex,
4198    ) -> Result<u32> {
4199        let mut instance = self.id().get_mut(store);
4200        let src_table = instance.as_mut().table_for_transmit(src);
4201        let (rep, is_done) = match src {
4202            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4203            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4204        };
4205        if is_done {
4206            bail!("cannot lift after being notified that the writable end dropped");
4207        }
4208        let dst_table = instance.table_for_transmit(dst);
4209        let handle = match dst {
4210            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4211            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4212        }?;
4213        store
4214            .concurrent_state_mut()
4215            .get_mut(TableId::<TransmitHandle>::new(rep))?
4216            .common
4217            .handle = Some(handle);
4218        Ok(handle)
4219    }
4220
4221    /// Implements the `future.new` intrinsic.
4222    pub(crate) fn future_new(
4223        self,
4224        store: &mut StoreOpaque,
4225        caller: RuntimeComponentInstanceIndex,
4226        ty: TypeFutureTableIndex,
4227    ) -> Result<ResourcePair> {
4228        self.id().get(store).check_may_leave(caller)?;
4229        self.guest_new(store, TransmitIndex::Future(ty))
4230    }
4231
4232    /// Implements the `stream.new` intrinsic.
4233    pub(crate) fn stream_new(
4234        self,
4235        store: &mut StoreOpaque,
4236        caller: RuntimeComponentInstanceIndex,
4237        ty: TypeStreamTableIndex,
4238    ) -> Result<ResourcePair> {
4239        self.id().get(store).check_may_leave(caller)?;
4240        self.guest_new(store, TransmitIndex::Stream(ty))
4241    }
4242
4243    /// Transfer ownership of the specified future read end from one guest to
4244    /// another.
4245    pub(crate) fn future_transfer(
4246        self,
4247        store: &mut StoreOpaque,
4248        src_idx: u32,
4249        src: TypeFutureTableIndex,
4250        dst: TypeFutureTableIndex,
4251    ) -> Result<u32> {
4252        self.guest_transfer(
4253            store,
4254            src_idx,
4255            TransmitIndex::Future(src),
4256            TransmitIndex::Future(dst),
4257        )
4258    }
4259
4260    /// Transfer ownership of the specified stream read end from one guest to
4261    /// another.
4262    pub(crate) fn stream_transfer(
4263        self,
4264        store: &mut StoreOpaque,
4265        src_idx: u32,
4266        src: TypeStreamTableIndex,
4267        dst: TypeStreamTableIndex,
4268    ) -> Result<u32> {
4269        self.guest_transfer(
4270            store,
4271            src_idx,
4272            TransmitIndex::Stream(src),
4273            TransmitIndex::Stream(dst),
4274        )
4275    }
4276
4277    /// Copy the specified error context from one component to another.
4278    pub(crate) fn error_context_transfer(
4279        self,
4280        store: &mut StoreOpaque,
4281        src_idx: u32,
4282        src: TypeComponentLocalErrorContextTableIndex,
4283        dst: TypeComponentLocalErrorContextTableIndex,
4284    ) -> Result<u32> {
4285        let mut instance = self.id().get_mut(store);
4286        let rep = instance
4287            .as_mut()
4288            .table_for_error_context(src)
4289            .error_context_rep(src_idx)?;
4290        let dst_idx = instance
4291            .table_for_error_context(dst)
4292            .error_context_insert(rep)?;
4293
4294        // Update the global (cross-subcomponent) count for error contexts
4295        // as the new component has essentially created a new reference that will
4296        // be dropped/handled independently
4297        let global_ref_count = store
4298            .concurrent_state_mut()
4299            .global_error_context_ref_counts
4300            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4301            .context("global ref count present for existing (sub)component error context")?;
4302        global_ref_count.0 += 1;
4303
4304        Ok(dst_idx)
4305    }
4306}
4307
4308impl ComponentInstance {
4309    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4310        let (states, types) = self.instance_states();
4311        let runtime_instance = match ty {
4312            TransmitIndex::Stream(ty) => types[ty].instance,
4313            TransmitIndex::Future(ty) => types[ty].instance,
4314        };
4315        states[runtime_instance].handle_table()
4316    }
4317
4318    fn table_for_error_context(
4319        self: Pin<&mut Self>,
4320        ty: TypeComponentLocalErrorContextTableIndex,
4321    ) -> &mut HandleTable {
4322        let (states, types) = self.instance_states();
4323        let runtime_instance = types[ty].instance;
4324        states[runtime_instance].handle_table()
4325    }
4326
4327    fn get_mut_by_index(
4328        self: Pin<&mut Self>,
4329        ty: TransmitIndex,
4330        index: u32,
4331    ) -> Result<(u32, &mut TransmitLocalState)> {
4332        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4333    }
4334}
4335
4336impl ConcurrentState {
4337    fn send_write_result(
4338        &mut self,
4339        ty: TransmitIndex,
4340        id: TableId<TransmitState>,
4341        handle: u32,
4342        code: ReturnCode,
4343    ) -> Result<()> {
4344        let write_handle = self.get_mut(id)?.write_handle.rep();
4345        self.set_event(
4346            write_handle,
4347            match ty {
4348                TransmitIndex::Future(ty) => Event::FutureWrite {
4349                    code,
4350                    pending: Some((ty, handle)),
4351                },
4352                TransmitIndex::Stream(ty) => Event::StreamWrite {
4353                    code,
4354                    pending: Some((ty, handle)),
4355                },
4356            },
4357        )
4358    }
4359
4360    fn send_read_result(
4361        &mut self,
4362        ty: TransmitIndex,
4363        id: TableId<TransmitState>,
4364        handle: u32,
4365        code: ReturnCode,
4366    ) -> Result<()> {
4367        let read_handle = self.get_mut(id)?.read_handle.rep();
4368        self.set_event(
4369            read_handle,
4370            match ty {
4371                TransmitIndex::Future(ty) => Event::FutureRead {
4372                    code,
4373                    pending: Some((ty, handle)),
4374                },
4375                TransmitIndex::Stream(ty) => Event::StreamRead {
4376                    code,
4377                    pending: Some((ty, handle)),
4378                },
4379            },
4380        )
4381    }
4382
4383    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4384        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4385    }
4386
4387    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4388        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4389    }
4390
4391    /// Set or update the event for the specified waitable.
4392    ///
4393    /// If there is already an event set for this waitable, we assert that it is
4394    /// of the same variant as the new one and reuse the `ReturnCode` count and
4395    /// the `pending` field if applicable.
4396    // TODO: This is a bit awkward due to how
4397    // `Event::{Stream,Future}{Write,Read}` and
4398    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4399    // Consider updating those representations in a way that allows this
4400    // function to be simplified.
4401    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4402        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4403
4404        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4405            let (ReturnCode::Completed(count)
4406            | ReturnCode::Dropped(count)
4407            | ReturnCode::Cancelled(count)) = old
4408            else {
4409                unreachable!()
4410            };
4411
4412            match new {
4413                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4414                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4415                _ => unreachable!(),
4416            }
4417        }
4418
4419        let event = match (waitable.take_event(self)?, event) {
4420            (None, _) => event,
4421            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4422            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4423            (
4424                Some(Event::StreamWrite {
4425                    code: old_code,
4426                    pending: old_pending,
4427                }),
4428                Event::StreamWrite { code, pending },
4429            ) => Event::StreamWrite {
4430                code: update_code(old_code, code),
4431                pending: old_pending.or(pending),
4432            },
4433            (
4434                Some(Event::StreamRead {
4435                    code: old_code,
4436                    pending: old_pending,
4437                }),
4438                Event::StreamRead { code, pending },
4439            ) => Event::StreamRead {
4440                code: update_code(old_code, code),
4441                pending: old_pending.or(pending),
4442            },
4443            _ => unreachable!(),
4444        };
4445
4446        waitable.set_event(self, Some(event))
4447    }
4448
4449    /// Allocate a new future or stream, including the `TransmitState` and the
4450    /// `TransmitHandle`s corresponding to the read and write ends.
4451    fn new_transmit(
4452        &mut self,
4453        origin: TransmitOrigin,
4454    ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4455        let state_id = self.push(TransmitState::new(origin))?;
4456
4457        let write = self.push(TransmitHandle::new(state_id))?;
4458        let read = self.push(TransmitHandle::new(state_id))?;
4459
4460        let state = self.get_mut(state_id)?;
4461        state.write_handle = write;
4462        state.read_handle = read;
4463
4464        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4465
4466        Ok((write, read))
4467    }
4468
4469    /// Delete the specified future or stream, including the read and write ends.
4470    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4471        let state = self.delete(state_id)?;
4472        self.delete(state.write_handle)?;
4473        self.delete(state.read_handle)?;
4474
4475        log::trace!(
4476            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4477            state.write_handle,
4478            state.read_handle,
4479        );
4480
4481        Ok(())
4482    }
4483}
4484
/// Pair of guest handle indices for the two ends of a newly created stream or
/// future, as returned by `guest_new`, `future_new` and `stream_new`.
pub(crate) struct ResourcePair {
    /// Handle index of the write end.
    pub(crate) write: u32,
    /// Handle index of the read end.
    pub(crate) read: u32,
}
4489
4490impl Waitable {
4491    /// Handle the imminent delivery of the specified event, e.g. by updating
4492    /// the state of the stream or future.
4493    pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4494        match event {
4495            Event::FutureRead {
4496                pending: Some((ty, handle)),
4497                ..
4498            }
4499            | Event::FutureWrite {
4500                pending: Some((ty, handle)),
4501                ..
4502            } => {
4503                let instance = instance.id().get_mut(store);
4504                let runtime_instance = instance.component().types()[ty].instance;
4505                let (rep, state) = instance.instance_states().0[runtime_instance]
4506                    .handle_table()
4507                    .future_rep(ty, handle)
4508                    .unwrap();
4509                assert_eq!(rep, self.rep());
4510                assert_eq!(*state, TransmitLocalState::Busy);
4511                *state = match event {
4512                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4513                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4514                    _ => unreachable!(),
4515                };
4516            }
4517            Event::StreamRead {
4518                pending: Some((ty, handle)),
4519                code,
4520            }
4521            | Event::StreamWrite {
4522                pending: Some((ty, handle)),
4523                code,
4524            } => {
4525                let instance = instance.id().get_mut(store);
4526                let runtime_instance = instance.component().types()[ty].instance;
4527                let (rep, state) = instance.instance_states().0[runtime_instance]
4528                    .handle_table()
4529                    .stream_rep(ty, handle)
4530                    .unwrap();
4531                assert_eq!(rep, self.rep());
4532                assert_eq!(*state, TransmitLocalState::Busy);
4533                let done = matches!(code, ReturnCode::Dropped(_));
4534                *state = match event {
4535                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
4536                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4537                    _ => unreachable!(),
4538                };
4539
4540                let transmit_handle = TableId::<TransmitHandle>::new(rep);
4541                let state = store.concurrent_state_mut();
4542                let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4543                let transmit = state.get_mut(transmit_id).unwrap();
4544
4545                match event {
4546                    Event::StreamRead { .. } => {
4547                        transmit.read = ReadState::Open;
4548                    }
4549                    Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4550                    _ => unreachable!(),
4551                };
4552            }
4553            _ => {}
4554        }
4555    }
4556}
4557
4558/// Determine whether an intra-component read/write is allowed for the specified
4559/// `stream` or `future` payload type according to the component model
4560/// specification.
4561fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4562    matches!(
4563        ty,
4564        None | Some(
4565            InterfaceType::S8
4566                | InterfaceType::U8
4567                | InterfaceType::S16
4568                | InterfaceType::U16
4569                | InterfaceType::S32
4570                | InterfaceType::U32
4571                | InterfaceType::S64
4572                | InterfaceType::U64
4573                | InterfaceType::Float32
4574                | InterfaceType::Float64
4575        )
4576    )
4577}
4578
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Engine, Store};
    use core::future::pending;
    use core::pin::pin;
    use std::sync::LazyLock;

    /// Engine shared by the throwaway stores created for each poll.
    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);

    /// Poll `rx` once as a `FutureProducer`, using a no-op waker and a fresh
    /// store.
    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
    where
        T: FutureProducer<()>,
    {
        let mut store = Store::new(&ENGINE, ());
        let mut cx = Context::from_waker(Waker::noop());
        rx.poll_produce(&mut cx, store.as_context_mut(), finish)
    }

    #[test]
    fn future_producer() {
        // Poll the pinned producer once and check the outcome against a pattern.
        macro_rules! assert_poll {
            ($producer:ident, $finish:expr, $expected:pat $(,)?) => {
                assert!(matches!(
                    poll_future_producer($producer.as_mut(), $finish),
                    $expected,
                ))
            };
        }

        // An immediately-ready future yields its value with or without `finish`.
        let mut ready = pin!(async { crate::error::Ok(()) });
        assert_poll!(ready, false, Poll::Ready(Ok(Some(()))));

        let mut ready = pin!(async { crate::error::Ok(()) });
        assert_poll!(ready, true, Poll::Ready(Ok(Some(()))));

        // A future that never resolves is pending, unless asked to finish.
        let mut never = pin!(pending::<Result<()>>());
        assert_poll!(never, false, Poll::Pending);
        assert_poll!(never, true, Poll::Ready(Ok(None)));

        // A oneshot receiver: pending, then `None` on finish, then the value
        // once the sender has sent.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        assert_poll!(rx, false, Poll::Pending);
        assert_poll!(rx, true, Poll::Ready(Ok(None)));
        tx.send(()).unwrap();
        assert_poll!(rx, true, Poll::Ready(Ok(Some(()))));

        // A value sent before the first poll is produced right away.
        let (tx, rx) = oneshot::channel();
        let mut rx = pin!(rx);
        tx.send(()).unwrap();
        assert_poll!(rx, false, Poll::Ready(Ok(Some(()))));

        // A dropped sender surfaces as an error, with or without `finish`.
        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert_poll!(rx, false, Poll::Ready(Err(..)));

        let (tx, rx) = oneshot::channel::<()>();
        let mut rx = pin!(rx);
        drop(tx);
        assert_poll!(rx, true, Poll::Ready(Err(..)));
    }
}