Skip to main content

wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::types;
7use crate::component::values::ErrorContextAny;
8use crate::component::{
9    AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
10    Val, WasmList,
11};
12use crate::store::{StoreOpaque, StoreToken};
13use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
14use crate::vm::{AlwaysMut, VMStore};
15use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
16use crate::{
17    Error, Result, bail,
18    error::{Context as _, format_err},
19};
20use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
21use core::fmt;
22use core::future;
23use core::iter;
24use core::marker::PhantomData;
25use core::mem::{self, MaybeUninit};
26use core::pin::Pin;
27use core::task::{Context, Poll, Waker, ready};
28use futures::channel::oneshot;
29use futures::{FutureExt as _, stream};
30use std::any::{Any, TypeId};
31use std::boxed::Box;
32use std::io::Cursor;
33use std::string::String;
34use std::sync::{Arc, Mutex};
35use std::vec::Vec;
36use wasmtime_environ::component::{
37    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
38    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
39    TypeFutureTableIndex, TypeStreamTableIndex,
40};
41
42pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
43
44mod buffers;
45
46/// Enum for distinguishing between a stream or future in functions that handle
47/// both.
48#[derive(Copy, Clone, Debug)]
49pub enum TransmitKind {
50    Stream,
51    Future,
52}
53
54/// Represents `{stream,future}.{read,write}` results.
55#[derive(Copy, Clone, Debug, PartialEq)]
56pub enum ReturnCode {
57    Blocked,
58    Completed(u32),
59    Dropped(u32),
60    Cancelled(u32),
61}
62
63impl ReturnCode {
64    /// Pack `self` into a single 32-bit integer that may be returned to the
65    /// guest.
66    ///
67    /// This corresponds to `pack_copy_result` in the Component Model spec.
68    pub fn encode(&self) -> u32 {
69        const BLOCKED: u32 = 0xffff_ffff;
70        const COMPLETED: u32 = 0x0;
71        const DROPPED: u32 = 0x1;
72        const CANCELLED: u32 = 0x2;
73        match self {
74            ReturnCode::Blocked => BLOCKED,
75            ReturnCode::Completed(n) => {
76                debug_assert!(*n < (1 << 28));
77                (n << 4) | COMPLETED
78            }
79            ReturnCode::Dropped(n) => {
80                debug_assert!(*n < (1 << 28));
81                (n << 4) | DROPPED
82            }
83            ReturnCode::Cancelled(n) => {
84                debug_assert!(*n < (1 << 28));
85                (n << 4) | CANCELLED
86            }
87        }
88    }
89
90    /// Returns `Self::Completed` with the specified count (or zero if
91    /// `matches!(kind, TransmitKind::Future)`)
92    fn completed(kind: TransmitKind, count: u32) -> Self {
93        Self::Completed(if let TransmitKind::Future = kind {
94            0
95        } else {
96            count
97        })
98    }
99}
100
101/// Represents a stream or future type index.
102///
103/// This is useful as a parameter type for functions which operate on either a
104/// future or a stream.
105#[derive(Copy, Clone, Debug)]
106pub enum TransmitIndex {
107    Stream(TypeStreamTableIndex),
108    Future(TypeFutureTableIndex),
109}
110
111impl TransmitIndex {
112    pub fn kind(&self) -> TransmitKind {
113        match self {
114            TransmitIndex::Stream(_) => TransmitKind::Stream,
115            TransmitIndex::Future(_) => TransmitKind::Future,
116        }
117    }
118}
119
120/// Retrieve the payload type of the specified stream or future, or `None` if it
121/// has no payload type.
122fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
123    match ty {
124        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
125        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
126    }
127}
128
129/// Retrieve the host rep and state for the specified guest-visible waitable
130/// handle.
131fn get_mut_by_index_from(
132    handle_table: &mut HandleTable,
133    ty: TransmitIndex,
134    index: u32,
135) -> Result<(u32, &mut TransmitLocalState)> {
136    match ty {
137        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
138        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
139    }
140}
141
142fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
143    mut store: StoreContextMut<U>,
144    instance: Instance,
145    options: OptionsIndex,
146    ty: TransmitIndex,
147    address: usize,
148    count: usize,
149    buffer: &mut B,
150) -> Result<()> {
151    let count = buffer.remaining().len().min(count);
152
153    let lower = &mut if T::MAY_REQUIRE_REALLOC {
154        LowerContext::new
155    } else {
156        LowerContext::new_without_realloc
157    }(store.as_context_mut(), options, instance);
158
159    if address % usize::try_from(T::ALIGN32)? != 0 {
160        bail!("read pointer not aligned");
161    }
162    lower
163        .as_slice_mut()
164        .get_mut(address..)
165        .and_then(|b| b.get_mut(..T::SIZE32 * count))
166        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
167
168    if let Some(ty) = payload(ty, lower.types) {
169        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
170    }
171
172    buffer.skip(count);
173
174    Ok(())
175}
176
177fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
178    lift: &mut LiftContext<'_>,
179    ty: Option<InterfaceType>,
180    buffer: &mut B,
181    address: usize,
182    count: usize,
183) -> Result<()> {
184    let count = count.min(buffer.remaining_capacity());
185    if T::IS_RUST_UNIT_TYPE {
186        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
187        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
188        // is a valid way to populate the zero-sized buffer.
189        buffer.extend(
190            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
191        )
192    } else {
193        let ty = ty.unwrap();
194        if address % usize::try_from(T::ALIGN32)? != 0 {
195            bail!("write pointer not aligned");
196        }
197        lift.memory()
198            .get(address..)
199            .and_then(|b| b.get(..T::SIZE32 * count))
200            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
201
202        let list = &WasmList::new(address, count, lift, ty)?;
203        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
204    }
205    Ok(())
206}
207
208/// Represents the state associated with an error context
209#[derive(Debug, PartialEq, Eq, PartialOrd)]
210pub(super) struct ErrorContextState {
211    /// Debug message associated with the error context
212    pub(crate) debug_msg: String,
213}
214
215/// Represents the size and alignment for a "flat" Component Model type,
216/// i.e. one containing no pointers or handles.
217#[derive(Debug, Clone, Copy, PartialEq, Eq)]
218pub(super) struct FlatAbi {
219    pub(super) size: u32,
220    pub(super) align: u32,
221}
222
223/// Represents the buffer for a host- or guest-initiated stream read.
224pub struct Destination<'a, T, B> {
225    id: TableId<TransmitState>,
226    buffer: &'a mut B,
227    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
228    _phantom: PhantomData<fn() -> T>,
229}
230
231impl<'a, T, B> Destination<'a, T, B> {
232    /// Reborrow `self` so it can be used again later.
233    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
234        Destination {
235            id: self.id,
236            buffer: &mut *self.buffer,
237            host_buffer: self.host_buffer.as_deref_mut(),
238            _phantom: PhantomData,
239        }
240    }
241
242    /// Take the buffer out of `self`, leaving a default-initialized one in its
243    /// place.
244    ///
245    /// This can be useful for reusing the previously-stored buffer's capacity
246    /// instead of allocating a fresh one.
247    pub fn take_buffer(&mut self) -> B
248    where
249        B: Default,
250    {
251        mem::take(self.buffer)
252    }
253
254    /// Store the specified buffer in `self`.
255    ///
256    /// Any items contained in the buffer will be delivered to the reader after
257    /// the `StreamProducer::poll_produce` call to which this `Destination` was
258    /// passed returns (unless overwritten by another call to `set_buffer`).
259    ///
260    /// If items are stored via this buffer _and_ written via a
261    /// `DirectDestination` view of `self`, then the items in the buffer will be
262    /// delivered after the ones written using `DirectDestination`.
263    pub fn set_buffer(&mut self, buffer: B) {
264        *self.buffer = buffer;
265    }
266
267    /// Return the remaining number of items the current read has capacity to
268    /// accept, if known.
269    ///
270    /// This will return `Some(_)` if the reader is a guest; it will return
271    /// `None` if the reader is the host.
272    ///
273    /// Note that this can return `Some(0)`. This means that the guest is
274    /// attempting to perform a zero-length read which typically means that it's
275    /// trying to wait for this stream to be ready-to-read but is not actually
276    /// ready to receive the items yet. The host in this case is allowed to
277    /// either block waiting for readiness or immediately complete the
278    /// operation. The guest is expected to handle both cases. Some more
279    /// discussion about this case can be found in the discussion of ["Stream
280    /// Readiness" in the component-model repo][docs].
281    ///
282    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
283    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
284        let transmit = store
285            .as_context_mut()
286            .0
287            .concurrent_state_mut()
288            .get_mut(self.id)
289            .unwrap();
290
291        if let &ReadState::GuestReady { count, .. } = &transmit.read {
292            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
293                unreachable!()
294            };
295
296            Some(count - guest_offset)
297        } else {
298            None
299        }
300    }
301}
302
303impl<'a, B> Destination<'a, u8, B> {
304    /// Return a `DirectDestination` view of `self`.
305    ///
306    /// If the reader is a guest, this will provide direct access to the guest's
307    /// read buffer.  If the reader is a host, this will provide access to a
308    /// buffer which will be delivered to the host before any items stored using
309    /// `Destination::set_buffer`.
310    ///
311    /// `capacity` will only be used if the reader is a host, in which case it
312    /// will update the length of the buffer, possibly zero-initializing the new
313    /// elements if the new length is larger than the old length.
314    pub fn as_direct<D>(
315        mut self,
316        store: StoreContextMut<'a, D>,
317        capacity: usize,
318    ) -> DirectDestination<'a, D> {
319        if let Some(buffer) = self.host_buffer.as_deref_mut() {
320            buffer.set_position(0);
321            if buffer.get_mut().is_empty() {
322                buffer.get_mut().resize(capacity, 0);
323            }
324        }
325
326        DirectDestination {
327            id: self.id,
328            host_buffer: self.host_buffer,
329            store,
330        }
331    }
332}
333
334/// Represents a read from a `stream<u8>`, providing direct access to the
335/// writer's buffer.
336pub struct DirectDestination<'a, D: 'static> {
337    id: TableId<TransmitState>,
338    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
339    store: StoreContextMut<'a, D>,
340}
341
342impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
343    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
344        let rem = self.remaining();
345        let n = rem.len().min(buf.len());
346        rem[..n].copy_from_slice(&buf[..n]);
347        self.mark_written(n);
348        Ok(n)
349    }
350
351    fn flush(&mut self) -> std::io::Result<()> {
352        Ok(())
353    }
354}
355
356impl<D: 'static> DirectDestination<'_, D> {
357    /// Provide direct access to the writer's buffer.
358    pub fn remaining(&mut self) -> &mut [u8] {
359        if let Some(buffer) = self.host_buffer.as_deref_mut() {
360            buffer.get_mut()
361        } else {
362            let transmit = self
363                .store
364                .as_context_mut()
365                .0
366                .concurrent_state_mut()
367                .get_mut(self.id)
368                .unwrap();
369
370            let &ReadState::GuestReady {
371                address,
372                count,
373                options,
374                instance,
375                ..
376            } = &transmit.read
377            else {
378                unreachable!();
379            };
380
381            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
382                unreachable!()
383            };
384
385            instance
386                .options_memory_mut(self.store.0, options)
387                .get_mut((address + guest_offset)..)
388                .and_then(|b| b.get_mut(..(count - guest_offset)))
389                .unwrap()
390        }
391    }
392
393    /// Mark the specified number of bytes as written to the writer's buffer.
394    ///
395    /// This will panic if the count is larger than the size of the
396    /// buffer returned by `Self::remaining`.
397    pub fn mark_written(&mut self, count: usize) {
398        if let Some(buffer) = self.host_buffer.as_deref_mut() {
399            buffer.set_position(
400                buffer
401                    .position()
402                    .checked_add(u64::try_from(count).unwrap())
403                    .unwrap(),
404            );
405        } else {
406            let transmit = self
407                .store
408                .as_context_mut()
409                .0
410                .concurrent_state_mut()
411                .get_mut(self.id)
412                .unwrap();
413
414            let ReadState::GuestReady {
415                count: read_count, ..
416            } = &transmit.read
417            else {
418                unreachable!();
419            };
420
421            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
422                unreachable!()
423            };
424
425            if *guest_offset + count > *read_count {
426                panic!(
427                    "write count ({count}) must be less than or equal to read count ({read_count})"
428                )
429            } else {
430                *guest_offset += count;
431            }
432        }
433    }
434}
435
436/// Represents the state of a `Stream{Producer,Consumer}`.
437#[derive(Copy, Clone, Debug)]
438pub enum StreamResult {
439    /// The operation completed normally, and the producer or consumer may be
440    /// able to produce or consume more items, respectively.
441    Completed,
442    /// The operation was interrupted (i.e. it wrapped up early after receiving
443    /// a `finish` parameter value of true in a call to `poll_produce` or
444    /// `poll_consume`), and the producer or consumer may be able to produce or
445    /// consume more items, respectively.
446    Cancelled,
447    /// The operation completed normally, but the producer or consumer will
448    /// _not_ able to produce or consume more items, respectively.
449    Dropped,
450}
451
452/// Represents the host-owned write end of a stream.
453pub trait StreamProducer<D>: Send + 'static {
454    /// The payload type of this stream.
455    type Item;
456
457    /// The `WriteBuffer` type to use when delivering items.
458    type Buffer: WriteBuffer<Self::Item> + Default;
459
460    /// Handle a host- or guest-initiated read by delivering zero or more items
461    /// to the specified destination.
462    ///
463    /// This will be called whenever the reader starts a read.
464    ///
465    /// # Arguments
466    ///
467    /// * `self` - a `Pin`'d version of self to perform Rust-level
468    ///   future-related operations on.
469    /// * `cx` - a Rust-related [`Context`] which is passed to other
470    ///   future-related operations or used to acquire a waker.
471    /// * `store` - the Wasmtime store that this operation is happening within.
472    ///   Used, for example, to consult the state `D` associated with the store.
473    /// * `destination` - the location that items are to be written to.
474    /// * `finish` - a flag indicating whether the host should strive to
475    ///   immediately complete/cancel any pending operation. See below for more
476    ///   details.
477    ///
478    /// # Behavior
479    ///
480    /// If the implementation is able to produce one or more items immediately,
481    /// it should write them to `destination` and return either
482    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
483    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
484    /// any more items.
485    ///
486    /// If the implementation is unable to produce any items immediately, but
487    /// expects to do so later, and `finish` is _false_, it should store the
488    /// waker from `cx` for later and return `Poll::Pending` without writing
489    /// anything to `destination`.  Later, it should alert the waker when either
490    /// the items arrive, the stream has ended, or an error occurs.
491    ///
492    /// If more items are written to `destination` than the reader has immediate
493    /// capacity to accept, they will be retained in memory by the caller and
494    /// used to satisfy future reads, in which case `poll_produce` will only be
495    /// called again once all those items have been delivered.
496    ///
497    /// # Zero-length reads
498    ///
499    /// This function may be called with a zero-length capacity buffer
500    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
501    /// the guest wants to wait to see if an item is ready without actually
502    /// reading the item. For example think of a UNIX `poll` function run on a
503    /// TCP stream, seeing if it's readable without actually reading it.
504    ///
505    /// In this situation the host is allowed to either return immediately or
506    /// wait for readiness. Note that waiting for readiness is not always
507    /// possible. For example it's impossible to test if a Rust-native `Future`
508    /// is ready without actually reading the item. Stream-specific
509    /// optimizations, such as testing if a TCP stream is readable, may be
510    /// possible however.
511    ///
512    /// For a zero-length read, the host is allowed to:
513    ///
514    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
515    ///   anything if it expects to be able to produce items immediately (i.e.
516    ///   without first returning `Poll::Pending`) the next time `poll_produce`
517    ///   is called with non-zero capacity. This is the best-case scenario of
518    ///   fulfilling the guest's desire -- items aren't read/buffered but the
519    ///   host is saying it's ready when the guest is.
520    ///
521    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
522    ///   testing for readiness. The guest doesn't know this yet, but the guest
523    ///   will realize that zero-length reads won't work on this stream when a
524    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
525    ///   here.
526    ///
527    /// - Return `Poll::Pending` if the host has performed necessary async work
528    ///   to wait for this stream to be readable without actually reading
529    ///   anything. This is also a best-case scenario where the host is letting
530    ///   the guest know that nothing is ready yet. Later the zero-length read
531    ///   will complete and then the guest will attempt a nonzero-length read to
532    ///   actually read some bytes.
533    ///
534    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
535    ///   `Destination::set_buffer` with one more more items. Note, however,
536    ///   that this creates the hazard that the items will never be received by
537    ///   the guest if it decides not to do another non-zero-length read before
538    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
539    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
540    ///   recommended to do this and it's better to return
541    ///   `StreamResult::Completed` without buffering anything instead.
542    ///
543    /// For more discussion on zero-length reads see the [documentation in the
544    /// component-model repo itself][docs].
545    ///
546    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
547    ///
548    /// # Return
549    ///
550    /// This function can return a number of possible cases from this function:
551    ///
552    /// * `Poll::Pending` - this operation cannot complete at this time. The
553    ///   Rust-level `Future::poll` contract applies here where a waker should
554    ///   be stored from the `cx` argument and be arranged to receive a
555    ///   notification when this implementation can make progress. For example
556    ///   if you call `Future::poll` on a sub-future, that's enough. If items
557    ///   were written to `destination` then a trap in the guest will be raised.
558    ///
559    ///   Note that implementations should strive to avoid this return value
560    ///   when `finish` is `true`. In such a situation the guest is attempting
561    ///   to, for example, cancel a previous operation. By returning
562    ///   `Poll::Pending` the guest will be blocked during the cancellation
563    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
564    ///   favored to indicate that no items were read. If a short read happened,
565    ///   however, it's ok to return `StreamResult::Completed` indicating some
566    ///   items were read.
567    ///
568    /// * `Poll::Ok(StreamResult::Completed)` - items, if applicable, were
569    ///   written to the `destination`.
570    ///
571    /// * `Poll::Ok(StreamResult::Cancelled)` - used when `finish` is `true` and
572    ///   the implementation was able to successfully cancel any async work that
573    ///   a previous read kicked off, if any. The host should not buffer values
574    ///   received after returning `Cancelled` because the guest will not be
575    ///   aware of these values and the guest could close the stream after
576    ///   cancelling a read. Hosts should only return `Cancelled` when there are
577    ///   no more async operations in flight for a previous read.
578    ///
579    ///   If items were written to `destination` then a trap in the guest will
580    ///   be raised. If `finish` is `false` then this return value will raise a
581    ///   trap in the guest.
582    ///
583    /// * `Poll::Ok(StreamResult::Dropped)` - end-of-stream marker, indicating
584    ///   that this producer should not be polled again. Note that items may
585    ///   still be written to `destination`.
586    ///
587    /// # Errors
588    ///
589    /// The implementation may alternatively choose to return `Err(_)` to
590    /// indicate an unrecoverable error. This will cause the guest (if any) to
591    /// trap and render the component instance (if any) unusable. The
592    /// implementation should report errors that _are_ recoverable by other
593    /// means (e.g. by writing to a `future`) and return
594    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
595    fn poll_produce<'a>(
596        self: Pin<&mut Self>,
597        cx: &mut Context<'_>,
598        store: StoreContextMut<'a, D>,
599        destination: Destination<'a, Self::Item, Self::Buffer>,
600        finish: bool,
601    ) -> Poll<Result<StreamResult>>;
602
603    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
604    /// be downcast to the specified type.
605    ///
606    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
607    /// to the specified type is guaranteed to succeed.
608    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
609        Err(me)
610    }
611}
612
613impl<T, D> StreamProducer<D> for iter::Empty<T>
614where
615    T: Send + Sync + 'static,
616{
617    type Item = T;
618    type Buffer = Option<Self::Item>;
619
620    fn poll_produce<'a>(
621        self: Pin<&mut Self>,
622        _: &mut Context<'_>,
623        _: StoreContextMut<'a, D>,
624        _: Destination<'a, Self::Item, Self::Buffer>,
625        _: bool,
626    ) -> Poll<Result<StreamResult>> {
627        Poll::Ready(Ok(StreamResult::Dropped))
628    }
629}
630
631impl<T, D> StreamProducer<D> for stream::Empty<T>
632where
633    T: Send + Sync + 'static,
634{
635    type Item = T;
636    type Buffer = Option<Self::Item>;
637
638    fn poll_produce<'a>(
639        self: Pin<&mut Self>,
640        _: &mut Context<'_>,
641        _: StoreContextMut<'a, D>,
642        _: Destination<'a, Self::Item, Self::Buffer>,
643        _: bool,
644    ) -> Poll<Result<StreamResult>> {
645        Poll::Ready(Ok(StreamResult::Dropped))
646    }
647}
648
649impl<T, D> StreamProducer<D> for Vec<T>
650where
651    T: Unpin + Send + Sync + 'static,
652{
653    type Item = T;
654    type Buffer = VecBuffer<T>;
655
656    fn poll_produce<'a>(
657        self: Pin<&mut Self>,
658        _: &mut Context<'_>,
659        _: StoreContextMut<'a, D>,
660        mut dst: Destination<'a, Self::Item, Self::Buffer>,
661        _: bool,
662    ) -> Poll<Result<StreamResult>> {
663        dst.set_buffer(mem::take(self.get_mut()).into());
664        Poll::Ready(Ok(StreamResult::Dropped))
665    }
666}
667
668impl<T, D> StreamProducer<D> for Box<[T]>
669where
670    T: Unpin + Send + Sync + 'static,
671{
672    type Item = T;
673    type Buffer = VecBuffer<T>;
674
675    fn poll_produce<'a>(
676        self: Pin<&mut Self>,
677        _: &mut Context<'_>,
678        _: StoreContextMut<'a, D>,
679        mut dst: Destination<'a, Self::Item, Self::Buffer>,
680        _: bool,
681    ) -> Poll<Result<StreamResult>> {
682        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
683        Poll::Ready(Ok(StreamResult::Dropped))
684    }
685}
686
687#[cfg(feature = "component-model-async-bytes")]
688impl<D> StreamProducer<D> for bytes::Bytes {
689    type Item = u8;
690    type Buffer = Cursor<Self>;
691
692    fn poll_produce<'a>(
693        mut self: Pin<&mut Self>,
694        _: &mut Context<'_>,
695        mut store: StoreContextMut<'a, D>,
696        mut dst: Destination<'a, Self::Item, Self::Buffer>,
697        _: bool,
698    ) -> Poll<Result<StreamResult>> {
699        let cap = dst.remaining(&mut store);
700        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
701            // on 0-length or host reads, buffer the bytes
702            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
703            return Poll::Ready(Ok(StreamResult::Dropped));
704        };
705        let cap = cap.into();
706        // data does not fit in destination, fill it and buffer the rest
707        dst.set_buffer(Cursor::new(self.split_off(cap)));
708        let mut dst = dst.as_direct(store, cap);
709        dst.remaining().copy_from_slice(&self);
710        dst.mark_written(cap);
711        Poll::Ready(Ok(StreamResult::Dropped))
712    }
713}
714
715#[cfg(feature = "component-model-async-bytes")]
716impl<D> StreamProducer<D> for bytes::BytesMut {
717    type Item = u8;
718    type Buffer = Cursor<Self>;
719
720    fn poll_produce<'a>(
721        mut self: Pin<&mut Self>,
722        _: &mut Context<'_>,
723        mut store: StoreContextMut<'a, D>,
724        mut dst: Destination<'a, Self::Item, Self::Buffer>,
725        _: bool,
726    ) -> Poll<Result<StreamResult>> {
727        let cap = dst.remaining(&mut store);
728        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
729            // on 0-length or host reads, buffer the bytes
730            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
731            return Poll::Ready(Ok(StreamResult::Dropped));
732        };
733        let cap = cap.into();
734        // data does not fit in destination, fill it and buffer the rest
735        dst.set_buffer(Cursor::new(self.split_off(cap)));
736        let mut dst = dst.as_direct(store, cap);
737        dst.remaining().copy_from_slice(&self);
738        dst.mark_written(cap);
739        Poll::Ready(Ok(StreamResult::Dropped))
740    }
741}
742
743/// Represents the buffer for a host- or guest-initiated stream write.
744pub struct Source<'a, T> {
745    id: TableId<TransmitState>,
746    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
747}
748
749impl<'a, T> Source<'a, T> {
750    /// Reborrow `self` so it can be used again later.
751    pub fn reborrow(&mut self) -> Source<'_, T> {
752        Source {
753            id: self.id,
754            host_buffer: self.host_buffer.as_deref_mut(),
755        }
756    }
757
758    /// Accept zero or more items from the writer.
759    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
760    where
761        T: func::Lift + 'static,
762        B: ReadBuffer<T>,
763    {
764        if let Some(input) = &mut self.host_buffer {
765            let count = input.remaining().len().min(buffer.remaining_capacity());
766            buffer.move_from(*input, count);
767        } else {
768            let store = store.as_context_mut();
769            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
770
771            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
772                unreachable!();
773            };
774
775            let &WriteState::GuestReady {
776                ty,
777                address,
778                count,
779                options,
780                instance,
781                ..
782            } = &transmit.write
783            else {
784                unreachable!()
785            };
786
787            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
788            let ty = payload(ty, cx.types);
789            let old_remaining = buffer.remaining_capacity();
790            lift::<T, B>(
791                cx,
792                ty,
793                buffer,
794                address + (T::SIZE32 * guest_offset),
795                count - guest_offset,
796            )?;
797
798            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
799
800            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
801                unreachable!();
802            };
803
804            *guest_offset += old_remaining - buffer.remaining_capacity();
805        }
806
807        Ok(())
808    }
809
810    /// Return the number of items remaining to be read from the current write
811    /// operation.
812    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
813    where
814        T: 'static,
815    {
816        let transmit = store
817            .as_context_mut()
818            .0
819            .concurrent_state_mut()
820            .get_mut(self.id)
821            .unwrap();
822
823        if let &WriteState::GuestReady { count, .. } = &transmit.write {
824            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
825                unreachable!()
826            };
827
828            count - guest_offset
829        } else if let Some(host_buffer) = &self.host_buffer {
830            host_buffer.remaining().len()
831        } else {
832            unreachable!()
833        }
834    }
835}
836
837impl<'a> Source<'a, u8> {
838    /// Return a `DirectSource` view of `self`.
839    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
840        DirectSource {
841            id: self.id,
842            host_buffer: self.host_buffer,
843            store,
844        }
845    }
846}
847
848/// Represents a write to a `stream<u8>`, providing direct access to the
849/// writer's buffer.
850pub struct DirectSource<'a, D: 'static> {
851    id: TableId<TransmitState>,
852    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
853    store: StoreContextMut<'a, D>,
854}
855
856impl<D: 'static> std::io::Read for DirectSource<'_, D> {
857    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
858        let rem = self.remaining();
859        let n = rem.len().min(buf.len());
860        buf[..n].copy_from_slice(&rem[..n]);
861        self.mark_read(n);
862        Ok(n)
863    }
864}
865
866impl<D: 'static> DirectSource<'_, D> {
867    /// Provide direct access to the writer's buffer.
868    pub fn remaining(&mut self) -> &[u8] {
869        if let Some(buffer) = self.host_buffer.as_deref_mut() {
870            buffer.remaining()
871        } else {
872            let transmit = self
873                .store
874                .as_context_mut()
875                .0
876                .concurrent_state_mut()
877                .get_mut(self.id)
878                .unwrap();
879
880            let &WriteState::GuestReady {
881                address,
882                count,
883                options,
884                instance,
885                ..
886            } = &transmit.write
887            else {
888                unreachable!()
889            };
890
891            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
892                unreachable!()
893            };
894
895            instance
896                .options_memory(self.store.0, options)
897                .get((address + guest_offset)..)
898                .and_then(|b| b.get(..(count - guest_offset)))
899                .unwrap()
900        }
901    }
902
903    /// Mark the specified number of bytes as read from the writer's buffer.
904    ///
905    /// This will panic if the count is larger than the size of the buffer
906    /// returned by `Self::remaining`.
907    pub fn mark_read(&mut self, count: usize) {
908        if let Some(buffer) = self.host_buffer.as_deref_mut() {
909            buffer.skip(count);
910        } else {
911            let transmit = self
912                .store
913                .as_context_mut()
914                .0
915                .concurrent_state_mut()
916                .get_mut(self.id)
917                .unwrap();
918
919            let WriteState::GuestReady {
920                count: write_count, ..
921            } = &transmit.write
922            else {
923                unreachable!()
924            };
925
926            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
927                unreachable!()
928            };
929
930            if *guest_offset + count > *write_count {
931                panic!(
932                    "read count ({count}) must be less than or equal to write count ({write_count})"
933                )
934            } else {
935                *guest_offset += count;
936            }
937        }
938    }
939}
940
941/// Represents the host-owned read end of a stream.
942pub trait StreamConsumer<D>: Send + 'static {
943    /// The payload type of this stream.
944    type Item;
945
946    /// Handle a host- or guest-initiated write by accepting zero or more items
947    /// from the specified source.
948    ///
949    /// This will be called whenever the writer starts a write.
950    ///
951    /// If the implementation is able to consume one or more items immediately,
952    /// it should take them from `source` and return either
953    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
954    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
955    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
956    /// indicate that the caller should delay sending a `COMPLETED` event to the
957    /// writer until a later call to this function returns `Poll::Ready(_)`.
958    /// For more about that, see the `Backpressure` section below.
959    ///
960    /// If the implementation cannot consume any items immediately and `finish`
961    /// is _false_, it should store the waker from `cx` for later and return
962    /// `Poll::Pending` without writing anything to `destination`.  Later, it
963    /// should alert the waker when either (1) the items arrive, (2) the stream
964    /// has ended, or (3) an error occurs.
965    ///
966    /// If the implementation cannot consume any items immediately and `finish`
967    /// is _true_, it should, if possible, return
968    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
969    /// anything from `source`.  However, that might not be possible if an
970    /// earlier call to `poll_consume` kicked off an asynchronous operation
971    /// which needs to be completed (and possibly interrupted) gracefully, in
972    /// which case the implementation may return `Poll::Pending` and later alert
973    /// the waker as described above.  In other words, when `finish` is true,
974    /// the implementation should prioritize returning a result to the reader
975    /// (even if no items can be consumed) rather than wait indefinitely for at
976    /// capacity to free up.
977    ///
978    /// In all of the above cases, the implementation may alternatively choose
979    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
980    /// the guest (if any) to trap and render the component instance (if any)
981    /// unusable.  The implementation should report errors that _are_
982    /// recoverable by other means (e.g. by writing to a `future`) and return
983    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
984    ///
985    /// Note that the implementation should only return
986    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
987    /// items from `source` if called with `finish` set to true.  If it does so
988    /// when `finish` is false, the caller will trap.  Additionally, it should
989    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
990    /// least one item from `source` if there is an item available; otherwise,
991    /// the caller will trap.  If `poll_consume` is called with no items in
992    /// `source`, it should only return `Poll::Ready(_)` once it is able to
993    /// accept at least one item during the next call to `poll_consume`.
994    ///
995    /// Note that any items which the implementation of this trait takes from
996    /// `source` become the responsibility of that implementation.  For that
997    /// reason, an implementation which forwards items to an upstream sink
998    /// should reserve capacity in that sink before taking items out of
999    /// `source`, if possible.  Alternatively, it might buffer items which can't
1000    /// be forwarded immediately and send them once capacity is freed up.
1001    ///
1002    /// ## Backpressure
1003    ///
1004    /// As mentioned above, an implementation might choose to return
1005    /// `Poll::Pending` after taking items from `source`, which tells the caller
1006    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
1007    /// a form of backpressure when the items are forwarded to an upstream sink
1008    /// asynchronously.  Note, however, that it's not possible to "put back"
1009    /// items into `source` once they've been taken out, so if the upstream sink
1010    /// is unable to accept all the items, that cannot be communicated to the
1011    /// writer at this level of abstraction.  Just as with application-specific,
1012    /// recoverable errors, information about which items could be forwarded and
1013    /// which could not must be communicated out-of-band, e.g. by writing to an
1014    /// application-specific `future`.
1015    ///
1016    /// Similarly, if the writer cancels the write after items have been taken
1017    /// from `source` but before the items have all been forwarded to an
1018    /// upstream sink, `poll_consume` will be called with `finish` set to true,
1019    /// and the implementation may either:
1020    ///
1021    /// - Interrupt the forwarding process gracefully.  This may be preferable
1022    /// if there is an out-of-band channel for communicating to the writer how
1023    /// many items were forwarded before being interrupted.
1024    ///
1025    /// - Allow the forwarding to complete without interrupting it.  This is
1026    /// usually preferable if there's no out-of-band channel for reporting back
1027    /// to the writer how many items were forwarded.
1028    fn poll_consume(
1029        self: Pin<&mut Self>,
1030        cx: &mut Context<'_>,
1031        store: StoreContextMut<D>,
1032        source: Source<'_, Self::Item>,
1033        finish: bool,
1034    ) -> Poll<Result<StreamResult>>;
1035}
1036
1037/// Represents a host-owned write end of a future.
1038pub trait FutureProducer<D>: Send + 'static {
1039    /// The payload type of this future.
1040    type Item;
1041
1042    /// Handle a host- or guest-initiated read by producing a value.
1043    ///
1044    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1045    /// simplified interface for futures.
1046    ///
1047    /// If `finish` is true, the implementation may return
1048    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
1049    /// could produce a value.  Otherwise, it must either return
1050    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
1051    fn poll_produce(
1052        self: Pin<&mut Self>,
1053        cx: &mut Context<'_>,
1054        store: StoreContextMut<D>,
1055        finish: bool,
1056    ) -> Poll<Result<Option<Self::Item>>>;
1057}
1058
1059impl<T, E, D, Fut> FutureProducer<D> for Fut
1060where
1061    E: Into<Error>,
1062    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1063{
1064    type Item = T;
1065
1066    fn poll_produce<'a>(
1067        self: Pin<&mut Self>,
1068        cx: &mut Context<'_>,
1069        _: StoreContextMut<'a, D>,
1070        finish: bool,
1071    ) -> Poll<Result<Option<T>>> {
1072        match self.poll(cx) {
1073            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1074            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1075            Poll::Pending if finish => Poll::Ready(Ok(None)),
1076            Poll::Pending => Poll::Pending,
1077        }
1078    }
1079}
1080
1081/// Represents a host-owned read end of a future.
1082pub trait FutureConsumer<D>: Send + 'static {
1083    /// The payload type of this future.
1084    type Item;
1085
1086    /// Handle a host- or guest-initiated write by consuming a value.
1087    ///
1088    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1089    /// simplified interface for futures.
1090    ///
1091    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
1092    /// without taking the item from `source`, which indicates the operation was
1093    /// canceled before it could consume the value.  Otherwise, it must either
1094    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
1095    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
1096    /// the item).
1097    fn poll_consume(
1098        self: Pin<&mut Self>,
1099        cx: &mut Context<'_>,
1100        store: StoreContextMut<D>,
1101        source: Source<'_, Self::Item>,
1102        finish: bool,
1103    ) -> Poll<Result<()>>;
1104}
1105
1106/// Represents the readable end of a Component Model `future`.
1107///
1108/// Note that `FutureReader` instances must be disposed of using either `pipe`
1109/// or `close`; otherwise the in-store representation will leak and the writer
1110/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
1111/// ensure that disposal happens automatically.
1112pub struct FutureReader<T> {
1113    id: TableId<TransmitHandle>,
1114    _phantom: PhantomData<T>,
1115}
1116
1117impl<T> FutureReader<T> {
1118    /// Create a new future with the specified producer.
1119    ///
1120    /// # Panics
1121    ///
1122    /// Panics if [`Config::concurrency_support`] is not enabled.
1123    ///
1124    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1125    pub fn new<S: AsContextMut>(
1126        mut store: S,
1127        producer: impl FutureProducer<S::Data, Item = T>,
1128    ) -> Self
1129    where
1130        T: func::Lower + func::Lift + Send + Sync + 'static,
1131    {
1132        assert!(store.as_context().0.concurrency_support());
1133
1134        struct Producer<P>(P);
1135
1136        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1137            for Producer<P>
1138        {
1139            type Item = P::Item;
1140            type Buffer = Option<P::Item>;
1141
1142            fn poll_produce<'a>(
1143                self: Pin<&mut Self>,
1144                cx: &mut Context<'_>,
1145                store: StoreContextMut<D>,
1146                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1147                finish: bool,
1148            ) -> Poll<Result<StreamResult>> {
1149                // SAFETY: This is a standard pin-projection, and we never move
1150                // out of `self`.
1151                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1152
1153                Poll::Ready(Ok(
1154                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1155                        destination.set_buffer(Some(value));
1156
1157                        // Here we return `StreamResult::Completed` even though
1158                        // we've produced the last item we'll ever produce.
1159                        // That's because the ABI expects
1160                        // `ReturnCode::Completed(1)` rather than
1161                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1162                        // called again since the future will have resolved.
1163                        StreamResult::Completed
1164                    } else {
1165                        StreamResult::Cancelled
1166                    },
1167                ))
1168            }
1169        }
1170
1171        Self::new_(
1172            store
1173                .as_context_mut()
1174                .new_transmit(TransmitKind::Future, Producer(producer)),
1175        )
1176    }
1177
1178    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1179        Self {
1180            id,
1181            _phantom: PhantomData,
1182        }
1183    }
1184
1185    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1186        self.id
1187    }
1188
1189    /// Set the consumer that accepts the result of this future.
1190    pub fn pipe<S: AsContextMut>(
1191        self,
1192        mut store: S,
1193        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1194    ) where
1195        T: func::Lift + 'static,
1196    {
1197        struct Consumer<C>(C);
1198
1199        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1200            for Consumer<C>
1201        {
1202            type Item = T;
1203
1204            fn poll_consume(
1205                self: Pin<&mut Self>,
1206                cx: &mut Context<'_>,
1207                mut store: StoreContextMut<D>,
1208                mut source: Source<Self::Item>,
1209                finish: bool,
1210            ) -> Poll<Result<StreamResult>> {
1211                // SAFETY: This is a standard pin-projection, and we never move
1212                // out of `self`.
1213                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1214
1215                ready!(consumer.poll_consume(
1216                    cx,
1217                    store.as_context_mut(),
1218                    source.reborrow(),
1219                    finish
1220                ))?;
1221
1222                Poll::Ready(Ok(if source.remaining(store) == 0 {
1223                    // Here we return `StreamResult::Completed` even though
1224                    // we've consumed the last item we'll ever consume.  That's
1225                    // because the ABI expects `ReturnCode::Completed(1)` rather
1226                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1227                    // called again since the future will have resolved.
1228                    StreamResult::Completed
1229                } else {
1230                    StreamResult::Cancelled
1231                }))
1232            }
1233        }
1234
1235        store
1236            .as_context_mut()
1237            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1238    }
1239
1240    /// Transfer ownership of the read end of a future from a guest to the host.
1241    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1242        let id = lift_index_to_future(cx, ty, index)?;
1243        Ok(Self::new_(id))
1244    }
1245
1246    /// Close this `FutureReader`.
1247    ///
1248    /// This will close this half of the future which will signal to a pending
1249    /// write, if any, that the reader side is dropped. If the writer half has
1250    /// not yet written a value then when it attempts to write a value it will
1251    /// see that this end is closed.
1252    ///
1253    /// # Panics
1254    ///
1255    /// Panics if the store that the [`Accessor`] is derived from does not own
1256    /// this future. Usage of this future after calling `close` will also cause
1257    /// a panic.
1258    ///
1259    /// [`Accessor`]: crate::component::Accessor
1260    pub fn close(&mut self, mut store: impl AsContextMut) {
1261        future_close(store.as_context_mut().0, &mut self.id)
1262    }
1263
1264    /// Convenience method around [`Self::close`].
1265    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1266        accessor.as_accessor().with(|access| self.close(access))
1267    }
1268
1269    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1270    /// drop and clean it up from the store.
1271    ///
1272    /// Note that the `accessor` provided must own this future and is
1273    /// additionally transferred to the `GuardedFutureReader` return value.
1274    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1275    where
1276        A: AsAccessor,
1277    {
1278        GuardedFutureReader::new(accessor, self)
1279    }
1280
1281    /// Attempts to convert this [`FutureReader<T>`] to a [`FutureAny`].
1282    ///
1283    /// # Errors
1284    ///
1285    /// This function will return an error if `self` does not belong to
1286    /// `store`.
1287    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
1288    where
1289        T: ComponentType + 'static,
1290    {
1291        FutureAny::try_from_future_reader(store, self)
1292    }
1293
1294    /// Attempts to convert a [`FutureAny`] into a [`FutureReader<T>`].
1295    ///
1296    /// # Errors
1297    ///
1298    /// This function will fail if `T` doesn't match the type of the value that
1299    /// `future` is sending.
1300    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
1301    where
1302        T: ComponentType + 'static,
1303    {
1304        future.try_into_future_reader()
1305    }
1306}
1307
1308impl<T> fmt::Debug for FutureReader<T> {
1309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1310        f.debug_struct("FutureReader")
1311            .field("id", &self.id)
1312            .finish()
1313    }
1314}
1315
1316pub(super) fn future_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1317    let id = mem::replace(id, TableId::new(u32::MAX));
1318    store.host_drop_reader(id, TransmitKind::Future).unwrap();
1319}
1320
1321/// Transfer ownership of the read end of a future from the host to a guest.
1322pub(super) fn lift_index_to_future(
1323    cx: &mut LiftContext<'_>,
1324    ty: InterfaceType,
1325    index: u32,
1326) -> Result<TableId<TransmitHandle>> {
1327    match ty {
1328        InterfaceType::Future(src) => {
1329            let handle_table = cx
1330                .instance_mut()
1331                .table_for_transmit(TransmitIndex::Future(src));
1332            let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1333            if is_done {
1334                bail!("cannot lift future after being notified that the writable end dropped");
1335            }
1336            let id = TableId::<TransmitHandle>::new(rep);
1337            let concurrent_state = cx.concurrent_state_mut();
1338            let future = concurrent_state.get_mut(id)?;
1339            future.common.handle = None;
1340            let state = future.state;
1341
1342            if concurrent_state.get_mut(state)?.done {
1343                bail!("cannot lift future after previous read succeeded");
1344            }
1345
1346            Ok(id)
1347        }
1348        _ => func::bad_type_info(),
1349    }
1350}
1351
1352/// Transfer ownership of the read end of a future from the host to a guest.
1353pub(super) fn lower_future_to_index<U>(
1354    id: TableId<TransmitHandle>,
1355    cx: &mut LowerContext<'_, U>,
1356    ty: InterfaceType,
1357) -> Result<u32> {
1358    match ty {
1359        InterfaceType::Future(dst) => {
1360            let concurrent_state = cx.store.0.concurrent_state_mut();
1361            let state = concurrent_state.get_mut(id)?.state;
1362            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1363
1364            let handle = cx
1365                .instance_mut()
1366                .table_for_transmit(TransmitIndex::Future(dst))
1367                .future_insert_read(dst, rep)?;
1368
1369            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1370
1371            Ok(handle)
1372        }
1373        _ => func::bad_type_info(),
1374    }
1375}
1376
1377// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1378// safe and correct since we lift and lower future handles as `u32`s.
1379unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
1380    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1381
1382    type Lower = <u32 as func::ComponentType>::Lower;
1383
1384    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1385        match ty {
1386            InterfaceType::Future(ty) => {
1387                let ty = types.types[*ty].ty;
1388                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1389            }
1390            other => bail!("expected `future`, found `{}`", func::desc(other)),
1391        }
1392    }
1393}
1394
1395// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1396unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
1397    fn linear_lower_to_flat<U>(
1398        &self,
1399        cx: &mut LowerContext<'_, U>,
1400        ty: InterfaceType,
1401        dst: &mut MaybeUninit<Self::Lower>,
1402    ) -> Result<()> {
1403        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1404    }
1405
1406    fn linear_lower_to_memory<U>(
1407        &self,
1408        cx: &mut LowerContext<'_, U>,
1409        ty: InterfaceType,
1410        offset: usize,
1411    ) -> Result<()> {
1412        lower_future_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1413            cx,
1414            InterfaceType::U32,
1415            offset,
1416        )
1417    }
1418}
1419
1420// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1421unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
1422    fn linear_lift_from_flat(
1423        cx: &mut LiftContext<'_>,
1424        ty: InterfaceType,
1425        src: &Self::Lower,
1426    ) -> Result<Self> {
1427        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1428        Self::lift_from_index(cx, ty, index)
1429    }
1430
1431    fn linear_lift_from_memory(
1432        cx: &mut LiftContext<'_>,
1433        ty: InterfaceType,
1434        bytes: &[u8],
1435    ) -> Result<Self> {
1436        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1437        Self::lift_from_index(cx, ty, index)
1438    }
1439}
1440
1441/// A [`FutureReader`] paired with an [`Accessor`].
1442///
1443/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
1444/// when dropped. This can be created through [`GuardedFutureReader::new`] or
1445/// [`FutureReader::guard`].
1446///
1447/// [`Accessor`]: crate::component::Accessor
1448pub struct GuardedFutureReader<T, A>
1449where
1450    A: AsAccessor,
1451{
1452    // This field is `None` to implement the conversion from this guard back to
1453    // `FutureReader`. When `None` is seen in the destructor it will cause the
1454    // destructor to do nothing.
1455    reader: Option<FutureReader<T>>,
1456    accessor: A,
1457}
1458
1459impl<T, A> GuardedFutureReader<T, A>
1460where
1461    A: AsAccessor,
1462{
1463    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1464    ///
1465    /// # Panics
1466    ///
1467    /// Panics if [`Config::concurrency_support`] is not enabled.
1468    ///
1469    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1470    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1471        assert!(
1472            accessor
1473                .as_accessor()
1474                .with(|a| a.as_context().0.concurrency_support())
1475        );
1476        Self {
1477            reader: Some(reader),
1478            accessor,
1479        }
1480    }
1481
1482    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1483    /// back.
1484    pub fn into_future(self) -> FutureReader<T> {
1485        self.into()
1486    }
1487}
1488
1489impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1490where
1491    A: AsAccessor,
1492{
1493    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1494        guard.reader.take().unwrap()
1495    }
1496}
1497
1498impl<T, A> Drop for GuardedFutureReader<T, A>
1499where
1500    A: AsAccessor,
1501{
1502    fn drop(&mut self) {
1503        if let Some(reader) = &mut self.reader {
1504            reader.close_with(&self.accessor)
1505        }
1506    }
1507}
1508
1509/// Represents the readable end of a Component Model `stream`.
1510///
1511/// Note that `StreamReader` instances must be disposed of using `close`;
1512/// otherwise the in-store representation will leak and the writer end will hang
1513/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1514/// disposal happens automatically.
1515pub struct StreamReader<T> {
1516    id: TableId<TransmitHandle>,
1517    _phantom: PhantomData<T>,
1518}
1519
1520impl<T> StreamReader<T> {
1521    /// Create a new stream with the specified producer.
1522    ///
1523    /// # Panics
1524    ///
1525    /// Panics if [`Config::concurrency_support`] is not enabled.
1526    ///
1527    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1528    pub fn new<S: AsContextMut>(
1529        mut store: S,
1530        producer: impl StreamProducer<S::Data, Item = T>,
1531    ) -> Self
1532    where
1533        T: func::Lower + func::Lift + Send + Sync + 'static,
1534    {
1535        assert!(store.as_context().0.concurrency_support());
1536        Self::new_(
1537            store
1538                .as_context_mut()
1539                .new_transmit(TransmitKind::Stream, producer),
1540        )
1541    }
1542
1543    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
1544        Self {
1545            id,
1546            _phantom: PhantomData,
1547        }
1548    }
1549
1550    pub(super) fn id(&self) -> TableId<TransmitHandle> {
1551        self.id
1552    }
1553
1554    /// Attempt to consume this object by converting it into the specified type.
1555    ///
1556    /// This can be useful for "short-circuiting" host-to-host streams,
1557    /// bypassing the guest entirely.  For example, if a guest task returns a
1558    /// host-created stream and then exits, this function may be used to
1559    /// retrieve the write end, after which the guest instance and store may be
1560    /// disposed of if no longer needed.
1561    ///
1562    /// This will return `Ok(_)` if and only if the following conditions are
1563    /// met:
1564    ///
1565    /// - The stream was created by the host (i.e. not by the guest).
1566    ///
1567    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1568    /// producer provided to `StreamReader::new` when the stream was created,
1569    /// along with `TypeId::of::<V>()`.
1570    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1571        let store = store.as_context_mut();
1572        let state = store.0.concurrent_state_mut();
1573        let id = state.get_mut(self.id).unwrap().state;
1574        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1575            match try_into(TypeId::of::<V>()) {
1576                Some(result) => {
1577                    self.close(store);
1578                    Ok(*result.downcast::<V>().unwrap())
1579                }
1580                None => Err(self),
1581            }
1582        } else {
1583            Err(self)
1584        }
1585    }
1586
1587    /// Set the consumer that accepts the items delivered to this stream.
1588    pub fn pipe<S: AsContextMut>(
1589        self,
1590        mut store: S,
1591        consumer: impl StreamConsumer<S::Data, Item = T>,
1592    ) where
1593        T: 'static,
1594    {
1595        store
1596            .as_context_mut()
1597            .set_consumer(self.id, TransmitKind::Stream, consumer);
1598    }
1599
1600    /// Transfer ownership of the read end of a stream from a guest to the host.
1601    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1602        let id = lift_index_to_stream(cx, ty, index)?;
1603        Ok(Self::new_(id))
1604    }
1605
1606    /// Close this `StreamReader`.
1607    ///
1608    /// This will signal that this portion of the stream is closed causing all
1609    /// future writes to return immediately with "DROPPED".
1610    ///
1611    /// # Panics
1612    ///
1613    /// Panics if the `store` does not own this future. Usage of this future
1614    /// after calling `close` will also cause a panic.
1615    pub fn close(&mut self, mut store: impl AsContextMut) {
1616        stream_close(store.as_context_mut().0, &mut self.id);
1617    }
1618
1619    /// Convenience method around [`Self::close`].
1620    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1621        accessor.as_accessor().with(|access| self.close(access))
1622    }
1623
1624    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1625    /// drop and clean it up from the store.
1626    ///
1627    /// Note that the `accessor` provided must own this future and is
1628    /// additionally transferred to the `GuardedStreamReader` return value.
1629    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1630    where
1631        A: AsAccessor,
1632    {
1633        GuardedStreamReader::new(accessor, self)
1634    }
1635
1636    /// Attempts to convert this [`StreamReader<T>`] to a [`StreamAny`].
1637    ///
1638    /// # Errors
1639    ///
1640    /// This function will return an error if `self` does not belong to
1641    /// `store`.
1642    pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
1643    where
1644        T: ComponentType + 'static,
1645    {
1646        StreamAny::try_from_stream_reader(store, self)
1647    }
1648
1649    /// Attempts to convert a [`StreamAny`] into a [`StreamReader<T>`].
1650    ///
1651    /// # Errors
1652    ///
1653    /// This function will fail if `T` doesn't match the type of the value that
1654    /// `stream` is sending.
1655    pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
1656    where
1657        T: ComponentType + 'static,
1658    {
1659        stream.try_into_stream_reader()
1660    }
1661}
1662
1663impl<T> fmt::Debug for StreamReader<T> {
1664    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1665        f.debug_struct("StreamReader")
1666            .field("id", &self.id)
1667            .finish()
1668    }
1669}
1670
1671pub(super) fn stream_close(store: &mut StoreOpaque, id: &mut TableId<TransmitHandle>) {
1672    let id = mem::replace(id, TableId::new(u32::MAX));
1673    store.host_drop_reader(id, TransmitKind::Stream).unwrap();
1674}
1675
1676/// Transfer ownership of the read end of a stream from a guest to the host.
1677pub(super) fn lift_index_to_stream(
1678    cx: &mut LiftContext<'_>,
1679    ty: InterfaceType,
1680    index: u32,
1681) -> Result<TableId<TransmitHandle>> {
1682    match ty {
1683        InterfaceType::Stream(src) => {
1684            let handle_table = cx
1685                .instance_mut()
1686                .table_for_transmit(TransmitIndex::Stream(src));
1687            let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1688            if is_done {
1689                bail!("cannot lift stream after being notified that the writable end dropped");
1690            }
1691            let id = TableId::<TransmitHandle>::new(rep);
1692            cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1693            Ok(id)
1694        }
1695        _ => func::bad_type_info(),
1696    }
1697}
1698
1699/// Transfer ownership of the read end of a stream from the host to a guest.
1700pub(super) fn lower_stream_to_index<U>(
1701    id: TableId<TransmitHandle>,
1702    cx: &mut LowerContext<'_, U>,
1703    ty: InterfaceType,
1704) -> Result<u32> {
1705    match ty {
1706        InterfaceType::Stream(dst) => {
1707            let concurrent_state = cx.store.0.concurrent_state_mut();
1708            let state = concurrent_state.get_mut(id)?.state;
1709            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1710
1711            let handle = cx
1712                .instance_mut()
1713                .table_for_transmit(TransmitIndex::Stream(dst))
1714                .stream_insert_read(dst, rep)?;
1715
1716            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1717
1718            Ok(handle)
1719        }
1720        _ => func::bad_type_info(),
1721    }
1722}
1723
1724// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1725// safe and correct since we lift and lower stream handles as `u32`s.
1726unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
1727    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1728
1729    type Lower = <u32 as func::ComponentType>::Lower;
1730
1731    fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
1732        match ty {
1733            InterfaceType::Stream(ty) => {
1734                let ty = types.types[*ty].ty;
1735                types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
1736            }
1737            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1738        }
1739    }
1740}
1741
1742// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1743unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
1744    fn linear_lower_to_flat<U>(
1745        &self,
1746        cx: &mut LowerContext<'_, U>,
1747        ty: InterfaceType,
1748        dst: &mut MaybeUninit<Self::Lower>,
1749    ) -> Result<()> {
1750        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst)
1751    }
1752
1753    fn linear_lower_to_memory<U>(
1754        &self,
1755        cx: &mut LowerContext<'_, U>,
1756        ty: InterfaceType,
1757        offset: usize,
1758    ) -> Result<()> {
1759        lower_stream_to_index(self.id, cx, ty)?.linear_lower_to_memory(
1760            cx,
1761            InterfaceType::U32,
1762            offset,
1763        )
1764    }
1765}
1766
1767// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1768unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
1769    fn linear_lift_from_flat(
1770        cx: &mut LiftContext<'_>,
1771        ty: InterfaceType,
1772        src: &Self::Lower,
1773    ) -> Result<Self> {
1774        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1775        Self::lift_from_index(cx, ty, index)
1776    }
1777
1778    fn linear_lift_from_memory(
1779        cx: &mut LiftContext<'_>,
1780        ty: InterfaceType,
1781        bytes: &[u8],
1782    ) -> Result<Self> {
1783        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1784        Self::lift_from_index(cx, ty, index)
1785    }
1786}
1787
1788/// A [`StreamReader`] paired with an [`Accessor`].
1789///
1790/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1791/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1792/// [`StreamReader::guard`].
1793///
1794/// [`Accessor`]: crate::component::Accessor
1795pub struct GuardedStreamReader<T, A>
1796where
1797    A: AsAccessor,
1798{
1799    // This field is `None` to implement the conversion from this guard back to
1800    // `StreamReader`. When `None` is seen in the destructor it will cause the
1801    // destructor to do nothing.
1802    reader: Option<StreamReader<T>>,
1803    accessor: A,
1804}
1805
1806impl<T, A> GuardedStreamReader<T, A>
1807where
1808    A: AsAccessor,
1809{
1810    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1811    /// `reader`.
1812    ///
1813    /// # Panics
1814    ///
1815    /// Panics if [`Config::concurrency_support`] is not enabled.
1816    ///
1817    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1818    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1819        assert!(
1820            accessor
1821                .as_accessor()
1822                .with(|a| a.as_context().0.concurrency_support())
1823        );
1824        Self {
1825            reader: Some(reader),
1826            accessor,
1827        }
1828    }
1829
1830    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1831    /// back.
1832    pub fn into_stream(self) -> StreamReader<T> {
1833        self.into()
1834    }
1835}
1836
1837impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1838where
1839    A: AsAccessor,
1840{
1841    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1842        guard.reader.take().unwrap()
1843    }
1844}
1845
1846impl<T, A> Drop for GuardedStreamReader<T, A>
1847where
1848    A: AsAccessor,
1849{
1850    fn drop(&mut self) {
1851        if let Some(reader) = &mut self.reader {
1852            reader.close_with(&self.accessor)
1853        }
1854    }
1855}
1856
1857/// Represents a Component Model `error-context`.
1858pub struct ErrorContext {
1859    rep: u32,
1860}
1861
1862impl ErrorContext {
1863    pub(crate) fn new(rep: u32) -> Self {
1864        Self { rep }
1865    }
1866
1867    /// Convert this `ErrorContext` into a [`Val`].
1868    pub fn into_val(self) -> Val {
1869        Val::ErrorContext(ErrorContextAny(self.rep))
1870    }
1871
1872    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1873    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1874        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1875            bail!("expected `error-context`; got `{}`", value.desc());
1876        };
1877        Ok(Self::new(*rep))
1878    }
1879
1880    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1881        match ty {
1882            InterfaceType::ErrorContext(src) => {
1883                let rep = cx
1884                    .instance_mut()
1885                    .table_for_error_context(src)
1886                    .error_context_rep(index)?;
1887
1888                Ok(Self { rep })
1889            }
1890            _ => func::bad_type_info(),
1891        }
1892    }
1893}
1894
1895pub(crate) fn lower_error_context_to_index<U>(
1896    rep: u32,
1897    cx: &mut LowerContext<'_, U>,
1898    ty: InterfaceType,
1899) -> Result<u32> {
1900    match ty {
1901        InterfaceType::ErrorContext(dst) => {
1902            let tbl = cx.instance_mut().table_for_error_context(dst);
1903            tbl.error_context_insert(rep)
1904        }
1905        _ => func::bad_type_info(),
1906    }
1907}
1908// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1909// safe and correct since we lift and lower future handles as `u32`s.
1910unsafe impl func::ComponentType for ErrorContext {
1911    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1912
1913    type Lower = <u32 as func::ComponentType>::Lower;
1914
1915    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1916        match ty {
1917            InterfaceType::ErrorContext(_) => Ok(()),
1918            other => bail!("expected `error`, found `{}`", func::desc(other)),
1919        }
1920    }
1921}
1922
1923// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1924unsafe impl func::Lower for ErrorContext {
1925    fn linear_lower_to_flat<T>(
1926        &self,
1927        cx: &mut LowerContext<'_, T>,
1928        ty: InterfaceType,
1929        dst: &mut MaybeUninit<Self::Lower>,
1930    ) -> Result<()> {
1931        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1932            cx,
1933            InterfaceType::U32,
1934            dst,
1935        )
1936    }
1937
1938    fn linear_lower_to_memory<T>(
1939        &self,
1940        cx: &mut LowerContext<'_, T>,
1941        ty: InterfaceType,
1942        offset: usize,
1943    ) -> Result<()> {
1944        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1945            cx,
1946            InterfaceType::U32,
1947            offset,
1948        )
1949    }
1950}
1951
1952// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1953unsafe impl func::Lift for ErrorContext {
1954    fn linear_lift_from_flat(
1955        cx: &mut LiftContext<'_>,
1956        ty: InterfaceType,
1957        src: &Self::Lower,
1958    ) -> Result<Self> {
1959        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1960        Self::lift_from_index(cx, ty, index)
1961    }
1962
1963    fn linear_lift_from_memory(
1964        cx: &mut LiftContext<'_>,
1965        ty: InterfaceType,
1966        bytes: &[u8],
1967    ) -> Result<Self> {
1968        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1969        Self::lift_from_index(cx, ty, index)
1970    }
1971}
1972
1973/// Represents the read or write end of a stream or future.
1974pub(super) struct TransmitHandle {
1975    pub(super) common: WaitableCommon,
1976    /// See `TransmitState`
1977    state: TableId<TransmitState>,
1978}
1979
1980impl TransmitHandle {
1981    fn new(state: TableId<TransmitState>) -> Self {
1982        Self {
1983            common: WaitableCommon::default(),
1984            state,
1985        }
1986    }
1987}
1988
1989impl TableDebug for TransmitHandle {
1990    fn type_name() -> &'static str {
1991        "TransmitHandle"
1992    }
1993}
1994
1995/// Represents the state of a stream or future.
1996struct TransmitState {
1997    /// The write end of the stream or future.
1998    write_handle: TableId<TransmitHandle>,
1999    /// The read end of the stream or future.
2000    read_handle: TableId<TransmitHandle>,
2001    /// See `WriteState`
2002    write: WriteState,
2003    /// See `ReadState`
2004    read: ReadState,
2005    /// Whether futher values may be transmitted via this stream or future.
2006    done: bool,
2007    /// The original creator of this stream, used for type-checking with
2008    /// `{Future,Stream}Any`.
2009    pub(super) origin: TransmitOrigin,
2010}
2011
2012#[derive(Copy, Clone)]
2013pub(super) enum TransmitOrigin {
2014    Host,
2015    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
2016    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
2017}
2018
2019impl TransmitState {
2020    fn new(origin: TransmitOrigin) -> Self {
2021        Self {
2022            write_handle: TableId::new(u32::MAX),
2023            read_handle: TableId::new(u32::MAX),
2024            read: ReadState::Open,
2025            write: WriteState::Open,
2026            done: false,
2027            origin,
2028        }
2029    }
2030}
2031
2032impl TableDebug for TransmitState {
2033    fn type_name() -> &'static str {
2034        "TransmitState"
2035    }
2036}
2037
2038impl TransmitOrigin {
2039    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
2040        match index {
2041            TransmitIndex::Future(ty) => TransmitOrigin::GuestFuture(id, ty),
2042            TransmitIndex::Stream(ty) => TransmitOrigin::GuestStream(id, ty),
2043        }
2044    }
2045}
2046
2047type PollStream = Box<
2048    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
2049>;
2050
2051type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
2052
2053/// Represents the state of the write end of a stream or future.
2054enum WriteState {
2055    /// The write end is open, but no write is pending.
2056    Open,
2057    /// The write end is owned by a guest task and a write is pending.
2058    GuestReady {
2059        instance: Instance,
2060        caller: RuntimeComponentInstanceIndex,
2061        ty: TransmitIndex,
2062        flat_abi: Option<FlatAbi>,
2063        options: OptionsIndex,
2064        address: usize,
2065        count: usize,
2066        handle: u32,
2067    },
2068    /// The write end is owned by the host, which is ready to produce items.
2069    HostReady {
2070        produce: PollStream,
2071        try_into: TryInto,
2072        guest_offset: usize,
2073        cancel: bool,
2074        cancel_waker: Option<Waker>,
2075    },
2076    /// The write end has been dropped.
2077    Dropped,
2078}
2079
2080impl fmt::Debug for WriteState {
2081    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2082        match self {
2083            Self::Open => f.debug_tuple("Open").finish(),
2084            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2085            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2086            Self::Dropped => f.debug_tuple("Dropped").finish(),
2087        }
2088    }
2089}
2090
2091/// Represents the state of the read end of a stream or future.
2092enum ReadState {
2093    /// The read end is open, but no read is pending.
2094    Open,
2095    /// The read end is owned by a guest task and a read is pending.
2096    GuestReady {
2097        ty: TransmitIndex,
2098        caller: RuntimeComponentInstanceIndex,
2099        flat_abi: Option<FlatAbi>,
2100        instance: Instance,
2101        options: OptionsIndex,
2102        address: usize,
2103        count: usize,
2104        handle: u32,
2105    },
2106    /// The read end is owned by a host task, and it is ready to consume items.
2107    HostReady {
2108        consume: PollStream,
2109        guest_offset: usize,
2110        cancel: bool,
2111        cancel_waker: Option<Waker>,
2112    },
2113    /// Both the read and write ends are owned by the host.
2114    HostToHost {
2115        accept: Box<
2116            dyn for<'a> Fn(
2117                    &'a mut UntypedWriteBuffer<'a>,
2118                )
2119                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
2120                + Send
2121                + Sync,
2122        >,
2123        buffer: Vec<u8>,
2124        limit: usize,
2125    },
2126    /// The read end has been dropped.
2127    Dropped,
2128}
2129
2130impl fmt::Debug for ReadState {
2131    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2132        match self {
2133            Self::Open => f.debug_tuple("Open").finish(),
2134            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
2135            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
2136            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
2137            Self::Dropped => f.debug_tuple("Dropped").finish(),
2138        }
2139    }
2140}
2141
2142fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2143    let count = guest_offset.try_into().unwrap();
2144    match state {
2145        StreamResult::Dropped => ReturnCode::Dropped(count),
2146        StreamResult::Completed => ReturnCode::completed(kind, count),
2147        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2148    }
2149}
2150
2151impl StoreOpaque {
2152    fn pipe_from_guest(
2153        &mut self,
2154        kind: TransmitKind,
2155        id: TableId<TransmitState>,
2156        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2157    ) {
2158        let future = async move {
2159            let stream_state = future.await?;
2160            tls::get(|store| {
2161                let state = store.concurrent_state_mut();
2162                let transmit = state.get_mut(id)?;
2163                let ReadState::HostReady {
2164                    consume,
2165                    guest_offset,
2166                    ..
2167                } = mem::replace(&mut transmit.read, ReadState::Open)
2168                else {
2169                    unreachable!();
2170                };
2171                let code = return_code(kind, stream_state, guest_offset);
2172                transmit.read = match stream_state {
2173                    StreamResult::Dropped => ReadState::Dropped,
2174                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2175                        consume,
2176                        guest_offset: 0,
2177                        cancel: false,
2178                        cancel_waker: None,
2179                    },
2180                };
2181                let WriteState::GuestReady { ty, handle, .. } =
2182                    mem::replace(&mut transmit.write, WriteState::Open)
2183                else {
2184                    unreachable!();
2185                };
2186                state.send_write_result(ty, id, handle, code)?;
2187                Ok(())
2188            })
2189        };
2190
2191        self.concurrent_state_mut().push_future(future.boxed());
2192    }
2193
2194    fn pipe_to_guest(
2195        &mut self,
2196        kind: TransmitKind,
2197        id: TableId<TransmitState>,
2198        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2199    ) {
2200        let future = async move {
2201            let stream_state = future.await?;
2202            tls::get(|store| {
2203                let state = store.concurrent_state_mut();
2204                let transmit = state.get_mut(id)?;
2205                let WriteState::HostReady {
2206                    produce,
2207                    try_into,
2208                    guest_offset,
2209                    ..
2210                } = mem::replace(&mut transmit.write, WriteState::Open)
2211                else {
2212                    unreachable!();
2213                };
2214                let code = return_code(kind, stream_state, guest_offset);
2215                transmit.write = match stream_state {
2216                    StreamResult::Dropped => WriteState::Dropped,
2217                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2218                        produce,
2219                        try_into,
2220                        guest_offset: 0,
2221                        cancel: false,
2222                        cancel_waker: None,
2223                    },
2224                };
2225                let ReadState::GuestReady { ty, handle, .. } =
2226                    mem::replace(&mut transmit.read, ReadState::Open)
2227                else {
2228                    unreachable!();
2229                };
2230                state.send_read_result(ty, id, handle, code)?;
2231                Ok(())
2232            })
2233        };
2234
2235        self.concurrent_state_mut().push_future(future.boxed());
2236    }
2237
2238    /// Drop the read end of a stream or future read from the host.
2239    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2240        let state = self.concurrent_state_mut();
2241        let transmit_id = state.get_mut(id)?.state;
2242        let transmit = state
2243            .get_mut(transmit_id)
2244            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2245        log::trace!(
2246            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2247            transmit.read,
2248            transmit.write
2249        );
2250
2251        transmit.read = ReadState::Dropped;
2252
2253        // If the write end is already dropped, it should stay dropped,
2254        // otherwise, it should be opened.
2255        let new_state = if let WriteState::Dropped = &transmit.write {
2256            WriteState::Dropped
2257        } else {
2258            WriteState::Open
2259        };
2260
2261        let write_handle = transmit.write_handle;
2262
2263        match mem::replace(&mut transmit.write, new_state) {
2264            // If a guest is waiting to write, notify it that the read end has
2265            // been dropped.
2266            WriteState::GuestReady { ty, handle, .. } => {
2267                state.update_event(
2268                    write_handle.rep(),
2269                    match ty {
2270                        TransmitIndex::Future(ty) => Event::FutureWrite {
2271                            code: ReturnCode::Dropped(0),
2272                            pending: Some((ty, handle)),
2273                        },
2274                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2275                            code: ReturnCode::Dropped(0),
2276                            pending: Some((ty, handle)),
2277                        },
2278                    },
2279                )?;
2280            }
2281
2282            WriteState::HostReady { .. } => {}
2283
2284            WriteState::Open => {
2285                state.update_event(
2286                    write_handle.rep(),
2287                    match kind {
2288                        TransmitKind::Future => Event::FutureWrite {
2289                            code: ReturnCode::Dropped(0),
2290                            pending: None,
2291                        },
2292                        TransmitKind::Stream => Event::StreamWrite {
2293                            code: ReturnCode::Dropped(0),
2294                            pending: None,
2295                        },
2296                    },
2297                )?;
2298            }
2299
2300            WriteState::Dropped => {
2301                log::trace!("host_drop_reader delete {transmit_id:?}");
2302                state.delete_transmit(transmit_id)?;
2303            }
2304        }
2305        Ok(())
2306    }
2307
2308    /// Drop the write end of a stream or future read from the host.
2309    fn host_drop_writer(
2310        &mut self,
2311        id: TableId<TransmitHandle>,
2312        on_drop_open: Option<fn() -> Result<()>>,
2313    ) -> Result<()> {
2314        let state = self.concurrent_state_mut();
2315        let transmit_id = state.get_mut(id)?.state;
2316        let transmit = state
2317            .get_mut(transmit_id)
2318            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2319        log::trace!(
2320            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2321            transmit.read,
2322            transmit.write
2323        );
2324
2325        // Existing queued transmits must be updated with information for the impending writer closure
2326        match &mut transmit.write {
2327            WriteState::GuestReady { .. } => {
2328                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2329            }
2330            WriteState::HostReady { .. } => {}
2331            v @ WriteState::Open => {
2332                if let (Some(on_drop_open), false) = (
2333                    on_drop_open,
2334                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2335                ) {
2336                    on_drop_open()?;
2337                } else {
2338                    *v = WriteState::Dropped;
2339                }
2340            }
2341            WriteState::Dropped => unreachable!("write state is already dropped"),
2342        }
2343
2344        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2345
2346        // If the existing read state is dropped, then there's nothing to read
2347        // and we can keep it that way.
2348        //
2349        // If the read state was any other state, then we must set the new state to open
2350        // to indicate that there *is* data to be read
2351        let new_state = if let ReadState::Dropped = &transmit.read {
2352            ReadState::Dropped
2353        } else {
2354            ReadState::Open
2355        };
2356
2357        let read_handle = transmit.read_handle;
2358
2359        // Swap in the new read state
2360        match mem::replace(&mut transmit.read, new_state) {
2361            // If the guest was ready to read, then we cannot drop the reader (or writer);
2362            // we must deliver the event, and update the state associated with the handle to
2363            // represent that a read must be performed
2364            ReadState::GuestReady { ty, handle, .. } => {
2365                // Ensure the final read of the guest is queued, with appropriate closure indicator
2366                self.concurrent_state_mut().update_event(
2367                    read_handle.rep(),
2368                    match ty {
2369                        TransmitIndex::Future(ty) => Event::FutureRead {
2370                            code: ReturnCode::Dropped(0),
2371                            pending: Some((ty, handle)),
2372                        },
2373                        TransmitIndex::Stream(ty) => Event::StreamRead {
2374                            code: ReturnCode::Dropped(0),
2375                            pending: Some((ty, handle)),
2376                        },
2377                    },
2378                )?;
2379            }
2380
2381            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2382
2383            // If the read state is open, then there are no registered readers of the stream/future
2384            ReadState::Open => {
2385                self.concurrent_state_mut().update_event(
2386                    read_handle.rep(),
2387                    match on_drop_open {
2388                        Some(_) => Event::FutureRead {
2389                            code: ReturnCode::Dropped(0),
2390                            pending: None,
2391                        },
2392                        None => Event::StreamRead {
2393                            code: ReturnCode::Dropped(0),
2394                            pending: None,
2395                        },
2396                    },
2397                )?;
2398            }
2399
2400            // If the read state was already dropped, then we can remove the transmit state completely
2401            // (both writer and reader have been dropped)
2402            ReadState::Dropped => {
2403                log::trace!("host_drop_writer delete {transmit_id:?}");
2404                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2405            }
2406        }
2407        Ok(())
2408    }
2409
2410    pub(super) fn transmit_origin(
2411        &mut self,
2412        id: TableId<TransmitHandle>,
2413    ) -> Result<TransmitOrigin> {
2414        let state = self.concurrent_state_mut();
2415        let state_id = state.get_mut(id)?.state;
2416        Ok(state.get_mut(state_id)?.origin)
2417    }
2418}
2419
2420impl<T> StoreContextMut<'_, T> {
2421    fn new_transmit<P: StreamProducer<T>>(
2422        mut self,
2423        kind: TransmitKind,
2424        producer: P,
2425    ) -> TableId<TransmitHandle>
2426    where
2427        P::Item: func::Lower,
2428    {
2429        let token = StoreToken::new(self.as_context_mut());
2430        let state = self.0.concurrent_state_mut();
2431        let (_, read) = state.new_transmit(TransmitOrigin::Host).unwrap();
2432        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2433        let id = state.get_mut(read).unwrap().state;
2434        let mut dropped = false;
2435        let produce = Box::new({
2436            let producer = producer.clone();
2437            move || {
2438                let producer = producer.clone();
2439                async move {
2440                    let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2441
2442                    let (result, cancelled) = if buffer.remaining().is_empty() {
2443                        future::poll_fn(|cx| {
2444                            tls::get(|store| {
2445                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2446
2447                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2448                                    unreachable!();
2449                                };
2450
2451                                let mut host_buffer =
2452                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2453                                        Some(Cursor::new(mem::take(buffer)))
2454                                    } else {
2455                                        None
2456                                    };
2457
2458                                let poll = mine.as_mut().poll_produce(
2459                                    cx,
2460                                    token.as_context_mut(store),
2461                                    Destination {
2462                                        id,
2463                                        buffer: &mut buffer,
2464                                        host_buffer: host_buffer.as_mut(),
2465                                        _phantom: PhantomData,
2466                                    },
2467                                    cancel,
2468                                );
2469
2470                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2471
2472                                let host_offset = if let (
2473                                    Some(host_buffer),
2474                                    ReadState::HostToHost { buffer, limit, .. },
2475                                ) = (host_buffer, &mut transmit.read)
2476                                {
2477                                    *limit = usize::try_from(host_buffer.position()).unwrap();
2478                                    *buffer = host_buffer.into_inner();
2479                                    *limit
2480                                } else {
2481                                    0
2482                                };
2483
2484                                {
2485                                    let WriteState::HostReady {
2486                                        guest_offset,
2487                                        cancel,
2488                                        cancel_waker,
2489                                        ..
2490                                    } = &mut transmit.write
2491                                    else {
2492                                        unreachable!();
2493                                    };
2494
2495                                    if poll.is_pending() {
2496                                        if !buffer.remaining().is_empty()
2497                                            || *guest_offset > 0
2498                                            || host_offset > 0
2499                                        {
2500                                            return Poll::Ready(Err(format_err!(
2501                                                "StreamProducer::poll_produce returned Poll::Pending \
2502                                                 after producing at least one item"
2503                                            )));
2504                                        }
2505                                        *cancel_waker = Some(cx.waker().clone());
2506                                    } else {
2507                                        *cancel_waker = None;
2508                                        *cancel = false;
2509                                    }
2510                                }
2511
2512                                poll.map(|v| v.map(|result| (result, cancel)))
2513                            })
2514                        })
2515                            .await?
2516                    } else {
2517                        (StreamResult::Completed, false)
2518                    };
2519
2520                    let (guest_offset, host_offset, count) = tls::get(|store| {
2521                        let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2522                        let (count, host_offset) = match &transmit.read {
2523                            &ReadState::GuestReady { count, .. } => (count, 0),
2524                            &ReadState::HostToHost { limit, .. } => (1, limit),
2525                            _ => unreachable!(),
2526                        };
2527                        let guest_offset = match &transmit.write {
2528                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
2529                            _ => unreachable!(),
2530                        };
2531                        (guest_offset, host_offset, count)
2532                    });
2533
2534                    match result {
2535                        StreamResult::Completed => {
2536                            if count > 1
2537                                && buffer.remaining().is_empty()
2538                                && guest_offset == 0
2539                                && host_offset == 0
2540                            {
2541                                bail!(
2542                                    "StreamProducer::poll_produce returned StreamResult::Completed \
2543                                     without producing any items"
2544                                );
2545                            }
2546                        }
2547                        StreamResult::Cancelled => {
2548                            if !cancelled {
2549                                bail!(
2550                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
2551                                     without being given a `finish` parameter value of true"
2552                                );
2553                            }
2554                        }
2555                        StreamResult::Dropped => {
2556                            dropped = true;
2557                        }
2558                    }
2559
2560                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2561
2562                    *producer.lock().unwrap() = Some((mine, buffer));
2563
2564                    if write_buffer {
2565                        write(token, id, producer.clone(), kind).await?;
2566                    }
2567
2568                    Ok(if dropped {
2569                        if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2570                        {
2571                            StreamResult::Dropped
2572                        } else {
2573                            StreamResult::Completed
2574                        }
2575                    } else {
2576                        result
2577                    })
2578                }
2579                .boxed()
2580            }
2581        });
2582        let try_into = Box::new(move |ty| {
2583            let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2584            match P::try_into(mine, ty) {
2585                Ok(value) => Some(value),
2586                Err(mine) => {
2587                    *producer.lock().unwrap() = Some((mine, buffer));
2588                    None
2589                }
2590            }
2591        });
2592        state.get_mut(id).unwrap().write = WriteState::HostReady {
2593            produce,
2594            try_into,
2595            guest_offset: 0,
2596            cancel: false,
2597            cancel_waker: None,
2598        };
2599        read
2600    }
2601
2602    fn set_consumer<C: StreamConsumer<T>>(
2603        mut self,
2604        id: TableId<TransmitHandle>,
2605        kind: TransmitKind,
2606        consumer: C,
2607    ) {
2608        let token = StoreToken::new(self.as_context_mut());
2609        let state = self.0.concurrent_state_mut();
2610        let id = state.get_mut(id).unwrap().state;
2611        let transmit = state.get_mut(id).unwrap();
2612        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2613        let consume_with_buffer = {
2614            let consumer = consumer.clone();
2615            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2616                let mut mine = consumer.lock().unwrap().take().unwrap();
2617
2618                let host_buffer_remaining_before =
2619                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2620
2621                let (result, cancelled) = future::poll_fn(|cx| {
2622                    tls::get(|store| {
2623                        let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2624                            &ReadState::HostReady { cancel, .. } => cancel,
2625                            ReadState::Open => false,
2626                            _ => unreachable!(),
2627                        };
2628
2629                        let poll = mine.as_mut().poll_consume(
2630                            cx,
2631                            token.as_context_mut(store),
2632                            Source {
2633                                id,
2634                                host_buffer: host_buffer.as_deref_mut(),
2635                            },
2636                            cancel,
2637                        );
2638
2639                        if let ReadState::HostReady {
2640                            cancel_waker,
2641                            cancel,
2642                            ..
2643                        } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2644                        {
2645                            if poll.is_pending() {
2646                                *cancel_waker = Some(cx.waker().clone());
2647                            } else {
2648                                *cancel_waker = None;
2649                                *cancel = false;
2650                            }
2651                        }
2652
2653                        poll.map(|v| v.map(|result| (result, cancel)))
2654                    })
2655                })
2656                .await?;
2657
2658                let (guest_offset, count) = tls::get(|store| {
2659                    let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2660                    (
2661                        match &transmit.read {
2662                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2663                            ReadState::Open => 0,
2664                            _ => unreachable!(),
2665                        },
2666                        match &transmit.write {
2667                            &WriteState::GuestReady { count, .. } => count,
2668                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2669                            _ => unreachable!(),
2670                        },
2671                    )
2672                });
2673
2674                match result {
2675                    StreamResult::Completed => {
2676                        if count > 0
2677                            && guest_offset == 0
2678                            && host_buffer_remaining_before
2679                                .zip(host_buffer.map(|v| v.remaining().len()))
2680                                .map(|(before, after)| before == after)
2681                                .unwrap_or(false)
2682                        {
2683                            bail!(
2684                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2685                                 without consuming any items"
2686                            );
2687                        }
2688
2689                        if let TransmitKind::Future = kind {
2690                            tls::get(|store| {
2691                                store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2692                            });
2693                        }
2694                    }
2695                    StreamResult::Cancelled => {
2696                        if !cancelled {
2697                            bail!(
2698                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2699                                 without being given a `finish` parameter value of true"
2700                            );
2701                        }
2702                    }
2703                    StreamResult::Dropped => {}
2704                }
2705
2706                *consumer.lock().unwrap() = Some(mine);
2707
2708                Ok(result)
2709            }
2710        };
2711        let consume = {
2712            let consume = consume_with_buffer.clone();
2713            Box::new(move || {
2714                let consume = consume.clone();
2715                async move { consume(None).await }.boxed()
2716            })
2717        };
2718
2719        match &transmit.write {
2720            WriteState::Open => {
2721                transmit.read = ReadState::HostReady {
2722                    consume,
2723                    guest_offset: 0,
2724                    cancel: false,
2725                    cancel_waker: None,
2726                };
2727            }
2728            &WriteState::GuestReady { .. } => {
2729                let future = consume();
2730                transmit.read = ReadState::HostReady {
2731                    consume,
2732                    guest_offset: 0,
2733                    cancel: false,
2734                    cancel_waker: None,
2735                };
2736                self.0.pipe_from_guest(kind, id, future);
2737            }
2738            WriteState::HostReady { .. } => {
2739                let WriteState::HostReady { produce, .. } = mem::replace(
2740                    &mut transmit.write,
2741                    WriteState::HostReady {
2742                        produce: Box::new(|| unreachable!()),
2743                        try_into: Box::new(|_| unreachable!()),
2744                        guest_offset: 0,
2745                        cancel: false,
2746                        cancel_waker: None,
2747                    },
2748                ) else {
2749                    unreachable!();
2750                };
2751
2752                transmit.read = ReadState::HostToHost {
2753                    accept: Box::new(move |input| {
2754                        let consume = consume_with_buffer.clone();
2755                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2756                    }),
2757                    buffer: Vec::new(),
2758                    limit: 0,
2759                };
2760
2761                let future = async move {
2762                    loop {
2763                        if tls::get(|store| {
2764                            crate::error::Ok(matches!(
2765                                store.concurrent_state_mut().get_mut(id)?.read,
2766                                ReadState::Dropped
2767                            ))
2768                        })? {
2769                            break Ok(());
2770                        }
2771
2772                        match produce().await? {
2773                            StreamResult::Completed | StreamResult::Cancelled => {}
2774                            StreamResult::Dropped => break Ok(()),
2775                        }
2776
2777                        if let TransmitKind::Future = kind {
2778                            break Ok(());
2779                        }
2780                    }
2781                }
2782                .map(move |result| {
2783                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2784                    result
2785                });
2786
2787                state.push_future(Box::pin(future));
2788            }
2789            WriteState::Dropped => {
2790                let reader = transmit.read_handle;
2791                self.0.host_drop_reader(reader, kind).unwrap();
2792            }
2793        }
2794    }
2795}
2796
2797async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2798    token: StoreToken<D>,
2799    id: TableId<TransmitState>,
2800    pair: Arc<Mutex<Option<(P, B)>>>,
2801    kind: TransmitKind,
2802) -> Result<()> {
2803    let (read, guest_offset) = tls::get(|store| {
2804        let transmit = store.concurrent_state_mut().get_mut(id)?;
2805
2806        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2807            Some(guest_offset)
2808        } else {
2809            None
2810        };
2811
2812        crate::error::Ok((
2813            mem::replace(&mut transmit.read, ReadState::Open),
2814            guest_offset,
2815        ))
2816    })?;
2817
2818    match read {
2819        ReadState::GuestReady {
2820            ty,
2821            flat_abi,
2822            options,
2823            address,
2824            count,
2825            handle,
2826            instance,
2827            caller,
2828        } => {
2829            let guest_offset = guest_offset.unwrap();
2830
2831            if let TransmitKind::Future = kind {
2832                tls::get(|store| {
2833                    store.concurrent_state_mut().get_mut(id)?.done = true;
2834                    crate::error::Ok(())
2835                })?;
2836            }
2837
2838            let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2839            let accept = {
2840                let pair = pair.clone();
2841                move |mut store: StoreContextMut<D>| {
2842                    lower::<T, B, D>(
2843                        store.as_context_mut(),
2844                        instance,
2845                        options,
2846                        ty,
2847                        address + (T::SIZE32 * guest_offset),
2848                        count - guest_offset,
2849                        &mut pair.lock().unwrap().as_mut().unwrap().1,
2850                    )?;
2851                    crate::error::Ok(())
2852                }
2853            };
2854
2855            if guest_offset < count {
2856                if T::MAY_REQUIRE_REALLOC {
2857                    // For payloads which may require a realloc call, use a
2858                    // oneshot::channel and background task.  This is
2859                    // necessary because calling the guest while there are
2860                    // host embedder frames on the stack is unsound.
2861                    let (tx, rx) = oneshot::channel();
2862                    tls::get(move |store| {
2863                        store
2864                            .concurrent_state_mut()
2865                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2866                                move |store| {
2867                                    _ = tx.send(accept(token.as_context_mut(store))?);
2868                                    Ok(())
2869                                },
2870                            ))))
2871                    });
2872                    rx.await?
2873                } else {
2874                    // Optimize flat payloads (i.e. those which do not
2875                    // require calling the guest's realloc function) by
2876                    // lowering directly instead of using a oneshot::channel
2877                    // and background task.
2878                    tls::get(|store| accept(token.as_context_mut(store)))?
2879                };
2880            }
2881
2882            tls::get(|store| {
2883                let count =
2884                    old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2885
2886                let transmit = store.concurrent_state_mut().get_mut(id)?;
2887
2888                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2889                    unreachable!();
2890                };
2891
2892                *guest_offset += count;
2893
2894                transmit.read = ReadState::GuestReady {
2895                    ty,
2896                    flat_abi,
2897                    options,
2898                    address,
2899                    count,
2900                    handle,
2901                    instance,
2902                    caller,
2903                };
2904
2905                crate::error::Ok(())
2906            })?;
2907
2908            Ok(())
2909        }
2910
2911        ReadState::HostToHost {
2912            accept,
2913            mut buffer,
2914            limit,
2915        } => {
2916            let mut state = StreamResult::Completed;
2917            let mut position = 0;
2918
2919            while !matches!(state, StreamResult::Dropped) && position < limit {
2920                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2921                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2922                (buffer, position, _) = slice_buffer.into_parts();
2923            }
2924
2925            {
2926                let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2927
2928                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2929                    state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2930                }
2931
2932                *pair.lock().unwrap() = Some((mine, buffer));
2933            }
2934
2935            tls::get(|store| {
2936                store.concurrent_state_mut().get_mut(id)?.read = match state {
2937                    StreamResult::Dropped => ReadState::Dropped,
2938                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2939                        accept,
2940                        buffer,
2941                        limit: 0,
2942                    },
2943                };
2944
2945                crate::error::Ok(())
2946            })?;
2947            Ok(())
2948        }
2949
2950        _ => unreachable!(),
2951    }
2952}
2953
2954impl Instance {
2955    /// Handle a host- or guest-initiated write by delivering the item(s) to the
2956    /// `StreamConsumer` for the specified stream or future.
2957    fn consume(
2958        self,
2959        store: &mut dyn VMStore,
2960        kind: TransmitKind,
2961        transmit_id: TableId<TransmitState>,
2962        consume: PollStream,
2963        guest_offset: usize,
2964        cancel: bool,
2965    ) -> Result<ReturnCode> {
2966        let mut future = consume();
2967        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2968            consume,
2969            guest_offset,
2970            cancel,
2971            cancel_waker: None,
2972        };
2973        let poll = tls::set(store, || {
2974            future
2975                .as_mut()
2976                .poll(&mut Context::from_waker(&Waker::noop()))
2977        });
2978
2979        Ok(match poll {
2980            Poll::Ready(state) => {
2981                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2982                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2983                    unreachable!();
2984                };
2985                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2986                transmit.write = WriteState::Open;
2987                code
2988            }
2989            Poll::Pending => {
2990                store.pipe_from_guest(kind, transmit_id, future);
2991                ReturnCode::Blocked
2992            }
2993        })
2994    }
2995
2996    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
2997    /// for the specified stream or future for items.
2998    fn produce(
2999        self,
3000        store: &mut dyn VMStore,
3001        kind: TransmitKind,
3002        transmit_id: TableId<TransmitState>,
3003        produce: PollStream,
3004        try_into: TryInto,
3005        guest_offset: usize,
3006        cancel: bool,
3007    ) -> Result<ReturnCode> {
3008        let mut future = produce();
3009        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
3010            produce,
3011            try_into,
3012            guest_offset,
3013            cancel,
3014            cancel_waker: None,
3015        };
3016        let poll = tls::set(store, || {
3017            future
3018                .as_mut()
3019                .poll(&mut Context::from_waker(&Waker::noop()))
3020        });
3021
3022        Ok(match poll {
3023            Poll::Ready(state) => {
3024                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3025                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
3026                    unreachable!();
3027                };
3028                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
3029                transmit.read = ReadState::Open;
3030                code
3031            }
3032            Poll::Pending => {
3033                store.pipe_to_guest(kind, transmit_id, future);
3034                ReturnCode::Blocked
3035            }
3036        })
3037    }
3038
3039    /// Drop the writable end of the specified stream or future from the guest.
3040    pub(super) fn guest_drop_writable(
3041        self,
3042        store: &mut StoreOpaque,
3043        ty: TransmitIndex,
3044        writer: u32,
3045    ) -> Result<()> {
3046        let table = self.id().get_mut(store).table_for_transmit(ty);
3047        let transmit_rep = match ty {
3048            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
3049            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
3050        };
3051
3052        let id = TableId::<TransmitHandle>::new(transmit_rep);
3053        log::trace!("guest_drop_writable: drop writer {id:?}");
3054        match ty {
3055            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
3056            TransmitIndex::Future(_) => store.host_drop_writer(
3057                id,
3058                Some(|| {
3059                    Err(format_err!(
3060                        "cannot drop future write end without first writing a value"
3061                    ))
3062                }),
3063            ),
3064        }
3065    }
3066
3067    /// Copy `count` items from `read_address` to `write_address` for the
3068    /// specified stream or future.
3069    fn copy<T: 'static>(
3070        self,
3071        mut store: StoreContextMut<T>,
3072        flat_abi: Option<FlatAbi>,
3073        write_caller: RuntimeComponentInstanceIndex,
3074        write_ty: TransmitIndex,
3075        write_options: OptionsIndex,
3076        write_address: usize,
3077        read_caller: RuntimeComponentInstanceIndex,
3078        read_ty: TransmitIndex,
3079        read_options: OptionsIndex,
3080        read_address: usize,
3081        count: usize,
3082        rep: u32,
3083    ) -> Result<()> {
3084        let types = self.id().get(store.0).component().types();
3085        match (write_ty, read_ty) {
3086            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
3087                assert_eq!(count, 1);
3088
3089                let payload = types[types[write_ty].ty].payload;
3090
3091                if write_caller == read_caller && !allow_intra_component_read_write(payload) {
3092                    bail!(
3093                        "cannot read from and write to intra-component future with non-numeric payload"
3094                    )
3095                }
3096
3097                let val = payload
3098                    .map(|ty| {
3099                        let lift =
3100                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
3101
3102                        let abi = lift.types.canonical_abi(&ty);
3103                        // FIXME: needs to read an i64 for memory64
3104                        if write_address % usize::try_from(abi.align32)? != 0 {
3105                            bail!("write pointer not aligned");
3106                        }
3107
3108                        let bytes = lift
3109                            .memory()
3110                            .get(write_address..)
3111                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
3112                            .ok_or_else(|| {
3113                                crate::format_err!("write pointer out of bounds of memory")
3114                            })?;
3115
3116                        Val::load(lift, ty, bytes)
3117                    })
3118                    .transpose()?;
3119
3120                if let Some(val) = val {
3121                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3122                    let types = lower.types;
3123                    let ty = types[types[read_ty].ty].payload.unwrap();
3124                    let ptr = func::validate_inbounds_dynamic(
3125                        types.canonical_abi(&ty),
3126                        lower.as_slice_mut(),
3127                        &ValRaw::u32(read_address.try_into().unwrap()),
3128                    )?;
3129                    val.store(lower, ty, ptr)?;
3130                }
3131            }
3132            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
3133                if write_caller == read_caller
3134                    && !allow_intra_component_read_write(types[types[write_ty].ty].payload)
3135                {
3136                    bail!(
3137                        "cannot read from and write to intra-component stream with non-numeric payload"
3138                    )
3139                }
3140
3141                if let Some(flat_abi) = flat_abi {
3142                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
3143                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
3144                    if length_in_bytes > 0 {
3145                        if write_address % usize::try_from(flat_abi.align)? != 0 {
3146                            bail!("write pointer not aligned");
3147                        }
3148                        if read_address % usize::try_from(flat_abi.align)? != 0 {
3149                            bail!("read pointer not aligned");
3150                        }
3151
3152                        let store_opaque = store.0.store_opaque_mut();
3153
3154                        {
3155                            let src = self
3156                                .options_memory(store_opaque, write_options)
3157                                .get(write_address..)
3158                                .and_then(|b| b.get(..length_in_bytes))
3159                                .ok_or_else(|| {
3160                                    crate::format_err!("write pointer out of bounds of memory")
3161                                })?
3162                                .as_ptr();
3163                            let dst = self
3164                                .options_memory_mut(store_opaque, read_options)
3165                                .get_mut(read_address..)
3166                                .and_then(|b| b.get_mut(..length_in_bytes))
3167                                .ok_or_else(|| {
3168                                    crate::format_err!("read pointer out of bounds of memory")
3169                                })?
3170                                .as_mut_ptr();
3171                            // SAFETY: Both `src` and `dst` have been validated
3172                            // above.
3173                            unsafe {
3174                                if write_caller == read_caller {
3175                                    // If the same instance owns both ends of
3176                                    // the stream, the source and destination
3177                                    // buffers might overlap.
3178                                    src.copy_to(dst, length_in_bytes)
3179                                } else {
3180                                    // Since the read and write ends of the
3181                                    // stream are owned by distinct instances,
3182                                    // the buffers cannot possibly belong to the
3183                                    // same memory and thus cannot overlap.
3184                                    src.copy_to_nonoverlapping(dst, length_in_bytes)
3185                                }
3186                            }
3187                        }
3188                    }
3189                } else {
3190                    let store_opaque = store.0.store_opaque_mut();
3191                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
3192                    let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
3193                    let abi = lift.types.canonical_abi(&ty);
3194                    let size = usize::try_from(abi.size32).unwrap();
3195                    if write_address % usize::try_from(abi.align32)? != 0 {
3196                        bail!("write pointer not aligned");
3197                    }
3198                    let bytes = lift
3199                        .memory()
3200                        .get(write_address..)
3201                        .and_then(|b| b.get(..size * count))
3202                        .ok_or_else(|| {
3203                            crate::format_err!("write pointer out of bounds of memory")
3204                        })?;
3205
3206                    let values = (0..count)
3207                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
3208                        .collect::<Result<Vec<_>>>()?;
3209
3210                    let id = TableId::<TransmitHandle>::new(rep);
3211                    log::trace!("copy values {values:?} for {id:?}");
3212
3213                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3214                    let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
3215                    let abi = lower.types.canonical_abi(&ty);
3216                    if read_address % usize::try_from(abi.align32)? != 0 {
3217                        bail!("read pointer not aligned");
3218                    }
3219                    let size = usize::try_from(abi.size32).unwrap();
3220                    lower
3221                        .as_slice_mut()
3222                        .get_mut(read_address..)
3223                        .and_then(|b| b.get_mut(..size * count))
3224                        .ok_or_else(|| {
3225                            crate::format_err!("read pointer out of bounds of memory")
3226                        })?;
3227                    let mut ptr = read_address;
3228                    for value in values {
3229                        value.store(lower, ty, ptr)?;
3230                        ptr += size
3231                    }
3232                }
3233            }
3234            _ => unreachable!(),
3235        }
3236
3237        Ok(())
3238    }
3239
3240    fn check_bounds(
3241        self,
3242        store: &StoreOpaque,
3243        options: OptionsIndex,
3244        ty: TransmitIndex,
3245        address: usize,
3246        count: usize,
3247    ) -> Result<()> {
3248        let types = self.id().get(store).component().types();
3249        let size = usize::try_from(
3250            match ty {
3251                TransmitIndex::Future(ty) => types[types[ty].ty]
3252                    .payload
3253                    .map(|ty| types.canonical_abi(&ty).size32),
3254                TransmitIndex::Stream(ty) => types[types[ty].ty]
3255                    .payload
3256                    .map(|ty| types.canonical_abi(&ty).size32),
3257            }
3258            .unwrap_or(0),
3259        )
3260        .unwrap();
3261
3262        if count > 0 && size > 0 {
3263            self.options_memory(store, options)
3264                .get(address..)
3265                .and_then(|b| b.get(..(size * count)))
3266                .map(drop)
3267                .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))
3268        } else {
3269            Ok(())
3270        }
3271    }
3272
3273    /// Write to the specified stream or future from the guest.
3274    pub(super) fn guest_write<T: 'static>(
3275        self,
3276        mut store: StoreContextMut<T>,
3277        caller: RuntimeComponentInstanceIndex,
3278        ty: TransmitIndex,
3279        options: OptionsIndex,
3280        flat_abi: Option<FlatAbi>,
3281        handle: u32,
3282        address: u32,
3283        count: u32,
3284    ) -> Result<ReturnCode> {
3285        if !self.options(store.0, options).async_ {
3286            // The caller may only sync call `{stream,future}.write` from an
3287            // async task (i.e. a task created via a call to an async export).
3288            // Otherwise, we'll trap.
3289            store.0.check_blocking()?;
3290        }
3291
3292        let address = usize::try_from(address).unwrap();
3293        let count = usize::try_from(count).unwrap();
3294        self.check_bounds(store.0, options, ty, address, count)?;
3295        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3296        let TransmitLocalState::Write { done } = *state else {
3297            bail!(
3298                "invalid handle {handle}; expected `Write`; got {:?}",
3299                *state
3300            );
3301        };
3302
3303        if done {
3304            bail!("cannot write to stream after being notified that the readable end dropped");
3305        }
3306
3307        *state = TransmitLocalState::Busy;
3308        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3309        let concurrent_state = store.0.concurrent_state_mut();
3310        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3311        let transmit = concurrent_state.get_mut(transmit_id)?;
3312        log::trace!(
3313            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3314            transmit.read
3315        );
3316
3317        if transmit.done {
3318            bail!("cannot write to future after previous write succeeded or readable end dropped");
3319        }
3320
3321        let new_state = if let ReadState::Dropped = &transmit.read {
3322            ReadState::Dropped
3323        } else {
3324            ReadState::Open
3325        };
3326
3327        let set_guest_ready = |me: &mut ConcurrentState| {
3328            let transmit = me.get_mut(transmit_id)?;
3329            assert!(
3330                matches!(&transmit.write, WriteState::Open),
3331                "expected `WriteState::Open`; got `{:?}`",
3332                transmit.write
3333            );
3334            transmit.write = WriteState::GuestReady {
3335                instance: self,
3336                caller,
3337                ty,
3338                flat_abi,
3339                options,
3340                address,
3341                count,
3342                handle,
3343            };
3344            Ok::<_, crate::Error>(())
3345        };
3346
3347        let mut result = match mem::replace(&mut transmit.read, new_state) {
3348            ReadState::GuestReady {
3349                ty: read_ty,
3350                flat_abi: read_flat_abi,
3351                options: read_options,
3352                address: read_address,
3353                count: read_count,
3354                handle: read_handle,
3355                instance: read_instance,
3356                caller: read_caller,
3357            } => {
3358                assert_eq!(flat_abi, read_flat_abi);
3359
3360                if let TransmitIndex::Future(_) = ty {
3361                    transmit.done = true;
3362                }
3363
3364                // Note that zero-length reads and writes are handling specially
3365                // by the spec to allow each end to signal readiness to the
3366                // other.  Quoting the spec:
3367                //
3368                // ```
3369                // The meaning of a read or write when the length is 0 is that
3370                // the caller is querying the "readiness" of the other
3371                // side. When a 0-length read/write rendezvous with a
3372                // non-0-length read/write, only the 0-length read/write
3373                // completes; the non-0-length read/write is kept pending (and
3374                // ready for a subsequent rendezvous).
3375                //
3376                // In the corner case where a 0-length read and write
3377                // rendezvous, only the writer is notified of readiness. To
3378                // avoid livelock, the Canonical ABI requires that a writer must
3379                // (eventually) follow a completed 0-length write with a
3380                // non-0-length write that is allowed to block (allowing the
3381                // reader end to run and rendezvous with its own non-0-length
3382                // read).
3383                // ```
3384
3385                let write_complete = count == 0 || read_count > 0;
3386                let read_complete = count > 0;
3387                let read_buffer_remaining = count < read_count;
3388
3389                let read_handle_rep = transmit.read_handle.rep();
3390
3391                let count = count.min(read_count);
3392
3393                self.copy(
3394                    store.as_context_mut(),
3395                    flat_abi,
3396                    caller,
3397                    ty,
3398                    options,
3399                    address,
3400                    read_caller,
3401                    read_ty,
3402                    read_options,
3403                    read_address,
3404                    count,
3405                    rep,
3406                )?;
3407
3408                let instance = self.id().get_mut(store.0);
3409                let types = instance.component().types();
3410                let item_size = payload(ty, types)
3411                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3412                    .unwrap_or(0);
3413                let concurrent_state = store.0.concurrent_state_mut();
3414                if read_complete {
3415                    let count = u32::try_from(count).unwrap();
3416                    let total = if let Some(Event::StreamRead {
3417                        code: ReturnCode::Completed(old_total),
3418                        ..
3419                    }) = concurrent_state.take_event(read_handle_rep)?
3420                    {
3421                        count + old_total
3422                    } else {
3423                        count
3424                    };
3425
3426                    let code = ReturnCode::completed(ty.kind(), total);
3427
3428                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3429                }
3430
3431                if read_buffer_remaining {
3432                    let transmit = concurrent_state.get_mut(transmit_id)?;
3433                    transmit.read = ReadState::GuestReady {
3434                        ty: read_ty,
3435                        flat_abi: read_flat_abi,
3436                        options: read_options,
3437                        address: read_address + (count * item_size),
3438                        count: read_count - count,
3439                        handle: read_handle,
3440                        instance: read_instance,
3441                        caller: read_caller,
3442                    };
3443                }
3444
3445                if write_complete {
3446                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3447                } else {
3448                    set_guest_ready(concurrent_state)?;
3449                    ReturnCode::Blocked
3450                }
3451            }
3452
3453            ReadState::HostReady {
3454                consume,
3455                guest_offset,
3456                cancel,
3457                cancel_waker,
3458            } => {
3459                assert!(cancel_waker.is_none());
3460                assert!(!cancel);
3461                assert_eq!(0, guest_offset);
3462
3463                if let TransmitIndex::Future(_) = ty {
3464                    transmit.done = true;
3465                }
3466
3467                set_guest_ready(concurrent_state)?;
3468                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3469            }
3470
3471            ReadState::HostToHost { .. } => unreachable!(),
3472
3473            ReadState::Open => {
3474                set_guest_ready(concurrent_state)?;
3475                ReturnCode::Blocked
3476            }
3477
3478            ReadState::Dropped => {
3479                if let TransmitIndex::Future(_) = ty {
3480                    transmit.done = true;
3481                }
3482
3483                ReturnCode::Dropped(0)
3484            }
3485        };
3486
3487        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3488            result = self.wait_for_write(store.0, transmit_handle)?;
3489        }
3490
3491        if result != ReturnCode::Blocked {
3492            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3493                TransmitLocalState::Write {
3494                    done: matches!(
3495                        (result, ty),
3496                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3497                    ),
3498                };
3499        }
3500
3501        log::trace!(
3502            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3503        );
3504
3505        Ok(result)
3506    }
3507
3508    /// Read from the specified stream or future from the guest.
3509    pub(super) fn guest_read<T: 'static>(
3510        self,
3511        mut store: StoreContextMut<T>,
3512        caller: RuntimeComponentInstanceIndex,
3513        ty: TransmitIndex,
3514        options: OptionsIndex,
3515        flat_abi: Option<FlatAbi>,
3516        handle: u32,
3517        address: u32,
3518        count: u32,
3519    ) -> Result<ReturnCode> {
3520        if !self.options(store.0, options).async_ {
3521            // The caller may only sync call `{stream,future}.read` from an
3522            // async task (i.e. a task created via a call to an async export).
3523            // Otherwise, we'll trap.
3524            store.0.check_blocking()?;
3525        }
3526
3527        let address = usize::try_from(address).unwrap();
3528        let count = usize::try_from(count).unwrap();
3529        self.check_bounds(store.0, options, ty, address, count)?;
3530        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3531        let TransmitLocalState::Read { done } = *state else {
3532            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3533        };
3534
3535        if done {
3536            bail!("cannot read from stream after being notified that the writable end dropped");
3537        }
3538
3539        *state = TransmitLocalState::Busy;
3540        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3541        let concurrent_state = store.0.concurrent_state_mut();
3542        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3543        let transmit = concurrent_state.get_mut(transmit_id)?;
3544        log::trace!(
3545            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3546            transmit.write
3547        );
3548
3549        if transmit.done {
3550            bail!("cannot read from future after previous read succeeded");
3551        }
3552
3553        let new_state = if let WriteState::Dropped = &transmit.write {
3554            WriteState::Dropped
3555        } else {
3556            WriteState::Open
3557        };
3558
3559        let set_guest_ready = |me: &mut ConcurrentState| {
3560            let transmit = me.get_mut(transmit_id)?;
3561            assert!(
3562                matches!(&transmit.read, ReadState::Open),
3563                "expected `ReadState::Open`; got `{:?}`",
3564                transmit.read
3565            );
3566            transmit.read = ReadState::GuestReady {
3567                ty,
3568                flat_abi,
3569                options,
3570                address,
3571                count,
3572                handle,
3573                instance: self,
3574                caller,
3575            };
3576            Ok::<_, crate::Error>(())
3577        };
3578
3579        let mut result = match mem::replace(&mut transmit.write, new_state) {
3580            WriteState::GuestReady {
3581                instance: _,
3582                ty: write_ty,
3583                flat_abi: write_flat_abi,
3584                options: write_options,
3585                address: write_address,
3586                count: write_count,
3587                handle: write_handle,
3588                caller: write_caller,
3589            } => {
3590                assert_eq!(flat_abi, write_flat_abi);
3591
3592                if let TransmitIndex::Future(_) = ty {
3593                    transmit.done = true;
3594                }
3595
3596                let write_handle_rep = transmit.write_handle.rep();
3597
3598                // See the comment in `guest_write` for the
3599                // `ReadState::GuestReady` case concerning zero-length reads and
3600                // writes.
3601
3602                let write_complete = write_count == 0 || count > 0;
3603                let read_complete = write_count > 0;
3604                let write_buffer_remaining = count < write_count;
3605
3606                let count = count.min(write_count);
3607
3608                self.copy(
3609                    store.as_context_mut(),
3610                    flat_abi,
3611                    write_caller,
3612                    write_ty,
3613                    write_options,
3614                    write_address,
3615                    caller,
3616                    ty,
3617                    options,
3618                    address,
3619                    count,
3620                    rep,
3621                )?;
3622
3623                let instance = self.id().get_mut(store.0);
3624                let types = instance.component().types();
3625                let item_size = payload(ty, types)
3626                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3627                    .unwrap_or(0);
3628                let concurrent_state = store.0.concurrent_state_mut();
3629
3630                if write_complete {
3631                    let count = u32::try_from(count).unwrap();
3632                    let total = if let Some(Event::StreamWrite {
3633                        code: ReturnCode::Completed(old_total),
3634                        ..
3635                    }) = concurrent_state.take_event(write_handle_rep)?
3636                    {
3637                        count + old_total
3638                    } else {
3639                        count
3640                    };
3641
3642                    let code = ReturnCode::completed(ty.kind(), total);
3643
3644                    concurrent_state.send_write_result(
3645                        write_ty,
3646                        transmit_id,
3647                        write_handle,
3648                        code,
3649                    )?;
3650                }
3651
3652                if write_buffer_remaining {
3653                    let transmit = concurrent_state.get_mut(transmit_id)?;
3654                    transmit.write = WriteState::GuestReady {
3655                        instance: self,
3656                        caller: write_caller,
3657                        ty: write_ty,
3658                        flat_abi: write_flat_abi,
3659                        options: write_options,
3660                        address: write_address + (count * item_size),
3661                        count: write_count - count,
3662                        handle: write_handle,
3663                    };
3664                }
3665
3666                if read_complete {
3667                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3668                } else {
3669                    set_guest_ready(concurrent_state)?;
3670                    ReturnCode::Blocked
3671                }
3672            }
3673
3674            WriteState::HostReady {
3675                produce,
3676                try_into,
3677                guest_offset,
3678                cancel,
3679                cancel_waker,
3680            } => {
3681                assert!(cancel_waker.is_none());
3682                assert!(!cancel);
3683                assert_eq!(0, guest_offset);
3684
3685                set_guest_ready(concurrent_state)?;
3686
3687                let code =
3688                    self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3689
3690                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3691                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3692                }
3693
3694                code
3695            }
3696
3697            WriteState::Open => {
3698                set_guest_ready(concurrent_state)?;
3699                ReturnCode::Blocked
3700            }
3701
3702            WriteState::Dropped => ReturnCode::Dropped(0),
3703        };
3704
3705        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3706            result = self.wait_for_read(store.0, transmit_handle)?;
3707        }
3708
3709        if result != ReturnCode::Blocked {
3710            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3711                TransmitLocalState::Read {
3712                    done: matches!(
3713                        (result, ty),
3714                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3715                    ),
3716                };
3717        }
3718
3719        log::trace!(
3720            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3721        );
3722
3723        Ok(result)
3724    }
3725
3726    fn wait_for_write(
3727        self,
3728        store: &mut StoreOpaque,
3729        handle: TableId<TransmitHandle>,
3730    ) -> Result<ReturnCode> {
3731        let waitable = Waitable::Transmit(handle);
3732        store.wait_for_event(waitable)?;
3733        let event = waitable.take_event(store.concurrent_state_mut())?;
3734        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3735            event
3736        {
3737            waitable.on_delivery(store, self, event);
3738            Ok(code)
3739        } else {
3740            unreachable!()
3741        }
3742    }
3743
3744    /// Cancel a pending stream or future write.
3745    fn cancel_write(
3746        self,
3747        store: &mut StoreOpaque,
3748        transmit_id: TableId<TransmitState>,
3749        async_: bool,
3750    ) -> Result<ReturnCode> {
3751        let state = store.concurrent_state_mut();
3752        let transmit = state.get_mut(transmit_id)?;
3753        log::trace!(
3754            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3755            transmit.read,
3756            transmit.write
3757        );
3758
3759        let code = if let Some(event) =
3760            Waitable::Transmit(transmit.write_handle).take_event(state)?
3761        {
3762            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3763                unreachable!();
3764            };
3765            match (code, event) {
3766                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3767                    ReturnCode::Cancelled(count)
3768                }
3769                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3770                _ => unreachable!(),
3771            }
3772        } else if let ReadState::HostReady {
3773            cancel,
3774            cancel_waker,
3775            ..
3776        } = &mut state.get_mut(transmit_id)?.read
3777        {
3778            *cancel = true;
3779            if let Some(waker) = cancel_waker.take() {
3780                waker.wake();
3781            }
3782
3783            if async_ {
3784                ReturnCode::Blocked
3785            } else {
3786                let handle = store
3787                    .concurrent_state_mut()
3788                    .get_mut(transmit_id)?
3789                    .write_handle;
3790                self.wait_for_write(store, handle)?
3791            }
3792        } else {
3793            ReturnCode::Cancelled(0)
3794        };
3795
3796        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3797
3798        match &transmit.write {
3799            WriteState::GuestReady { .. } => {
3800                transmit.write = WriteState::Open;
3801            }
3802            WriteState::HostReady { .. } => todo!("support host write cancellation"),
3803            WriteState::Open | WriteState::Dropped => {}
3804        }
3805
3806        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3807
3808        Ok(code)
3809    }
3810
3811    fn wait_for_read(
3812        self,
3813        store: &mut StoreOpaque,
3814        handle: TableId<TransmitHandle>,
3815    ) -> Result<ReturnCode> {
3816        let waitable = Waitable::Transmit(handle);
3817        store.wait_for_event(waitable)?;
3818        let event = waitable.take_event(store.concurrent_state_mut())?;
3819        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3820            event
3821        {
3822            waitable.on_delivery(store, self, event);
3823            Ok(code)
3824        } else {
3825            unreachable!()
3826        }
3827    }
3828
3829    /// Cancel a pending stream or future read.
3830    fn cancel_read(
3831        self,
3832        store: &mut StoreOpaque,
3833        transmit_id: TableId<TransmitState>,
3834        async_: bool,
3835    ) -> Result<ReturnCode> {
3836        let state = store.concurrent_state_mut();
3837        let transmit = state.get_mut(transmit_id)?;
3838        log::trace!(
3839            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3840            transmit.read,
3841            transmit.write
3842        );
3843
3844        let code = if let Some(event) =
3845            Waitable::Transmit(transmit.read_handle).take_event(state)?
3846        {
3847            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3848                unreachable!();
3849            };
3850            match (code, event) {
3851                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3852                    ReturnCode::Cancelled(count)
3853                }
3854                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3855                _ => unreachable!(),
3856            }
3857        } else if let WriteState::HostReady {
3858            cancel,
3859            cancel_waker,
3860            ..
3861        } = &mut state.get_mut(transmit_id)?.write
3862        {
3863            *cancel = true;
3864            if let Some(waker) = cancel_waker.take() {
3865                waker.wake();
3866            }
3867
3868            if async_ {
3869                ReturnCode::Blocked
3870            } else {
3871                let handle = store
3872                    .concurrent_state_mut()
3873                    .get_mut(transmit_id)?
3874                    .read_handle;
3875                self.wait_for_read(store, handle)?
3876            }
3877        } else {
3878            ReturnCode::Cancelled(0)
3879        };
3880
3881        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3882
3883        match &transmit.read {
3884            ReadState::GuestReady { .. } => {
3885                transmit.read = ReadState::Open;
3886            }
3887            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3888                todo!("support host read cancellation")
3889            }
3890            ReadState::Open | ReadState::Dropped => {}
3891        }
3892
3893        log::trace!("cancelled read {transmit_id:?}: {code:?}");
3894
3895        Ok(code)
3896    }
3897
3898    /// Cancel a pending write for the specified stream or future from the guest.
3899    fn guest_cancel_write(
3900        self,
3901        store: &mut StoreOpaque,
3902        ty: TransmitIndex,
3903        async_: bool,
3904        writer: u32,
3905    ) -> Result<ReturnCode> {
3906        if !async_ {
3907            // The caller may only sync call `{stream,future}.cancel-write` from
3908            // an async task (i.e. a task created via a call to an async
3909            // export).  Otherwise, we'll trap.
3910            store.check_blocking()?;
3911        }
3912
3913        let (rep, state) =
3914            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3915        let id = TableId::<TransmitHandle>::new(rep);
3916        log::trace!("guest cancel write {id:?} (handle {writer})");
3917        match state {
3918            TransmitLocalState::Write { .. } => {
3919                bail!("stream or future write cancelled when no write is pending")
3920            }
3921            TransmitLocalState::Read { .. } => {
3922                bail!("passed read end to `{{stream|future}}.cancel-write`")
3923            }
3924            TransmitLocalState::Busy => {}
3925        }
3926        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3927        let code = self.cancel_write(store, transmit_id, async_)?;
3928        if !matches!(code, ReturnCode::Blocked) {
3929            let state =
3930                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3931                    .1;
3932            if let TransmitLocalState::Busy = state {
3933                *state = TransmitLocalState::Write { done: false };
3934            }
3935        }
3936        Ok(code)
3937    }
3938
3939    /// Cancel a pending read for the specified stream or future from the guest.
3940    fn guest_cancel_read(
3941        self,
3942        store: &mut StoreOpaque,
3943        ty: TransmitIndex,
3944        async_: bool,
3945        reader: u32,
3946    ) -> Result<ReturnCode> {
3947        if !async_ {
3948            // The caller may only sync call `{stream,future}.cancel-read` from
3949            // an async task (i.e. a task created via a call to an async
3950            // export).  Otherwise, we'll trap.
3951            store.check_blocking()?;
3952        }
3953
3954        let (rep, state) =
3955            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3956        let id = TableId::<TransmitHandle>::new(rep);
3957        log::trace!("guest cancel read {id:?} (handle {reader})");
3958        match state {
3959            TransmitLocalState::Read { .. } => {
3960                bail!("stream or future read cancelled when no read is pending")
3961            }
3962            TransmitLocalState::Write { .. } => {
3963                bail!("passed write end to `{{stream|future}}.cancel-read`")
3964            }
3965            TransmitLocalState::Busy => {}
3966        }
3967        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3968        let code = self.cancel_read(store, transmit_id, async_)?;
3969        if !matches!(code, ReturnCode::Blocked) {
3970            let state =
3971                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3972                    .1;
3973            if let TransmitLocalState::Busy = state {
3974                *state = TransmitLocalState::Read { done: false };
3975            }
3976        }
3977        Ok(code)
3978    }
3979
3980    /// Drop the readable end of the specified stream or future from the guest.
3981    fn guest_drop_readable(
3982        self,
3983        store: &mut StoreOpaque,
3984        ty: TransmitIndex,
3985        reader: u32,
3986    ) -> Result<()> {
3987        let table = self.id().get_mut(store).table_for_transmit(ty);
3988        let (rep, _is_done) = match ty {
3989            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3990            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3991        };
3992        let kind = match ty {
3993            TransmitIndex::Stream(_) => TransmitKind::Stream,
3994            TransmitIndex::Future(_) => TransmitKind::Future,
3995        };
3996        let id = TableId::<TransmitHandle>::new(rep);
3997        log::trace!("guest_drop_readable: drop reader {id:?}");
3998        store.host_drop_reader(id, kind)
3999    }
4000
4001    /// Create a new error context for the given component.
4002    pub(crate) fn error_context_new(
4003        self,
4004        store: &mut StoreOpaque,
4005        caller: RuntimeComponentInstanceIndex,
4006        ty: TypeComponentLocalErrorContextTableIndex,
4007        options: OptionsIndex,
4008        debug_msg_address: u32,
4009        debug_msg_len: u32,
4010    ) -> Result<u32> {
4011        self.id().get(store).check_may_leave(caller)?;
4012        let lift_ctx = &mut LiftContext::new(store, options, self);
4013        let debug_msg = String::linear_lift_from_flat(
4014            lift_ctx,
4015            InterfaceType::String,
4016            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
4017        )?;
4018
4019        // Create a new ErrorContext that is tracked along with other concurrent state
4020        let err_ctx = ErrorContextState { debug_msg };
4021        let state = store.concurrent_state_mut();
4022        let table_id = state.push(err_ctx)?;
4023        let global_ref_count_idx =
4024            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
4025
4026        // Add to the global error context ref counts
4027        let _ = state
4028            .global_error_context_ref_counts
4029            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
4030
4031        // Error context are tracked both locally (to a single component instance) and globally
4032        // the counts for both must stay in sync.
4033        //
4034        // Here we reflect the newly created global concurrent error context state into the
4035        // component instance's locally tracked count, along with the appropriate key into the global
4036        // ref tracking data structures to enable later lookup
4037        let local_idx = self
4038            .id()
4039            .get_mut(store)
4040            .table_for_error_context(ty)
4041            .error_context_insert(table_id.rep())?;
4042
4043        Ok(local_idx)
4044    }
4045
4046    /// Retrieve the debug message from the specified error context.
4047    pub(super) fn error_context_debug_message<T>(
4048        self,
4049        store: StoreContextMut<T>,
4050        ty: TypeComponentLocalErrorContextTableIndex,
4051        options: OptionsIndex,
4052        err_ctx_handle: u32,
4053        debug_msg_address: u32,
4054    ) -> Result<()> {
4055        // Retrieve the error context and internal debug message
4056        let handle_table_id_rep = self
4057            .id()
4058            .get_mut(store.0)
4059            .table_for_error_context(ty)
4060            .error_context_rep(err_ctx_handle)?;
4061
4062        let state = store.0.concurrent_state_mut();
4063        // Get the state associated with the error context
4064        let ErrorContextState { debug_msg } =
4065            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
4066        let debug_msg = debug_msg.clone();
4067
4068        let lower_cx = &mut LowerContext::new(store, options, self);
4069        let debug_msg_address = usize::try_from(debug_msg_address)?;
4070        // Lower the string into the component's memory
4071        let offset = lower_cx
4072            .as_slice_mut()
4073            .get(debug_msg_address..)
4074            .and_then(|b| b.get(..debug_msg.bytes().len()))
4075            .map(|_| debug_msg_address)
4076            .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
4077        debug_msg
4078            .as_str()
4079            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
4080
4081        Ok(())
4082    }
4083
4084    /// Implements the `future.cancel-read` intrinsic.
4085    pub(crate) fn future_cancel_read(
4086        self,
4087        store: &mut StoreOpaque,
4088        caller: RuntimeComponentInstanceIndex,
4089        ty: TypeFutureTableIndex,
4090        async_: bool,
4091        reader: u32,
4092    ) -> Result<u32> {
4093        self.id().get(store).check_may_leave(caller)?;
4094        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
4095            .map(|v| v.encode())
4096    }
4097
4098    /// Implements the `future.cancel-write` intrinsic.
4099    pub(crate) fn future_cancel_write(
4100        self,
4101        store: &mut StoreOpaque,
4102        caller: RuntimeComponentInstanceIndex,
4103        ty: TypeFutureTableIndex,
4104        async_: bool,
4105        writer: u32,
4106    ) -> Result<u32> {
4107        self.id().get(store).check_may_leave(caller)?;
4108        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
4109            .map(|v| v.encode())
4110    }
4111
4112    /// Implements the `stream.cancel-read` intrinsic.
4113    pub(crate) fn stream_cancel_read(
4114        self,
4115        store: &mut StoreOpaque,
4116        caller: RuntimeComponentInstanceIndex,
4117        ty: TypeStreamTableIndex,
4118        async_: bool,
4119        reader: u32,
4120    ) -> Result<u32> {
4121        self.id().get(store).check_may_leave(caller)?;
4122        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
4123            .map(|v| v.encode())
4124    }
4125
4126    /// Implements the `stream.cancel-write` intrinsic.
4127    pub(crate) fn stream_cancel_write(
4128        self,
4129        store: &mut StoreOpaque,
4130        caller: RuntimeComponentInstanceIndex,
4131        ty: TypeStreamTableIndex,
4132        async_: bool,
4133        writer: u32,
4134    ) -> Result<u32> {
4135        self.id().get(store).check_may_leave(caller)?;
4136        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
4137            .map(|v| v.encode())
4138    }
4139
4140    /// Implements the `future.drop-readable` intrinsic.
4141    pub(crate) fn future_drop_readable(
4142        self,
4143        store: &mut StoreOpaque,
4144        caller: RuntimeComponentInstanceIndex,
4145        ty: TypeFutureTableIndex,
4146        reader: u32,
4147    ) -> Result<()> {
4148        self.id().get(store).check_may_leave(caller)?;
4149        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
4150    }
4151
4152    /// Implements the `stream.drop-readable` intrinsic.
4153    pub(crate) fn stream_drop_readable(
4154        self,
4155        store: &mut StoreOpaque,
4156        caller: RuntimeComponentInstanceIndex,
4157        ty: TypeStreamTableIndex,
4158        reader: u32,
4159    ) -> Result<()> {
4160        self.id().get(store).check_may_leave(caller)?;
4161        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
4162    }
4163
4164    /// Allocate a new future or stream and grant ownership of both the read and
4165    /// write ends to the (sub-)component instance to which the specified
4166    /// `TransmitIndex` belongs.
4167    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
4168        let (write, read) = store
4169            .concurrent_state_mut()
4170            .new_transmit(TransmitOrigin::guest(self.id().instance(), ty))?;
4171
4172        let table = self.id().get_mut(store).table_for_transmit(ty);
4173        let (read_handle, write_handle) = match ty {
4174            TransmitIndex::Future(ty) => (
4175                table.future_insert_read(ty, read.rep())?,
4176                table.future_insert_write(ty, write.rep())?,
4177            ),
4178            TransmitIndex::Stream(ty) => (
4179                table.stream_insert_read(ty, read.rep())?,
4180                table.stream_insert_write(ty, write.rep())?,
4181            ),
4182        };
4183
4184        let state = store.concurrent_state_mut();
4185        state.get_mut(read)?.common.handle = Some(read_handle);
4186        state.get_mut(write)?.common.handle = Some(write_handle);
4187
4188        Ok(ResourcePair {
4189            write: write_handle,
4190            read: read_handle,
4191        })
4192    }
4193
4194    /// Drop the specified error context.
4195    pub(crate) fn error_context_drop(
4196        self,
4197        store: &mut StoreOpaque,
4198        caller: RuntimeComponentInstanceIndex,
4199        ty: TypeComponentLocalErrorContextTableIndex,
4200        error_context: u32,
4201    ) -> Result<()> {
4202        let instance = self.id().get_mut(store);
4203        instance.check_may_leave(caller)?;
4204
4205        let local_handle_table = instance.table_for_error_context(ty);
4206
4207        let rep = local_handle_table.error_context_drop(error_context)?;
4208
4209        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4210
4211        let state = store.concurrent_state_mut();
4212        let GlobalErrorContextRefCount(global_ref_count) = state
4213            .global_error_context_ref_counts
4214            .get_mut(&global_ref_count_idx)
4215            .expect("retrieve concurrent state for error context during drop");
4216
4217        // Reduce the component-global ref count, removing tracking if necessary
4218        assert!(*global_ref_count >= 1);
4219        *global_ref_count -= 1;
4220        if *global_ref_count == 0 {
4221            state
4222                .global_error_context_ref_counts
4223                .remove(&global_ref_count_idx);
4224
4225            state
4226                .delete(TableId::<ErrorContextState>::new(rep))
4227                .context("deleting component-global error context data")?;
4228        }
4229
4230        Ok(())
4231    }
4232
4233    /// Transfer ownership of the specified stream or future read end from one
4234    /// guest to another.
4235    fn guest_transfer(
4236        self,
4237        store: &mut StoreOpaque,
4238        src_idx: u32,
4239        src: TransmitIndex,
4240        dst: TransmitIndex,
4241    ) -> Result<u32> {
4242        let mut instance = self.id().get_mut(store);
4243        let src_table = instance.as_mut().table_for_transmit(src);
4244        let (rep, is_done) = match src {
4245            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4246            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4247        };
4248        if is_done {
4249            bail!("cannot lift after being notified that the writable end dropped");
4250        }
4251        let dst_table = instance.table_for_transmit(dst);
4252        let handle = match dst {
4253            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4254            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4255        }?;
4256        store
4257            .concurrent_state_mut()
4258            .get_mut(TableId::<TransmitHandle>::new(rep))?
4259            .common
4260            .handle = Some(handle);
4261        Ok(handle)
4262    }
4263
4264    /// Implements the `future.new` intrinsic.
4265    pub(crate) fn future_new(
4266        self,
4267        store: &mut StoreOpaque,
4268        caller: RuntimeComponentInstanceIndex,
4269        ty: TypeFutureTableIndex,
4270    ) -> Result<ResourcePair> {
4271        self.id().get(store).check_may_leave(caller)?;
4272        self.guest_new(store, TransmitIndex::Future(ty))
4273    }
4274
4275    /// Implements the `stream.new` intrinsic.
4276    pub(crate) fn stream_new(
4277        self,
4278        store: &mut StoreOpaque,
4279        caller: RuntimeComponentInstanceIndex,
4280        ty: TypeStreamTableIndex,
4281    ) -> Result<ResourcePair> {
4282        self.id().get(store).check_may_leave(caller)?;
4283        self.guest_new(store, TransmitIndex::Stream(ty))
4284    }
4285
4286    /// Transfer ownership of the specified future read end from one guest to
4287    /// another.
4288    pub(crate) fn future_transfer(
4289        self,
4290        store: &mut StoreOpaque,
4291        src_idx: u32,
4292        src: TypeFutureTableIndex,
4293        dst: TypeFutureTableIndex,
4294    ) -> Result<u32> {
4295        self.guest_transfer(
4296            store,
4297            src_idx,
4298            TransmitIndex::Future(src),
4299            TransmitIndex::Future(dst),
4300        )
4301    }
4302
4303    /// Transfer ownership of the specified stream read end from one guest to
4304    /// another.
4305    pub(crate) fn stream_transfer(
4306        self,
4307        store: &mut StoreOpaque,
4308        src_idx: u32,
4309        src: TypeStreamTableIndex,
4310        dst: TypeStreamTableIndex,
4311    ) -> Result<u32> {
4312        self.guest_transfer(
4313            store,
4314            src_idx,
4315            TransmitIndex::Stream(src),
4316            TransmitIndex::Stream(dst),
4317        )
4318    }
4319
4320    /// Copy the specified error context from one component to another.
4321    pub(crate) fn error_context_transfer(
4322        self,
4323        store: &mut StoreOpaque,
4324        src_idx: u32,
4325        src: TypeComponentLocalErrorContextTableIndex,
4326        dst: TypeComponentLocalErrorContextTableIndex,
4327    ) -> Result<u32> {
4328        let mut instance = self.id().get_mut(store);
4329        let rep = instance
4330            .as_mut()
4331            .table_for_error_context(src)
4332            .error_context_rep(src_idx)?;
4333        let dst_idx = instance
4334            .table_for_error_context(dst)
4335            .error_context_insert(rep)?;
4336
4337        // Update the global (cross-subcomponent) count for error contexts
4338        // as the new component has essentially created a new reference that will
4339        // be dropped/handled independently
4340        let global_ref_count = store
4341            .concurrent_state_mut()
4342            .global_error_context_ref_counts
4343            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4344            .context("global ref count present for existing (sub)component error context")?;
4345        global_ref_count.0 += 1;
4346
4347        Ok(dst_idx)
4348    }
4349}
4350
4351impl ComponentInstance {
4352    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4353        let (states, types) = self.instance_states();
4354        let runtime_instance = match ty {
4355            TransmitIndex::Stream(ty) => types[ty].instance,
4356            TransmitIndex::Future(ty) => types[ty].instance,
4357        };
4358        states[runtime_instance].handle_table()
4359    }
4360
4361    fn table_for_error_context(
4362        self: Pin<&mut Self>,
4363        ty: TypeComponentLocalErrorContextTableIndex,
4364    ) -> &mut HandleTable {
4365        let (states, types) = self.instance_states();
4366        let runtime_instance = types[ty].instance;
4367        states[runtime_instance].handle_table()
4368    }
4369
4370    fn get_mut_by_index(
4371        self: Pin<&mut Self>,
4372        ty: TransmitIndex,
4373        index: u32,
4374    ) -> Result<(u32, &mut TransmitLocalState)> {
4375        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4376    }
4377}
4378
4379impl ConcurrentState {
4380    fn send_write_result(
4381        &mut self,
4382        ty: TransmitIndex,
4383        id: TableId<TransmitState>,
4384        handle: u32,
4385        code: ReturnCode,
4386    ) -> Result<()> {
4387        let write_handle = self.get_mut(id)?.write_handle.rep();
4388        self.set_event(
4389            write_handle,
4390            match ty {
4391                TransmitIndex::Future(ty) => Event::FutureWrite {
4392                    code,
4393                    pending: Some((ty, handle)),
4394                },
4395                TransmitIndex::Stream(ty) => Event::StreamWrite {
4396                    code,
4397                    pending: Some((ty, handle)),
4398                },
4399            },
4400        )
4401    }
4402
4403    fn send_read_result(
4404        &mut self,
4405        ty: TransmitIndex,
4406        id: TableId<TransmitState>,
4407        handle: u32,
4408        code: ReturnCode,
4409    ) -> Result<()> {
4410        let read_handle = self.get_mut(id)?.read_handle.rep();
4411        self.set_event(
4412            read_handle,
4413            match ty {
4414                TransmitIndex::Future(ty) => Event::FutureRead {
4415                    code,
4416                    pending: Some((ty, handle)),
4417                },
4418                TransmitIndex::Stream(ty) => Event::StreamRead {
4419                    code,
4420                    pending: Some((ty, handle)),
4421                },
4422            },
4423        )
4424    }
4425
4426    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4427        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4428    }
4429
4430    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4431        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4432    }
4433
4434    /// Set or update the event for the specified waitable.
4435    ///
4436    /// If there is already an event set for this waitable, we assert that it is
4437    /// of the same variant as the new one and reuse the `ReturnCode` count and
4438    /// the `pending` field if applicable.
4439    // TODO: This is a bit awkward due to how
4440    // `Event::{Stream,Future}{Write,Read}` and
4441    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4442    // Consider updating those representations in a way that allows this
4443    // function to be simplified.
4444    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4445        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4446
4447        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4448            let (ReturnCode::Completed(count)
4449            | ReturnCode::Dropped(count)
4450            | ReturnCode::Cancelled(count)) = old
4451            else {
4452                unreachable!()
4453            };
4454
4455            match new {
4456                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4457                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4458                _ => unreachable!(),
4459            }
4460        }
4461
4462        let event = match (waitable.take_event(self)?, event) {
4463            (None, _) => event,
4464            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4465            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4466            (
4467                Some(Event::StreamWrite {
4468                    code: old_code,
4469                    pending: old_pending,
4470                }),
4471                Event::StreamWrite { code, pending },
4472            ) => Event::StreamWrite {
4473                code: update_code(old_code, code),
4474                pending: old_pending.or(pending),
4475            },
4476            (
4477                Some(Event::StreamRead {
4478                    code: old_code,
4479                    pending: old_pending,
4480                }),
4481                Event::StreamRead { code, pending },
4482            ) => Event::StreamRead {
4483                code: update_code(old_code, code),
4484                pending: old_pending.or(pending),
4485            },
4486            _ => unreachable!(),
4487        };
4488
4489        waitable.set_event(self, Some(event))
4490    }
4491
4492    /// Allocate a new future or stream, including the `TransmitState` and the
4493    /// `TransmitHandle`s corresponding to the read and write ends.
4494    fn new_transmit(
4495        &mut self,
4496        origin: TransmitOrigin,
4497    ) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4498        let state_id = self.push(TransmitState::new(origin))?;
4499
4500        let write = self.push(TransmitHandle::new(state_id))?;
4501        let read = self.push(TransmitHandle::new(state_id))?;
4502
4503        let state = self.get_mut(state_id)?;
4504        state.write_handle = write;
4505        state.read_handle = read;
4506
4507        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4508
4509        Ok((write, read))
4510    }
4511
4512    /// Delete the specified future or stream, including the read and write ends.
4513    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4514        let state = self.delete(state_id)?;
4515        self.delete(state.write_handle)?;
4516        self.delete(state.read_handle)?;
4517
4518        log::trace!(
4519            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4520            state.write_handle,
4521            state.read_handle,
4522        );
4523
4524        Ok(())
4525    }
4526}
4527
4528pub(crate) struct ResourcePair {
4529    pub(crate) write: u32,
4530    pub(crate) read: u32,
4531}
4532
4533impl Waitable {
4534    /// Handle the imminent delivery of the specified event, e.g. by updating
4535    /// the state of the stream or future.
4536    pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4537        match event {
4538            Event::FutureRead {
4539                pending: Some((ty, handle)),
4540                ..
4541            }
4542            | Event::FutureWrite {
4543                pending: Some((ty, handle)),
4544                ..
4545            } => {
4546                let instance = instance.id().get_mut(store);
4547                let runtime_instance = instance.component().types()[ty].instance;
4548                let (rep, state) = instance.instance_states().0[runtime_instance]
4549                    .handle_table()
4550                    .future_rep(ty, handle)
4551                    .unwrap();
4552                assert_eq!(rep, self.rep());
4553                assert_eq!(*state, TransmitLocalState::Busy);
4554                *state = match event {
4555                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4556                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4557                    _ => unreachable!(),
4558                };
4559            }
4560            Event::StreamRead {
4561                pending: Some((ty, handle)),
4562                code,
4563            }
4564            | Event::StreamWrite {
4565                pending: Some((ty, handle)),
4566                code,
4567            } => {
4568                let instance = instance.id().get_mut(store);
4569                let runtime_instance = instance.component().types()[ty].instance;
4570                let (rep, state) = instance.instance_states().0[runtime_instance]
4571                    .handle_table()
4572                    .stream_rep(ty, handle)
4573                    .unwrap();
4574                assert_eq!(rep, self.rep());
4575                assert_eq!(*state, TransmitLocalState::Busy);
4576                let done = matches!(code, ReturnCode::Dropped(_));
4577                *state = match event {
4578                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
4579                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4580                    _ => unreachable!(),
4581                };
4582
4583                let transmit_handle = TableId::<TransmitHandle>::new(rep);
4584                let state = store.concurrent_state_mut();
4585                let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4586                let transmit = state.get_mut(transmit_id).unwrap();
4587
4588                match event {
4589                    Event::StreamRead { .. } => {
4590                        transmit.read = ReadState::Open;
4591                    }
4592                    Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4593                    _ => unreachable!(),
4594                };
4595            }
4596            _ => {}
4597        }
4598    }
4599}
4600
4601/// Determine whether an intra-component read/write is allowed for the specified
4602/// `stream` or `future` payload type according to the component model
4603/// specification.
4604fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4605    matches!(
4606        ty,
4607        None | Some(
4608            InterfaceType::S8
4609                | InterfaceType::U8
4610                | InterfaceType::S16
4611                | InterfaceType::U16
4612                | InterfaceType::S32
4613                | InterfaceType::U32
4614                | InterfaceType::S64
4615                | InterfaceType::U64
4616                | InterfaceType::Float32
4617                | InterfaceType::Float64
4618        )
4619    )
4620}
4621
4622#[cfg(test)]
4623mod tests {
4624    use super::*;
4625    use crate::{Engine, Store};
4626    use core::future::pending;
4627    use core::pin::pin;
4628    use std::sync::LazyLock;
4629
4630    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4631
4632    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4633    where
4634        T: FutureProducer<()>,
4635    {
4636        rx.poll_produce(
4637            &mut Context::from_waker(Waker::noop()),
4638            Store::new(&ENGINE, ()).as_context_mut(),
4639            finish,
4640        )
4641    }
4642
4643    #[test]
4644    fn future_producer() {
4645        let mut fut = pin!(async { crate::error::Ok(()) });
4646        assert!(matches!(
4647            poll_future_producer(fut.as_mut(), false),
4648            Poll::Ready(Ok(Some(()))),
4649        ));
4650
4651        let mut fut = pin!(async { crate::error::Ok(()) });
4652        assert!(matches!(
4653            poll_future_producer(fut.as_mut(), true),
4654            Poll::Ready(Ok(Some(()))),
4655        ));
4656
4657        let mut fut = pin!(pending::<Result<()>>());
4658        assert!(matches!(
4659            poll_future_producer(fut.as_mut(), false),
4660            Poll::Pending,
4661        ));
4662        assert!(matches!(
4663            poll_future_producer(fut.as_mut(), true),
4664            Poll::Ready(Ok(None)),
4665        ));
4666
4667        let (tx, rx) = oneshot::channel();
4668        let mut rx = pin!(rx);
4669        assert!(matches!(
4670            poll_future_producer(rx.as_mut(), false),
4671            Poll::Pending,
4672        ));
4673        assert!(matches!(
4674            poll_future_producer(rx.as_mut(), true),
4675            Poll::Ready(Ok(None)),
4676        ));
4677        tx.send(()).unwrap();
4678        assert!(matches!(
4679            poll_future_producer(rx.as_mut(), true),
4680            Poll::Ready(Ok(Some(()))),
4681        ));
4682
4683        let (tx, rx) = oneshot::channel();
4684        let mut rx = pin!(rx);
4685        tx.send(()).unwrap();
4686        assert!(matches!(
4687            poll_future_producer(rx.as_mut(), false),
4688            Poll::Ready(Ok(Some(()))),
4689        ));
4690
4691        let (tx, rx) = oneshot::channel::<()>();
4692        let mut rx = pin!(rx);
4693        drop(tx);
4694        assert!(matches!(
4695            poll_future_producer(rx.as_mut(), false),
4696            Poll::Ready(Err(..)),
4697        ));
4698
4699        let (tx, rx) = oneshot::channel::<()>();
4700        let mut rx = pin!(rx);
4701        drop(tx);
4702        assert!(matches!(
4703            poll_future_producer(rx.as_mut(), true),
4704            Poll::Ready(Err(..)),
4705        ));
4706    }
4707}