wasmtime/runtime/component/concurrent/
futures_and_streams.rs

1use super::table::{TableDebug, TableId};
2use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
3use crate::component::concurrent::{ConcurrentState, WorkItem, tls};
4use crate::component::func::{self, LiftContext, LowerContext};
5use crate::component::matching::InstanceType;
6use crate::component::values::{ErrorContextAny, FutureAny, StreamAny};
7use crate::component::{AsAccessor, Instance, Lift, Lower, Val, WasmList};
8use crate::store::{StoreOpaque, StoreToken};
9use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
10use crate::vm::{AlwaysMut, VMStore};
11use crate::{AsContextMut, StoreContextMut, ValRaw};
12use anyhow::{Context as _, Error, Result, anyhow, bail};
13use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
14use core::fmt;
15use core::future;
16use core::iter;
17use core::marker::PhantomData;
18use core::mem::{self, MaybeUninit};
19use core::pin::Pin;
20use core::task::{Context, Poll, Waker, ready};
21use futures::channel::oneshot;
22use futures::{FutureExt as _, stream};
23use std::any::{Any, TypeId};
24use std::boxed::Box;
25use std::io::Cursor;
26use std::string::String;
27use std::sync::{Arc, Mutex};
28use std::vec::Vec;
29use wasmtime_environ::component::{
30    CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
31    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
32    TypeFutureTableIndex, TypeStreamTableIndex,
33};
34
35pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
36
37mod buffers;
38
39/// Enum for distinguishing between a stream or future in functions that handle
40/// both.
41#[derive(Copy, Clone, Debug)]
42pub enum TransmitKind {
43    Stream,
44    Future,
45}
46
47/// Represents `{stream,future}.{read,write}` results.
48#[derive(Copy, Clone, Debug, PartialEq)]
49pub enum ReturnCode {
50    Blocked,
51    Completed(u32),
52    Dropped(u32),
53    Cancelled(u32),
54}
55
56impl ReturnCode {
57    /// Pack `self` into a single 32-bit integer that may be returned to the
58    /// guest.
59    ///
60    /// This corresponds to `pack_copy_result` in the Component Model spec.
61    pub fn encode(&self) -> u32 {
62        const BLOCKED: u32 = 0xffff_ffff;
63        const COMPLETED: u32 = 0x0;
64        const DROPPED: u32 = 0x1;
65        const CANCELLED: u32 = 0x2;
66        match self {
67            ReturnCode::Blocked => BLOCKED,
68            ReturnCode::Completed(n) => {
69                debug_assert!(*n < (1 << 28));
70                (n << 4) | COMPLETED
71            }
72            ReturnCode::Dropped(n) => {
73                debug_assert!(*n < (1 << 28));
74                (n << 4) | DROPPED
75            }
76            ReturnCode::Cancelled(n) => {
77                debug_assert!(*n < (1 << 28));
78                (n << 4) | CANCELLED
79            }
80        }
81    }
82
83    /// Returns `Self::Completed` with the specified count (or zero if
84    /// `matches!(kind, TransmitKind::Future)`)
85    fn completed(kind: TransmitKind, count: u32) -> Self {
86        Self::Completed(if let TransmitKind::Future = kind {
87            0
88        } else {
89            count
90        })
91    }
92}
93
94/// Represents a stream or future type index.
95///
96/// This is useful as a parameter type for functions which operate on either a
97/// future or a stream.
98#[derive(Copy, Clone, Debug)]
99pub enum TransmitIndex {
100    Stream(TypeStreamTableIndex),
101    Future(TypeFutureTableIndex),
102}
103
104impl TransmitIndex {
105    pub fn kind(&self) -> TransmitKind {
106        match self {
107            TransmitIndex::Stream(_) => TransmitKind::Stream,
108            TransmitIndex::Future(_) => TransmitKind::Future,
109        }
110    }
111}
112
113/// Retrieve the payload type of the specified stream or future, or `None` if it
114/// has no payload type.
115fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
116    match ty {
117        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
118        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
119    }
120}
121
122/// Retrieve the host rep and state for the specified guest-visible waitable
123/// handle.
124fn get_mut_by_index_from(
125    handle_table: &mut HandleTable,
126    ty: TransmitIndex,
127    index: u32,
128) -> Result<(u32, &mut TransmitLocalState)> {
129    match ty {
130        TransmitIndex::Stream(ty) => handle_table.stream_rep(ty, index),
131        TransmitIndex::Future(ty) => handle_table.future_rep(ty, index),
132    }
133}
134
135fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
136    mut store: StoreContextMut<U>,
137    instance: Instance,
138    options: OptionsIndex,
139    ty: TransmitIndex,
140    address: usize,
141    count: usize,
142    buffer: &mut B,
143) -> Result<()> {
144    let count = buffer.remaining().len().min(count);
145
146    let lower = &mut if T::MAY_REQUIRE_REALLOC {
147        LowerContext::new
148    } else {
149        LowerContext::new_without_realloc
150    }(store.as_context_mut(), options, instance);
151
152    if address % usize::try_from(T::ALIGN32)? != 0 {
153        bail!("read pointer not aligned");
154    }
155    lower
156        .as_slice_mut()
157        .get_mut(address..)
158        .and_then(|b| b.get_mut(..T::SIZE32 * count))
159        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
160
161    if let Some(ty) = payload(ty, lower.types) {
162        T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
163    }
164
165    buffer.skip(count);
166
167    Ok(())
168}
169
170fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
171    lift: &mut LiftContext<'_>,
172    ty: Option<InterfaceType>,
173    buffer: &mut B,
174    address: usize,
175    count: usize,
176) -> Result<()> {
177    let count = count.min(buffer.remaining_capacity());
178    if T::IS_RUST_UNIT_TYPE {
179        // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a
180        // zero-sized type, so `MaybeUninit::uninit().assume_init()`
181        // is a valid way to populate the zero-sized buffer.
182        buffer.extend(
183            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
184        )
185    } else {
186        let ty = ty.unwrap();
187        if address % usize::try_from(T::ALIGN32)? != 0 {
188            bail!("write pointer not aligned");
189        }
190        lift.memory()
191            .get(address..)
192            .and_then(|b| b.get(..T::SIZE32 * count))
193            .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
194
195        let list = &WasmList::new(address, count, lift, ty)?;
196        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
197    }
198    Ok(())
199}
200
201/// Represents the state associated with an error context
202#[derive(Debug, PartialEq, Eq, PartialOrd)]
203pub(super) struct ErrorContextState {
204    /// Debug message associated with the error context
205    pub(crate) debug_msg: String,
206}
207
208/// Represents the size and alignment for a "flat" Component Model type,
209/// i.e. one containing no pointers or handles.
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211pub(super) struct FlatAbi {
212    pub(super) size: u32,
213    pub(super) align: u32,
214}
215
216/// Represents the buffer for a host- or guest-initiated stream read.
217pub struct Destination<'a, T, B> {
218    id: TableId<TransmitState>,
219    buffer: &'a mut B,
220    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
221    _phantom: PhantomData<fn() -> T>,
222}
223
224impl<'a, T, B> Destination<'a, T, B> {
225    /// Reborrow `self` so it can be used again later.
226    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
227        Destination {
228            id: self.id,
229            buffer: &mut *self.buffer,
230            host_buffer: self.host_buffer.as_deref_mut(),
231            _phantom: PhantomData,
232        }
233    }
234
235    /// Take the buffer out of `self`, leaving a default-initialized one in its
236    /// place.
237    ///
238    /// This can be useful for reusing the previously-stored buffer's capacity
239    /// instead of allocating a fresh one.
240    pub fn take_buffer(&mut self) -> B
241    where
242        B: Default,
243    {
244        mem::take(self.buffer)
245    }
246
247    /// Store the specified buffer in `self`.
248    ///
249    /// Any items contained in the buffer will be delivered to the reader after
250    /// the `StreamProducer::poll_produce` call to which this `Destination` was
251    /// passed returns (unless overwritten by another call to `set_buffer`).
252    ///
253    /// If items are stored via this buffer _and_ written via a
254    /// `DirectDestination` view of `self`, then the items in the buffer will be
255    /// delivered after the ones written using `DirectDestination`.
256    pub fn set_buffer(&mut self, buffer: B) {
257        *self.buffer = buffer;
258    }
259
260    /// Return the remaining number of items the current read has capacity to
261    /// accept, if known.
262    ///
263    /// This will return `Some(_)` if the reader is a guest; it will return
264    /// `None` if the reader is the host.
265    ///
266    /// Note that this can return `Some(0)`. This means that the guest is
267    /// attempting to perform a zero-length read which typically means that it's
268    /// trying to wait for this stream to be ready-to-read but is not actually
269    /// ready to receive the items yet. The host in this case is allowed to
270    /// either block waiting for readiness or immediately complete the
271    /// operation. The guest is expected to handle both cases. Some more
272    /// discussion about this case can be found in the discussion of ["Stream
273    /// Readiness" in the component-model repo][docs].
274    ///
275    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
276    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
277        let transmit = store
278            .as_context_mut()
279            .0
280            .concurrent_state_mut()
281            .get_mut(self.id)
282            .unwrap();
283
284        if let &ReadState::GuestReady { count, .. } = &transmit.read {
285            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
286                unreachable!()
287            };
288
289            Some(count - guest_offset)
290        } else {
291            None
292        }
293    }
294}
295
296impl<'a, B> Destination<'a, u8, B> {
297    /// Return a `DirectDestination` view of `self`.
298    ///
299    /// If the reader is a guest, this will provide direct access to the guest's
300    /// read buffer.  If the reader is a host, this will provide access to a
301    /// buffer which will be delivered to the host before any items stored using
302    /// `Destination::set_buffer`.
303    ///
304    /// `capacity` will only be used if the reader is a host, in which case it
305    /// will update the length of the buffer, possibly zero-initializing the new
306    /// elements if the new length is larger than the old length.
307    pub fn as_direct<D>(
308        mut self,
309        store: StoreContextMut<'a, D>,
310        capacity: usize,
311    ) -> DirectDestination<'a, D> {
312        if let Some(buffer) = self.host_buffer.as_deref_mut() {
313            buffer.set_position(0);
314            if buffer.get_mut().is_empty() {
315                buffer.get_mut().resize(capacity, 0);
316            }
317        }
318
319        DirectDestination {
320            id: self.id,
321            host_buffer: self.host_buffer,
322            store,
323        }
324    }
325}
326
327/// Represents a read from a `stream<u8>`, providing direct access to the
328/// writer's buffer.
329pub struct DirectDestination<'a, D: 'static> {
330    id: TableId<TransmitState>,
331    host_buffer: Option<&'a mut Cursor<Vec<u8>>>,
332    store: StoreContextMut<'a, D>,
333}
334
335impl<D: 'static> DirectDestination<'_, D> {
336    /// Provide direct access to the writer's buffer.
337    pub fn remaining(&mut self) -> &mut [u8] {
338        if let Some(buffer) = self.host_buffer.as_deref_mut() {
339            buffer.get_mut()
340        } else {
341            let transmit = self
342                .store
343                .as_context_mut()
344                .0
345                .concurrent_state_mut()
346                .get_mut(self.id)
347                .unwrap();
348
349            let &ReadState::GuestReady {
350                address,
351                count,
352                options,
353                instance,
354                ..
355            } = &transmit.read
356            else {
357                unreachable!();
358            };
359
360            let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
361                unreachable!()
362            };
363
364            instance
365                .options_memory_mut(self.store.0, options)
366                .get_mut((address + guest_offset)..)
367                .and_then(|b| b.get_mut(..(count - guest_offset)))
368                .unwrap()
369        }
370    }
371
372    /// Mark the specified number of bytes as written to the writer's buffer.
373    ///
374    /// This will panic if the count is larger than the size of the
375    /// buffer returned by `Self::remaining`.
376    pub fn mark_written(&mut self, count: usize) {
377        if let Some(buffer) = self.host_buffer.as_deref_mut() {
378            buffer.set_position(
379                buffer
380                    .position()
381                    .checked_add(u64::try_from(count).unwrap())
382                    .unwrap(),
383            );
384        } else {
385            let transmit = self
386                .store
387                .as_context_mut()
388                .0
389                .concurrent_state_mut()
390                .get_mut(self.id)
391                .unwrap();
392
393            let ReadState::GuestReady {
394                count: read_count, ..
395            } = &transmit.read
396            else {
397                unreachable!();
398            };
399
400            let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
401                unreachable!()
402            };
403
404            if *guest_offset + count > *read_count {
405                panic!(
406                    "write count ({count}) must be less than or equal to read count ({read_count})"
407                )
408            } else {
409                *guest_offset += count;
410            }
411        }
412    }
413}
414
415/// Represents the state of a `Stream{Producer,Consumer}`.
416#[derive(Copy, Clone, Debug)]
417pub enum StreamResult {
418    /// The operation completed normally, and the producer or consumer may be
419    /// able to produce or consume more items, respectively.
420    Completed,
421    /// The operation was interrupted (i.e. it wrapped up early after receiving
422    /// a `finish` parameter value of true in a call to `poll_produce` or
423    /// `poll_consume`), and the producer or consumer may be able to produce or
424    /// consume more items, respectively.
425    Cancelled,
426    /// The operation completed normally, but the producer or consumer will
427    /// _not_ able to produce or consume more items, respectively.
428    Dropped,
429}
430
431/// Represents the host-owned write end of a stream.
432pub trait StreamProducer<D>: Send + 'static {
433    /// The payload type of this stream.
434    type Item;
435
436    /// The `WriteBuffer` type to use when delivering items.
437    type Buffer: WriteBuffer<Self::Item> + Default;
438
439    /// Handle a host- or guest-initiated read by delivering zero or more items
440    /// to the specified destination.
441    ///
442    /// This will be called whenever the reader starts a read.
443    ///
444    /// # Arguments
445    ///
446    /// * `self` - a `Pin`'d version of self to perform Rust-level
447    ///   future-related operations on.
448    /// * `cx` - a Rust-related [`Context`] which is passed to other
449    ///   future-related operations or used to acquire a waker.
450    /// * `store` - the Wasmtime store that this operation is happening within.
451    ///   Used, for example, to consult the state `D` associated with the store.
452    /// * `destination` - the location that items are to be written to.
453    /// * `finish` - a flag indicating whether the host should strive to
454    ///   immediately complete/cancel any pending operation. See below for more
455    ///   details.
456    ///
457    /// # Behavior
458    ///
459    /// If the implementation is able to produce one or more items immediately,
460    /// it should write them to `destination` and return either
461    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to produce more
462    /// items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot produce
463    /// any more items.
464    ///
465    /// If the implementation is unable to produce any items immediately, but
466    /// expects to do so later, and `finish` is _false_, it should store the
467    /// waker from `cx` for later and return `Poll::Pending` without writing
468    /// anything to `destination`.  Later, it should alert the waker when either
469    /// the items arrive, the stream has ended, or an error occurs.
470    ///
471    /// If more items are written to `destination` than the reader has immediate
472    /// capacity to accept, they will be retained in memory by the caller and
473    /// used to satisfy future reads, in which case `poll_produce` will only be
474    /// called again once all those items have been delivered.
475    ///
476    /// # Zero-length reads
477    ///
478    /// This function may be called with a zero-length capacity buffer
479    /// (i.e. `Destination::remaining` returns `Some(0)`). This indicates that
480    /// the guest wants to wait to see if an item is ready without actually
481    /// reading the item. For example think of a UNIX `poll` function run on a
482    /// TCP stream, seeing if it's readable without actually reading it.
483    ///
484    /// In this situation the host is allowed to either return immediately or
485    /// wait for readiness. Note that waiting for readiness is not always
486    /// possible. For example it's impossible to test if a Rust-native `Future`
487    /// is ready without actually reading the item. Stream-specific
488    /// optimizations, such as testing if a TCP stream is readable, may be
489    /// possible however.
490    ///
491    /// For a zero-length read, the host is allowed to:
492    ///
493    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without writing
494    ///   anything if it expects to be able to produce items immediately (i.e.
495    ///   without first returning `Poll::Pending`) the next time `poll_produce`
496    ///   is called with non-zero capacity. This is the best-case scenario of
497    ///   fulfilling the guest's desire -- items aren't read/buffered but the
498    ///   host is saying it's ready when the guest is.
499    ///
500    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` without actually
501    ///   testing for readiness. The guest doesn't know this yet, but the guest
502    ///   will realize that zero-length reads won't work on this stream when a
503    ///   subsequent nonzero read attempt is made which returns `Poll::Pending`
504    ///   here.
505    ///
506    /// - Return `Poll::Pending` if the host has performed necessary async work
507    ///   to wait for this stream to be readable without actually reading
508    ///   anything. This is also a best-case scenario where the host is letting
509    ///   the guest know that nothing is ready yet. Later the zero-length read
510    ///   will complete and then the guest will attempt a nonzero-length read to
511    ///   actually read some bytes.
512    ///
513    /// - Return `Poll::Ready(Ok(StreamResult::Completed))` after calling
514    ///   `Destination::set_buffer` with one more more items. Note, however,
515    ///   that this creates the hazard that the items will never be received by
516    ///   the guest if it decides not to do another non-zero-length read before
517    ///   closing the stream.  Moreover, if `Self::Item` is e.g. a
518    ///   `Resource<_>`, they may end up leaking in that scenario. It is not
519    ///   recommended to do this and it's better to return
520    ///   `StreamResult::Completed` without buffering anything instead.
521    ///
522    /// For more discussion on zero-length reads see the [documentation in the
523    /// component-model repo itself][docs].
524    ///
525    /// [docs]: https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md#stream-readiness
526    ///
527    /// # Return
528    ///
529    /// This function can return a number of possible cases from this function:
530    ///
531    /// * `Poll::Pending` - this operation cannot complete at this time. The
532    ///   Rust-level `Future::poll` contract applies here where a waker should
533    ///   be stored from the `cx` argument and be arranged to receive a
534    ///   notification when this implementation can make progress. For example
535    ///   if you call `Future::poll` on a sub-future, that's enough. If items
536    ///   were written to `destination` then a trap in the guest will be raised.
537    ///
538    ///   Note that implementations should strive to avoid this return value
539    ///   when `finish` is `true`. In such a situation the guest is attempting
540    ///   to, for example, cancel a previous operation. By returning
541    ///   `Poll::Pending` the guest will be blocked during the cancellation
542    ///   request. If `finish` is `true` then `StreamResult::Cancelled` is
543    ///   favored to indicate that no items were read. If a short read happened,
544    ///   however, it's ok to return `StreamResult::Completed` indicating some
545    ///   items were read.
546    ///
547    /// * `Poll::Ok(StreamResult::Completed)` - items, if applicable, were
548    ///   written to the `destination`.
549    ///
550    /// * `Poll::Ok(StreamResult::Cancelled)` - used when `finish` is `true` and
551    ///   the implementation was able to successfully cancel any async work that
552    ///   a previous read kicked off, if any. The host should not buffer values
553    ///   received after returning `Cancelled` because the guest will not be
554    ///   aware of these values and the guest could close the stream after
555    ///   cancelling a read. Hosts should only return `Cancelled` when there are
556    ///   no more async operations in flight for a previous read.
557    ///
558    ///   If items were written to `destination` then a trap in the guest will
559    ///   be raised. If `finish` is `false` then this return value will raise a
560    ///   trap in the guest.
561    ///
562    /// * `Poll::Ok(StreamResult::Dropped)` - end-of-stream marker, indicating
563    ///   that this producer should not be polled again. Note that items may
564    ///   still be written to `destination`.
565    ///
566    /// # Errors
567    ///
568    /// The implementation may alternatively choose to return `Err(_)` to
569    /// indicate an unrecoverable error. This will cause the guest (if any) to
570    /// trap and render the component instance (if any) unusable. The
571    /// implementation should report errors that _are_ recoverable by other
572    /// means (e.g. by writing to a `future`) and return
573    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
574    fn poll_produce<'a>(
575        self: Pin<&mut Self>,
576        cx: &mut Context<'_>,
577        store: StoreContextMut<'a, D>,
578        destination: Destination<'a, Self::Item, Self::Buffer>,
579        finish: bool,
580    ) -> Poll<Result<StreamResult>>;
581
582    /// Attempt to convert the specified object into a `Box<dyn Any>` which may
583    /// be downcast to the specified type.
584    ///
585    /// The implementation must ensure that, if it returns `Ok(_)`, a downcast
586    /// to the specified type is guaranteed to succeed.
587    fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
588        Err(me)
589    }
590}
591
592impl<T, D> StreamProducer<D> for iter::Empty<T>
593where
594    T: Send + Sync + 'static,
595{
596    type Item = T;
597    type Buffer = Option<Self::Item>;
598
599    fn poll_produce<'a>(
600        self: Pin<&mut Self>,
601        _: &mut Context<'_>,
602        _: StoreContextMut<'a, D>,
603        _: Destination<'a, Self::Item, Self::Buffer>,
604        _: bool,
605    ) -> Poll<Result<StreamResult>> {
606        Poll::Ready(Ok(StreamResult::Dropped))
607    }
608}
609
610impl<T, D> StreamProducer<D> for stream::Empty<T>
611where
612    T: Send + Sync + 'static,
613{
614    type Item = T;
615    type Buffer = Option<Self::Item>;
616
617    fn poll_produce<'a>(
618        self: Pin<&mut Self>,
619        _: &mut Context<'_>,
620        _: StoreContextMut<'a, D>,
621        _: Destination<'a, Self::Item, Self::Buffer>,
622        _: bool,
623    ) -> Poll<Result<StreamResult>> {
624        Poll::Ready(Ok(StreamResult::Dropped))
625    }
626}
627
628impl<T, D> StreamProducer<D> for Vec<T>
629where
630    T: Unpin + Send + Sync + 'static,
631{
632    type Item = T;
633    type Buffer = VecBuffer<T>;
634
635    fn poll_produce<'a>(
636        self: Pin<&mut Self>,
637        _: &mut Context<'_>,
638        _: StoreContextMut<'a, D>,
639        mut dst: Destination<'a, Self::Item, Self::Buffer>,
640        _: bool,
641    ) -> Poll<Result<StreamResult>> {
642        dst.set_buffer(mem::take(self.get_mut()).into());
643        Poll::Ready(Ok(StreamResult::Dropped))
644    }
645}
646
647impl<T, D> StreamProducer<D> for Box<[T]>
648where
649    T: Unpin + Send + Sync + 'static,
650{
651    type Item = T;
652    type Buffer = VecBuffer<T>;
653
654    fn poll_produce<'a>(
655        self: Pin<&mut Self>,
656        _: &mut Context<'_>,
657        _: StoreContextMut<'a, D>,
658        mut dst: Destination<'a, Self::Item, Self::Buffer>,
659        _: bool,
660    ) -> Poll<Result<StreamResult>> {
661        dst.set_buffer(mem::take(self.get_mut()).into_vec().into());
662        Poll::Ready(Ok(StreamResult::Dropped))
663    }
664}
665
666#[cfg(feature = "component-model-async-bytes")]
667impl<D> StreamProducer<D> for bytes::Bytes {
668    type Item = u8;
669    type Buffer = Cursor<Self>;
670
671    fn poll_produce<'a>(
672        mut self: Pin<&mut Self>,
673        _: &mut Context<'_>,
674        mut store: StoreContextMut<'a, D>,
675        mut dst: Destination<'a, Self::Item, Self::Buffer>,
676        _: bool,
677    ) -> Poll<Result<StreamResult>> {
678        let cap = dst.remaining(&mut store);
679        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
680            // on 0-length or host reads, buffer the bytes
681            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
682            return Poll::Ready(Ok(StreamResult::Dropped));
683        };
684        let cap = cap.into();
685        // data does not fit in destination, fill it and buffer the rest
686        dst.set_buffer(Cursor::new(self.split_off(cap)));
687        let mut dst = dst.as_direct(store, cap);
688        dst.remaining().copy_from_slice(&self);
689        dst.mark_written(cap);
690        Poll::Ready(Ok(StreamResult::Dropped))
691    }
692}
693
694#[cfg(feature = "component-model-async-bytes")]
695impl<D> StreamProducer<D> for bytes::BytesMut {
696    type Item = u8;
697    type Buffer = Cursor<Self>;
698
699    fn poll_produce<'a>(
700        mut self: Pin<&mut Self>,
701        _: &mut Context<'_>,
702        mut store: StoreContextMut<'a, D>,
703        mut dst: Destination<'a, Self::Item, Self::Buffer>,
704        _: bool,
705    ) -> Poll<Result<StreamResult>> {
706        let cap = dst.remaining(&mut store);
707        let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
708            // on 0-length or host reads, buffer the bytes
709            dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
710            return Poll::Ready(Ok(StreamResult::Dropped));
711        };
712        let cap = cap.into();
713        // data does not fit in destination, fill it and buffer the rest
714        dst.set_buffer(Cursor::new(self.split_off(cap)));
715        let mut dst = dst.as_direct(store, cap);
716        dst.remaining().copy_from_slice(&self);
717        dst.mark_written(cap);
718        Poll::Ready(Ok(StreamResult::Dropped))
719    }
720}
721
722/// Represents the buffer for a host- or guest-initiated stream write.
723pub struct Source<'a, T> {
724    id: TableId<TransmitState>,
725    host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
726}
727
728impl<'a, T> Source<'a, T> {
729    /// Reborrow `self` so it can be used again later.
730    pub fn reborrow(&mut self) -> Source<'_, T> {
731        Source {
732            id: self.id,
733            host_buffer: self.host_buffer.as_deref_mut(),
734        }
735    }
736
737    /// Accept zero or more items from the writer.
738    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
739    where
740        T: func::Lift + 'static,
741        B: ReadBuffer<T>,
742    {
743        if let Some(input) = &mut self.host_buffer {
744            let count = input.remaining().len().min(buffer.remaining_capacity());
745            buffer.move_from(*input, count);
746        } else {
747            let store = store.as_context_mut();
748            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
749
750            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
751                unreachable!();
752            };
753
754            let &WriteState::GuestReady {
755                ty,
756                address,
757                count,
758                options,
759                instance,
760                ..
761            } = &transmit.write
762            else {
763                unreachable!()
764            };
765
766            let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
767            let ty = payload(ty, cx.types);
768            let old_remaining = buffer.remaining_capacity();
769            lift::<T, B>(
770                cx,
771                ty,
772                buffer,
773                address + (T::SIZE32 * guest_offset),
774                count - guest_offset,
775            )?;
776
777            let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
778
779            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
780                unreachable!();
781            };
782
783            *guest_offset += old_remaining - buffer.remaining_capacity();
784        }
785
786        Ok(())
787    }
788
789    /// Return the number of items remaining to be read from the current write
790    /// operation.
791    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
792    where
793        T: 'static,
794    {
795        let transmit = store
796            .as_context_mut()
797            .0
798            .concurrent_state_mut()
799            .get_mut(self.id)
800            .unwrap();
801
802        if let &WriteState::GuestReady { count, .. } = &transmit.write {
803            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
804                unreachable!()
805            };
806
807            count - guest_offset
808        } else if let Some(host_buffer) = &self.host_buffer {
809            host_buffer.remaining().len()
810        } else {
811            unreachable!()
812        }
813    }
814}
815
816impl<'a> Source<'a, u8> {
817    /// Return a `DirectSource` view of `self`.
818    pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
819        DirectSource {
820            id: self.id,
821            host_buffer: self.host_buffer,
822            store,
823        }
824    }
825}
826
827/// Represents a write to a `stream<u8>`, providing direct access to the
828/// writer's buffer.
829pub struct DirectSource<'a, D: 'static> {
830    id: TableId<TransmitState>,
831    host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
832    store: StoreContextMut<'a, D>,
833}
834
835impl<D: 'static> DirectSource<'_, D> {
836    /// Provide direct access to the writer's buffer.
837    pub fn remaining(&mut self) -> &[u8] {
838        if let Some(buffer) = self.host_buffer.as_deref_mut() {
839            buffer.remaining()
840        } else {
841            let transmit = self
842                .store
843                .as_context_mut()
844                .0
845                .concurrent_state_mut()
846                .get_mut(self.id)
847                .unwrap();
848
849            let &WriteState::GuestReady {
850                address,
851                count,
852                options,
853                instance,
854                ..
855            } = &transmit.write
856            else {
857                unreachable!()
858            };
859
860            let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
861                unreachable!()
862            };
863
864            instance
865                .options_memory(self.store.0, options)
866                .get((address + guest_offset)..)
867                .and_then(|b| b.get(..(count - guest_offset)))
868                .unwrap()
869        }
870    }
871
872    /// Mark the specified number of bytes as read from the writer's buffer.
873    ///
874    /// This will panic if the count is larger than the size of the buffer
875    /// returned by `Self::remaining`.
876    pub fn mark_read(&mut self, count: usize) {
877        if let Some(buffer) = self.host_buffer.as_deref_mut() {
878            buffer.skip(count);
879        } else {
880            let transmit = self
881                .store
882                .as_context_mut()
883                .0
884                .concurrent_state_mut()
885                .get_mut(self.id)
886                .unwrap();
887
888            let WriteState::GuestReady {
889                count: write_count, ..
890            } = &transmit.write
891            else {
892                unreachable!()
893            };
894
895            let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
896                unreachable!()
897            };
898
899            if *guest_offset + count > *write_count {
900                panic!(
901                    "read count ({count}) must be less than or equal to write count ({write_count})"
902                )
903            } else {
904                *guest_offset += count;
905            }
906        }
907    }
908}
909
910/// Represents the host-owned read end of a stream.
911pub trait StreamConsumer<D>: Send + 'static {
912    /// The payload type of this stream.
913    type Item;
914
915    /// Handle a host- or guest-initiated write by accepting zero or more items
916    /// from the specified source.
917    ///
918    /// This will be called whenever the writer starts a write.
919    ///
920    /// If the implementation is able to consume one or more items immediately,
921    /// it should take them from `source` and return either
922    /// `Poll::Ready(Ok(StreamResult::Completed))` if it expects to be able to consume
923    /// more items, or `Poll::Ready(Ok(StreamResult::Dropped))` if it cannot
924    /// accept any more items.  Alternatively, it may return `Poll::Pending` to
925    /// indicate that the caller should delay sending a `COMPLETED` event to the
926    /// writer until a later call to this function returns `Poll::Ready(_)`.
927    /// For more about that, see the `Backpressure` section below.
928    ///
929    /// If the implementation cannot consume any items immediately and `finish`
930    /// is _false_, it should store the waker from `cx` for later and return
931    /// `Poll::Pending` without writing anything to `destination`.  Later, it
932    /// should alert the waker when either (1) the items arrive, (2) the stream
933    /// has ended, or (3) an error occurs.
934    ///
935    /// If the implementation cannot consume any items immediately and `finish`
936    /// is _true_, it should, if possible, return
937    /// `Poll::Ready(Ok(StreamResult::Cancelled))` immediately without taking
938    /// anything from `source`.  However, that might not be possible if an
939    /// earlier call to `poll_consume` kicked off an asynchronous operation
940    /// which needs to be completed (and possibly interrupted) gracefully, in
941    /// which case the implementation may return `Poll::Pending` and later alert
942    /// the waker as described above.  In other words, when `finish` is true,
943    /// the implementation should prioritize returning a result to the reader
944    /// (even if no items can be consumed) rather than wait indefinitely for at
945    /// capacity to free up.
946    ///
947    /// In all of the above cases, the implementation may alternatively choose
948    /// to return `Err(_)` to indicate an unrecoverable error.  This will cause
949    /// the guest (if any) to trap and render the component instance (if any)
950    /// unusable.  The implementation should report errors that _are_
951    /// recoverable by other means (e.g. by writing to a `future`) and return
952    /// `Poll::Ready(Ok(StreamResult::Dropped))`.
953    ///
954    /// Note that the implementation should only return
955    /// `Poll::Ready(Ok(StreamResult::Cancelled))` without having taken any
956    /// items from `source` if called with `finish` set to true.  If it does so
957    /// when `finish` is false, the caller will trap.  Additionally, it should
958    /// only return `Poll::Ready(Ok(StreamResult::Completed))` after taking at
959    /// least one item from `source` if there is an item available; otherwise,
960    /// the caller will trap.  If `poll_consume` is called with no items in
961    /// `source`, it should only return `Poll::Ready(_)` once it is able to
962    /// accept at least one item during the next call to `poll_consume`.
963    ///
964    /// Note that any items which the implementation of this trait takes from
965    /// `source` become the responsibility of that implementation.  For that
966    /// reason, an implementation which forwards items to an upstream sink
967    /// should reserve capacity in that sink before taking items out of
968    /// `source`, if possible.  Alternatively, it might buffer items which can't
969    /// be forwarded immediately and send them once capacity is freed up.
970    ///
971    /// ## Backpressure
972    ///
973    /// As mentioned above, an implementation might choose to return
974    /// `Poll::Pending` after taking items from `source`, which tells the caller
975    /// to delay sending a `COMPLETED` event to the writer.  This can be used as
976    /// a form of backpressure when the items are forwarded to an upstream sink
977    /// asynchronously.  Note, however, that it's not possible to "put back"
978    /// items into `source` once they've been taken out, so if the upstream sink
979    /// is unable to accept all the items, that cannot be communicated to the
980    /// writer at this level of abstraction.  Just as with application-specific,
981    /// recoverable errors, information about which items could be forwarded and
982    /// which could not must be communicated out-of-band, e.g. by writing to an
983    /// application-specific `future`.
984    ///
985    /// Similarly, if the writer cancels the write after items have been taken
986    /// from `source` but before the items have all been forwarded to an
987    /// upstream sink, `poll_consume` will be called with `finish` set to true,
988    /// and the implementation may either:
989    ///
990    /// - Interrupt the forwarding process gracefully.  This may be preferable
991    /// if there is an out-of-band channel for communicating to the writer how
992    /// many items were forwarded before being interrupted.
993    ///
994    /// - Allow the forwarding to complete without interrupting it.  This is
995    /// usually preferable if there's no out-of-band channel for reporting back
996    /// to the writer how many items were forwarded.
997    fn poll_consume(
998        self: Pin<&mut Self>,
999        cx: &mut Context<'_>,
1000        store: StoreContextMut<D>,
1001        source: Source<'_, Self::Item>,
1002        finish: bool,
1003    ) -> Poll<Result<StreamResult>>;
1004}
1005
1006/// Represents a host-owned write end of a future.
1007pub trait FutureProducer<D>: Send + 'static {
1008    /// The payload type of this future.
1009    type Item;
1010
1011    /// Handle a host- or guest-initiated read by producing a value.
1012    ///
1013    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1014    /// simplified interface for futures.
1015    ///
1016    /// If `finish` is true, the implementation may return
1017    /// `Poll::Ready(Ok(None))` to indicate the operation was canceled before it
1018    /// could produce a value.  Otherwise, it must either return
1019    /// `Poll::Ready(Ok(Some(_)))`, `Poll::Ready(Err(_))`, or `Poll::Pending`.
1020    fn poll_produce(
1021        self: Pin<&mut Self>,
1022        cx: &mut Context<'_>,
1023        store: StoreContextMut<D>,
1024        finish: bool,
1025    ) -> Poll<Result<Option<Self::Item>>>;
1026}
1027
1028impl<T, E, D, Fut> FutureProducer<D> for Fut
1029where
1030    E: Into<Error>,
1031    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
1032{
1033    type Item = T;
1034
1035    fn poll_produce<'a>(
1036        self: Pin<&mut Self>,
1037        cx: &mut Context<'_>,
1038        _: StoreContextMut<'a, D>,
1039        finish: bool,
1040    ) -> Poll<Result<Option<T>>> {
1041        match self.poll(cx) {
1042            Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))),
1043            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
1044            Poll::Pending if finish => Poll::Ready(Ok(None)),
1045            Poll::Pending => Poll::Pending,
1046        }
1047    }
1048}
1049
1050/// Represents a host-owned read end of a future.
1051pub trait FutureConsumer<D>: Send + 'static {
1052    /// The payload type of this future.
1053    type Item;
1054
1055    /// Handle a host- or guest-initiated write by consuming a value.
1056    ///
1057    /// This is equivalent to `StreamProducer::poll_produce`, but with a
1058    /// simplified interface for futures.
1059    ///
1060    /// If `finish` is true, the implementation may return `Poll::Ready(Ok(()))`
1061    /// without taking the item from `source`, which indicates the operation was
1062    /// canceled before it could consume the value.  Otherwise, it must either
1063    /// take the item from `source` and return `Poll::Ready(Ok(()))`, or else
1064    /// return `Poll::Ready(Err(_))` or `Poll::Pending` (with or without taking
1065    /// the item).
1066    fn poll_consume(
1067        self: Pin<&mut Self>,
1068        cx: &mut Context<'_>,
1069        store: StoreContextMut<D>,
1070        source: Source<'_, Self::Item>,
1071        finish: bool,
1072    ) -> Poll<Result<()>>;
1073}
1074
1075/// Represents the readable end of a Component Model `future`.
1076///
1077/// Note that `FutureReader` instances must be disposed of using either `pipe`
1078/// or `close`; otherwise the in-store representation will leak and the writer
1079/// end will hang indefinitely.  Consider using [`GuardedFutureReader`] to
1080/// ensure that disposal happens automatically.
1081pub struct FutureReader<T> {
1082    id: TableId<TransmitHandle>,
1083    _phantom: PhantomData<T>,
1084}
1085
1086impl<T> FutureReader<T> {
1087    /// Create a new future with the specified producer.
1088    pub fn new<S: AsContextMut>(
1089        mut store: S,
1090        producer: impl FutureProducer<S::Data, Item = T>,
1091    ) -> Self
1092    where
1093        T: func::Lower + func::Lift + Send + Sync + 'static,
1094    {
1095        struct Producer<P>(P);
1096
1097        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
1098            for Producer<P>
1099        {
1100            type Item = P::Item;
1101            type Buffer = Option<P::Item>;
1102
1103            fn poll_produce<'a>(
1104                self: Pin<&mut Self>,
1105                cx: &mut Context<'_>,
1106                store: StoreContextMut<D>,
1107                mut destination: Destination<'a, Self::Item, Self::Buffer>,
1108                finish: bool,
1109            ) -> Poll<Result<StreamResult>> {
1110                // SAFETY: This is a standard pin-projection, and we never move
1111                // out of `self`.
1112                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1113
1114                Poll::Ready(Ok(
1115                    if let Some(value) = ready!(producer.poll_produce(cx, store, finish))? {
1116                        destination.set_buffer(Some(value));
1117
1118                        // Here we return `StreamResult::Completed` even though
1119                        // we've produced the last item we'll ever produce.
1120                        // That's because the ABI expects
1121                        // `ReturnCode::Completed(1)` rather than
1122                        // `ReturnCode::Dropped(1)`.  In any case, we won't be
1123                        // called again since the future will have resolved.
1124                        StreamResult::Completed
1125                    } else {
1126                        StreamResult::Cancelled
1127                    },
1128                ))
1129            }
1130        }
1131
1132        Self::new_(
1133            store
1134                .as_context_mut()
1135                .new_transmit(TransmitKind::Future, Producer(producer)),
1136        )
1137    }
1138
1139    fn new_(id: TableId<TransmitHandle>) -> Self {
1140        Self {
1141            id,
1142            _phantom: PhantomData,
1143        }
1144    }
1145
1146    /// Set the consumer that accepts the result of this future.
1147    pub fn pipe<S: AsContextMut>(
1148        self,
1149        mut store: S,
1150        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
1151    ) where
1152        T: func::Lift + 'static,
1153    {
1154        struct Consumer<C>(C);
1155
1156        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
1157            for Consumer<C>
1158        {
1159            type Item = T;
1160
1161            fn poll_consume(
1162                self: Pin<&mut Self>,
1163                cx: &mut Context<'_>,
1164                mut store: StoreContextMut<D>,
1165                mut source: Source<Self::Item>,
1166                finish: bool,
1167            ) -> Poll<Result<StreamResult>> {
1168                // SAFETY: This is a standard pin-projection, and we never move
1169                // out of `self`.
1170                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
1171
1172                ready!(consumer.poll_consume(
1173                    cx,
1174                    store.as_context_mut(),
1175                    source.reborrow(),
1176                    finish
1177                ))?;
1178
1179                Poll::Ready(Ok(if source.remaining(store) == 0 {
1180                    // Here we return `StreamResult::Completed` even though
1181                    // we've consumed the last item we'll ever consume.  That's
1182                    // because the ABI expects `ReturnCode::Completed(1)` rather
1183                    // than `ReturnCode::Dropped(1)`.  In any case, we won't be
1184                    // called again since the future will have resolved.
1185                    StreamResult::Completed
1186                } else {
1187                    StreamResult::Cancelled
1188                }))
1189            }
1190        }
1191
1192        store
1193            .as_context_mut()
1194            .set_consumer(self.id, TransmitKind::Future, Consumer(consumer));
1195    }
1196
1197    /// Convert this `FutureReader` into a [`Val`].
1198    // See TODO comment for `FutureAny`; this is prone to handle leakage.
1199    pub fn into_val(self) -> Val {
1200        Val::Future(FutureAny(self.id.rep()))
1201    }
1202
1203    /// Attempt to convert the specified [`Val`] to a `FutureReader`.
1204    pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1205        let Val::Future(FutureAny(rep)) = value else {
1206            bail!("expected `future`; got `{}`", value.desc());
1207        };
1208        let store = store.as_context_mut();
1209        let id = TableId::<TransmitHandle>::new(*rep);
1210        store.0.concurrent_state_mut().get_mut(id)?; // Just make sure it's present
1211        Ok(Self::new_(id))
1212    }
1213
1214    /// Transfer ownership of the read end of a future from a guest to the host.
1215    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1216        match ty {
1217            InterfaceType::Future(src) => {
1218                let handle_table = cx
1219                    .instance_mut()
1220                    .table_for_transmit(TransmitIndex::Future(src));
1221                let (rep, is_done) = handle_table.future_remove_readable(src, index)?;
1222                if is_done {
1223                    bail!("cannot lift future after being notified that the writable end dropped");
1224                }
1225                let id = TableId::<TransmitHandle>::new(rep);
1226                let concurrent_state = cx.concurrent_state_mut();
1227                let future = concurrent_state.get_mut(id)?;
1228                future.common.handle = None;
1229                let state = future.state;
1230
1231                if concurrent_state.get_mut(state)?.done {
1232                    bail!("cannot lift future after previous read succeeded");
1233                }
1234
1235                Ok(Self::new_(id))
1236            }
1237            _ => func::bad_type_info(),
1238        }
1239    }
1240
1241    /// Close this `FutureReader`, writing the default value.
1242    ///
1243    /// # Panics
1244    ///
1245    /// Panics if the store that the [`Accessor`] is derived from does not own
1246    /// this future. Usage of this future after calling `close` will also cause
1247    /// a panic.
1248    pub fn close(&mut self, mut store: impl AsContextMut) {
1249        // `self` should never be used again, but leave an invalid handle there just in case.
1250        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1251        store
1252            .as_context_mut()
1253            .0
1254            .host_drop_reader(id, TransmitKind::Future)
1255            .unwrap();
1256    }
1257
1258    /// Convenience method around [`Self::close`].
1259    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1260        accessor.as_accessor().with(|access| self.close(access))
1261    }
1262
1263    /// Returns a [`GuardedFutureReader`] which will auto-close this future on
1264    /// drop and clean it up from the store.
1265    ///
1266    /// Note that the `accessor` provided must own this future and is
1267    /// additionally transferred to the `GuardedFutureReader` return value.
1268    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
1269    where
1270        A: AsAccessor,
1271    {
1272        GuardedFutureReader::new(accessor, self)
1273    }
1274}
1275
1276impl<T> fmt::Debug for FutureReader<T> {
1277    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1278        f.debug_struct("FutureReader")
1279            .field("id", &self.id)
1280            .finish()
1281    }
1282}
1283
1284/// Transfer ownership of the read end of a future from the host to a guest.
1285pub(crate) fn lower_future_to_index<U>(
1286    rep: u32,
1287    cx: &mut LowerContext<'_, U>,
1288    ty: InterfaceType,
1289) -> Result<u32> {
1290    match ty {
1291        InterfaceType::Future(dst) => {
1292            let concurrent_state = cx.store.0.concurrent_state_mut();
1293            let id = TableId::<TransmitHandle>::new(rep);
1294            let state = concurrent_state.get_mut(id)?.state;
1295            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1296
1297            let handle = cx
1298                .instance_mut()
1299                .table_for_transmit(TransmitIndex::Future(dst))
1300                .future_insert_read(dst, rep)?;
1301
1302            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1303
1304            Ok(handle)
1305        }
1306        _ => func::bad_type_info(),
1307    }
1308}
1309
1310// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1311// safe and correct since we lift and lower future handles as `u32`s.
1312unsafe impl<T: Send + Sync> func::ComponentType for FutureReader<T> {
1313    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1314
1315    type Lower = <u32 as func::ComponentType>::Lower;
1316
1317    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1318        match ty {
1319            InterfaceType::Future(_) => Ok(()),
1320            other => bail!("expected `future`, found `{}`", func::desc(other)),
1321        }
1322    }
1323}
1324
1325// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1326unsafe impl<T: Send + Sync> func::Lower for FutureReader<T> {
1327    fn linear_lower_to_flat<U>(
1328        &self,
1329        cx: &mut LowerContext<'_, U>,
1330        ty: InterfaceType,
1331        dst: &mut MaybeUninit<Self::Lower>,
1332    ) -> Result<()> {
1333        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1334            cx,
1335            InterfaceType::U32,
1336            dst,
1337        )
1338    }
1339
1340    fn linear_lower_to_memory<U>(
1341        &self,
1342        cx: &mut LowerContext<'_, U>,
1343        ty: InterfaceType,
1344        offset: usize,
1345    ) -> Result<()> {
1346        lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1347            cx,
1348            InterfaceType::U32,
1349            offset,
1350        )
1351    }
1352}
1353
1354// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1355unsafe impl<T: Send + Sync> func::Lift for FutureReader<T> {
1356    fn linear_lift_from_flat(
1357        cx: &mut LiftContext<'_>,
1358        ty: InterfaceType,
1359        src: &Self::Lower,
1360    ) -> Result<Self> {
1361        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1362        Self::lift_from_index(cx, ty, index)
1363    }
1364
1365    fn linear_lift_from_memory(
1366        cx: &mut LiftContext<'_>,
1367        ty: InterfaceType,
1368        bytes: &[u8],
1369    ) -> Result<Self> {
1370        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1371        Self::lift_from_index(cx, ty, index)
1372    }
1373}
1374
1375/// A [`FutureReader`] paired with an [`Accessor`].
1376///
1377/// This is an RAII wrapper around [`FutureReader`] that ensures it is closed
1378/// when dropped. This can be created through [`GuardedFutureReader::new`] or
1379/// [`FutureReader::guard`].
1380pub struct GuardedFutureReader<T, A>
1381where
1382    A: AsAccessor,
1383{
1384    // This field is `None` to implement the conversion from this guard back to
1385    // `FutureReader`. When `None` is seen in the destructor it will cause the
1386    // destructor to do nothing.
1387    reader: Option<FutureReader<T>>,
1388    accessor: A,
1389}
1390
1391impl<T, A> GuardedFutureReader<T, A>
1392where
1393    A: AsAccessor,
1394{
1395    /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
1396    pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
1397        Self {
1398            reader: Some(reader),
1399            accessor,
1400        }
1401    }
1402
1403    /// Extracts the underlying [`FutureReader`] from this guard, returning it
1404    /// back.
1405    pub fn into_future(self) -> FutureReader<T> {
1406        self.into()
1407    }
1408}
1409
1410impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
1411where
1412    A: AsAccessor,
1413{
1414    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
1415        guard.reader.take().unwrap()
1416    }
1417}
1418
1419impl<T, A> Drop for GuardedFutureReader<T, A>
1420where
1421    A: AsAccessor,
1422{
1423    fn drop(&mut self) {
1424        if let Some(reader) = &mut self.reader {
1425            reader.close_with(&self.accessor)
1426        }
1427    }
1428}
1429
1430/// Represents the readable end of a Component Model `stream`.
1431///
1432/// Note that `StreamReader` instances must be disposed of using `close`;
1433/// otherwise the in-store representation will leak and the writer end will hang
1434/// indefinitely.  Consider using [`GuardedStreamReader`] to ensure that
1435/// disposal happens automatically.
1436pub struct StreamReader<T> {
1437    id: TableId<TransmitHandle>,
1438    _phantom: PhantomData<T>,
1439}
1440
1441impl<T> StreamReader<T> {
1442    /// Create a new stream with the specified producer.
1443    pub fn new<S: AsContextMut>(
1444        mut store: S,
1445        producer: impl StreamProducer<S::Data, Item = T>,
1446    ) -> Self
1447    where
1448        T: func::Lower + func::Lift + Send + Sync + 'static,
1449    {
1450        Self::new_(
1451            store
1452                .as_context_mut()
1453                .new_transmit(TransmitKind::Stream, producer),
1454        )
1455    }
1456
1457    fn new_(id: TableId<TransmitHandle>) -> Self {
1458        Self {
1459            id,
1460            _phantom: PhantomData,
1461        }
1462    }
1463
1464    /// Attempt to consume this object by converting it into the specified type.
1465    ///
1466    /// This can be useful for "short-circuiting" host-to-host streams,
1467    /// bypassing the guest entirely.  For example, if a guest task returns a
1468    /// host-created stream and then exits, this function may be used to
1469    /// retrieve the write end, after which the guest instance and store may be
1470    /// disposed of if no longer needed.
1471    ///
1472    /// This will return `Ok(_)` if and only if the following conditions are
1473    /// met:
1474    ///
1475    /// - The stream was created by the host (i.e. not by the guest).
1476    ///
1477    /// - The `StreamProducer::try_into` function returns `Ok(_)` when given the
1478    /// producer provided to `StreamReader::new` when the stream was created,
1479    /// along with `TypeId::of::<V>()`.
1480    pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
1481        let store = store.as_context_mut();
1482        let state = store.0.concurrent_state_mut();
1483        let id = state.get_mut(self.id).unwrap().state;
1484        if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
1485            match try_into(TypeId::of::<V>()) {
1486                Some(result) => {
1487                    self.close(store);
1488                    Ok(*result.downcast::<V>().unwrap())
1489                }
1490                None => Err(self),
1491            }
1492        } else {
1493            Err(self)
1494        }
1495    }
1496
1497    /// Set the consumer that accepts the items delivered to this stream.
1498    pub fn pipe<S: AsContextMut>(
1499        self,
1500        mut store: S,
1501        consumer: impl StreamConsumer<S::Data, Item = T>,
1502    ) where
1503        T: 'static,
1504    {
1505        store
1506            .as_context_mut()
1507            .set_consumer(self.id, TransmitKind::Stream, consumer);
1508    }
1509
1510    /// Convert this `StreamReader` into a [`Val`].
1511    // See TODO comment for `StreamAny`; this is prone to handle leakage.
1512    pub fn into_val(self) -> Val {
1513        Val::Stream(StreamAny(self.id.rep()))
1514    }
1515
1516    /// Attempt to convert the specified [`Val`] to a `StreamReader`.
1517    pub fn from_val(mut store: impl AsContextMut<Data: Send>, value: &Val) -> Result<Self> {
1518        let Val::Stream(StreamAny(rep)) = value else {
1519            bail!("expected `stream`; got `{}`", value.desc());
1520        };
1521        let store = store.as_context_mut();
1522        let id = TableId::<TransmitHandle>::new(*rep);
1523        store.0.concurrent_state_mut().get_mut(id)?; // Just make sure it's present
1524        Ok(Self::new_(id))
1525    }
1526
1527    /// Transfer ownership of the read end of a stream from a guest to the host.
1528    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1529        match ty {
1530            InterfaceType::Stream(src) => {
1531                let handle_table = cx
1532                    .instance_mut()
1533                    .table_for_transmit(TransmitIndex::Stream(src));
1534                let (rep, is_done) = handle_table.stream_remove_readable(src, index)?;
1535                if is_done {
1536                    bail!("cannot lift stream after being notified that the writable end dropped");
1537                }
1538                let id = TableId::<TransmitHandle>::new(rep);
1539                cx.concurrent_state_mut().get_mut(id)?.common.handle = None;
1540                Ok(Self::new_(id))
1541            }
1542            _ => func::bad_type_info(),
1543        }
1544    }
1545
1546    /// Close this `StreamReader`, writing the default value.
1547    ///
1548    /// # Panics
1549    ///
1550    /// Panics if the store that the [`Accessor`] is derived from does not own
1551    /// this future. Usage of this future after calling `close` will also cause
1552    /// a panic.
1553    pub fn close(&mut self, mut store: impl AsContextMut) {
1554        // `self` should never be used again, but leave an invalid handle there just in case.
1555        let id = mem::replace(&mut self.id, TableId::new(u32::MAX));
1556        store
1557            .as_context_mut()
1558            .0
1559            .host_drop_reader(id, TransmitKind::Stream)
1560            .unwrap()
1561    }
1562
1563    /// Convenience method around [`Self::close`].
1564    pub fn close_with(&mut self, accessor: impl AsAccessor) {
1565        accessor.as_accessor().with(|access| self.close(access))
1566    }
1567
1568    /// Returns a [`GuardedStreamReader`] which will auto-close this stream on
1569    /// drop and clean it up from the store.
1570    ///
1571    /// Note that the `accessor` provided must own this future and is
1572    /// additionally transferred to the `GuardedStreamReader` return value.
1573    pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
1574    where
1575        A: AsAccessor,
1576    {
1577        GuardedStreamReader::new(accessor, self)
1578    }
1579}
1580
1581impl<T> fmt::Debug for StreamReader<T> {
1582    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1583        f.debug_struct("StreamReader")
1584            .field("id", &self.id)
1585            .finish()
1586    }
1587}
1588
1589/// Transfer ownership of the read end of a stream from the host to a guest.
1590pub(crate) fn lower_stream_to_index<U>(
1591    rep: u32,
1592    cx: &mut LowerContext<'_, U>,
1593    ty: InterfaceType,
1594) -> Result<u32> {
1595    match ty {
1596        InterfaceType::Stream(dst) => {
1597            let concurrent_state = cx.store.0.concurrent_state_mut();
1598            let id = TableId::<TransmitHandle>::new(rep);
1599            let state = concurrent_state.get_mut(id)?.state;
1600            let rep = concurrent_state.get_mut(state)?.read_handle.rep();
1601
1602            let handle = cx
1603                .instance_mut()
1604                .table_for_transmit(TransmitIndex::Stream(dst))
1605                .stream_insert_read(dst, rep)?;
1606
1607            cx.store.0.concurrent_state_mut().get_mut(id)?.common.handle = Some(handle);
1608
1609            Ok(handle)
1610        }
1611        _ => func::bad_type_info(),
1612    }
1613}
1614
1615// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1616// safe and correct since we lift and lower stream handles as `u32`s.
1617unsafe impl<T: Send + Sync> func::ComponentType for StreamReader<T> {
1618    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1619
1620    type Lower = <u32 as func::ComponentType>::Lower;
1621
1622    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1623        match ty {
1624            InterfaceType::Stream(_) => Ok(()),
1625            other => bail!("expected `stream`, found `{}`", func::desc(other)),
1626        }
1627    }
1628}
1629
1630// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1631unsafe impl<T: Send + Sync> func::Lower for StreamReader<T> {
1632    fn linear_lower_to_flat<U>(
1633        &self,
1634        cx: &mut LowerContext<'_, U>,
1635        ty: InterfaceType,
1636        dst: &mut MaybeUninit<Self::Lower>,
1637    ) -> Result<()> {
1638        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat(
1639            cx,
1640            InterfaceType::U32,
1641            dst,
1642        )
1643    }
1644
1645    fn linear_lower_to_memory<U>(
1646        &self,
1647        cx: &mut LowerContext<'_, U>,
1648        ty: InterfaceType,
1649        offset: usize,
1650    ) -> Result<()> {
1651        lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory(
1652            cx,
1653            InterfaceType::U32,
1654            offset,
1655        )
1656    }
1657}
1658
1659// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1660unsafe impl<T: Send + Sync> func::Lift for StreamReader<T> {
1661    fn linear_lift_from_flat(
1662        cx: &mut LiftContext<'_>,
1663        ty: InterfaceType,
1664        src: &Self::Lower,
1665    ) -> Result<Self> {
1666        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1667        Self::lift_from_index(cx, ty, index)
1668    }
1669
1670    fn linear_lift_from_memory(
1671        cx: &mut LiftContext<'_>,
1672        ty: InterfaceType,
1673        bytes: &[u8],
1674    ) -> Result<Self> {
1675        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1676        Self::lift_from_index(cx, ty, index)
1677    }
1678}
1679
1680/// A [`StreamReader`] paired with an [`Accessor`].
1681///
1682/// This is an RAII wrapper around [`StreamReader`] that ensures it is closed
1683/// when dropped. This can be created through [`GuardedStreamReader::new`] or
1684/// [`StreamReader::guard`].
1685pub struct GuardedStreamReader<T, A>
1686where
1687    A: AsAccessor,
1688{
1689    // This field is `None` to implement the conversion from this guard back to
1690    // `StreamReader`. When `None` is seen in the destructor it will cause the
1691    // destructor to do nothing.
1692    reader: Option<StreamReader<T>>,
1693    accessor: A,
1694}
1695
1696impl<T, A> GuardedStreamReader<T, A>
1697where
1698    A: AsAccessor,
1699{
1700    /// Create a new `GuardedStreamReader` with the specified `accessor` and
1701    /// `reader`.
1702    pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
1703        Self {
1704            reader: Some(reader),
1705            accessor,
1706        }
1707    }
1708
1709    /// Extracts the underlying [`StreamReader`] from this guard, returning it
1710    /// back.
1711    pub fn into_stream(self) -> StreamReader<T> {
1712        self.into()
1713    }
1714}
1715
1716impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
1717where
1718    A: AsAccessor,
1719{
1720    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
1721        guard.reader.take().unwrap()
1722    }
1723}
1724
1725impl<T, A> Drop for GuardedStreamReader<T, A>
1726where
1727    A: AsAccessor,
1728{
1729    fn drop(&mut self) {
1730        if let Some(reader) = &mut self.reader {
1731            reader.close_with(&self.accessor)
1732        }
1733    }
1734}
1735
1736/// Represents a Component Model `error-context`.
1737pub struct ErrorContext {
1738    rep: u32,
1739}
1740
1741impl ErrorContext {
1742    pub(crate) fn new(rep: u32) -> Self {
1743        Self { rep }
1744    }
1745
1746    /// Convert this `ErrorContext` into a [`Val`].
1747    pub fn into_val(self) -> Val {
1748        Val::ErrorContext(ErrorContextAny(self.rep))
1749    }
1750
1751    /// Attempt to convert the specified [`Val`] to a `ErrorContext`.
1752    pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
1753        let Val::ErrorContext(ErrorContextAny(rep)) = value else {
1754            bail!("expected `error-context`; got `{}`", value.desc());
1755        };
1756        Ok(Self::new(*rep))
1757    }
1758
1759    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
1760        match ty {
1761            InterfaceType::ErrorContext(src) => {
1762                let rep = cx
1763                    .instance_mut()
1764                    .table_for_error_context(src)
1765                    .error_context_rep(index)?;
1766
1767                Ok(Self { rep })
1768            }
1769            _ => func::bad_type_info(),
1770        }
1771    }
1772}
1773
1774pub(crate) fn lower_error_context_to_index<U>(
1775    rep: u32,
1776    cx: &mut LowerContext<'_, U>,
1777    ty: InterfaceType,
1778) -> Result<u32> {
1779    match ty {
1780        InterfaceType::ErrorContext(dst) => {
1781            let tbl = cx.instance_mut().table_for_error_context(dst);
1782            tbl.error_context_insert(rep)
1783        }
1784        _ => func::bad_type_info(),
1785    }
1786}
1787// SAFETY: This relies on the `ComponentType` implementation for `u32` being
1788// safe and correct since we lift and lower future handles as `u32`s.
1789unsafe impl func::ComponentType for ErrorContext {
1790    const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
1791
1792    type Lower = <u32 as func::ComponentType>::Lower;
1793
1794    fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
1795        match ty {
1796            InterfaceType::ErrorContext(_) => Ok(()),
1797            other => bail!("expected `error`, found `{}`", func::desc(other)),
1798        }
1799    }
1800}
1801
1802// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1803unsafe impl func::Lower for ErrorContext {
1804    fn linear_lower_to_flat<T>(
1805        &self,
1806        cx: &mut LowerContext<'_, T>,
1807        ty: InterfaceType,
1808        dst: &mut MaybeUninit<Self::Lower>,
1809    ) -> Result<()> {
1810        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_flat(
1811            cx,
1812            InterfaceType::U32,
1813            dst,
1814        )
1815    }
1816
1817    fn linear_lower_to_memory<T>(
1818        &self,
1819        cx: &mut LowerContext<'_, T>,
1820        ty: InterfaceType,
1821        offset: usize,
1822    ) -> Result<()> {
1823        lower_error_context_to_index(self.rep, cx, ty)?.linear_lower_to_memory(
1824            cx,
1825            InterfaceType::U32,
1826            offset,
1827        )
1828    }
1829}
1830
1831// SAFETY: See the comment on the `ComponentType` `impl` for this type.
1832unsafe impl func::Lift for ErrorContext {
1833    fn linear_lift_from_flat(
1834        cx: &mut LiftContext<'_>,
1835        ty: InterfaceType,
1836        src: &Self::Lower,
1837    ) -> Result<Self> {
1838        let index = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
1839        Self::lift_from_index(cx, ty, index)
1840    }
1841
1842    fn linear_lift_from_memory(
1843        cx: &mut LiftContext<'_>,
1844        ty: InterfaceType,
1845        bytes: &[u8],
1846    ) -> Result<Self> {
1847        let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
1848        Self::lift_from_index(cx, ty, index)
1849    }
1850}
1851
1852/// Represents the read or write end of a stream or future.
1853pub(super) struct TransmitHandle {
1854    pub(super) common: WaitableCommon,
1855    /// See `TransmitState`
1856    state: TableId<TransmitState>,
1857}
1858
1859impl TransmitHandle {
1860    fn new(state: TableId<TransmitState>) -> Self {
1861        Self {
1862            common: WaitableCommon::default(),
1863            state,
1864        }
1865    }
1866}
1867
1868impl TableDebug for TransmitHandle {
1869    fn type_name() -> &'static str {
1870        "TransmitHandle"
1871    }
1872}
1873
1874/// Represents the state of a stream or future.
1875struct TransmitState {
1876    /// The write end of the stream or future.
1877    write_handle: TableId<TransmitHandle>,
1878    /// The read end of the stream or future.
1879    read_handle: TableId<TransmitHandle>,
1880    /// See `WriteState`
1881    write: WriteState,
1882    /// See `ReadState`
1883    read: ReadState,
1884    /// Whether futher values may be transmitted via this stream or future.
1885    done: bool,
1886}
1887
1888impl Default for TransmitState {
1889    fn default() -> Self {
1890        Self {
1891            write_handle: TableId::new(u32::MAX),
1892            read_handle: TableId::new(u32::MAX),
1893            read: ReadState::Open,
1894            write: WriteState::Open,
1895            done: false,
1896        }
1897    }
1898}
1899
1900impl TableDebug for TransmitState {
1901    fn type_name() -> &'static str {
1902        "TransmitState"
1903    }
1904}
1905
1906type PollStream = Box<
1907    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
1908>;
1909
1910type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
1911
1912/// Represents the state of the write end of a stream or future.
1913enum WriteState {
1914    /// The write end is open, but no write is pending.
1915    Open,
1916    /// The write end is owned by a guest task and a write is pending.
1917    GuestReady {
1918        instance: Instance,
1919        caller: RuntimeComponentInstanceIndex,
1920        ty: TransmitIndex,
1921        flat_abi: Option<FlatAbi>,
1922        options: OptionsIndex,
1923        address: usize,
1924        count: usize,
1925        handle: u32,
1926    },
1927    /// The write end is owned by the host, which is ready to produce items.
1928    HostReady {
1929        produce: PollStream,
1930        try_into: TryInto,
1931        guest_offset: usize,
1932        cancel: bool,
1933        cancel_waker: Option<Waker>,
1934    },
1935    /// The write end has been dropped.
1936    Dropped,
1937}
1938
1939impl fmt::Debug for WriteState {
1940    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1941        match self {
1942            Self::Open => f.debug_tuple("Open").finish(),
1943            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1944            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1945            Self::Dropped => f.debug_tuple("Dropped").finish(),
1946        }
1947    }
1948}
1949
1950/// Represents the state of the read end of a stream or future.
1951enum ReadState {
1952    /// The read end is open, but no read is pending.
1953    Open,
1954    /// The read end is owned by a guest task and a read is pending.
1955    GuestReady {
1956        ty: TransmitIndex,
1957        caller: RuntimeComponentInstanceIndex,
1958        flat_abi: Option<FlatAbi>,
1959        instance: Instance,
1960        options: OptionsIndex,
1961        address: usize,
1962        count: usize,
1963        handle: u32,
1964    },
1965    /// The read end is owned by a host task, and it is ready to consume items.
1966    HostReady {
1967        consume: PollStream,
1968        guest_offset: usize,
1969        cancel: bool,
1970        cancel_waker: Option<Waker>,
1971    },
1972    /// Both the read and write ends are owned by the host.
1973    HostToHost {
1974        accept: Box<
1975            dyn for<'a> Fn(
1976                    &'a mut UntypedWriteBuffer<'a>,
1977                )
1978                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
1979                + Send
1980                + Sync,
1981        >,
1982        buffer: Vec<u8>,
1983        limit: usize,
1984    },
1985    /// The read end has been dropped.
1986    Dropped,
1987}
1988
1989impl fmt::Debug for ReadState {
1990    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1991        match self {
1992            Self::Open => f.debug_tuple("Open").finish(),
1993            Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(),
1994            Self::HostReady { .. } => f.debug_tuple("HostReady").finish(),
1995            Self::HostToHost { .. } => f.debug_tuple("HostToHost").finish(),
1996            Self::Dropped => f.debug_tuple("Dropped").finish(),
1997        }
1998    }
1999}
2000
2001fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> ReturnCode {
2002    let count = guest_offset.try_into().unwrap();
2003    match state {
2004        StreamResult::Dropped => ReturnCode::Dropped(count),
2005        StreamResult::Completed => ReturnCode::completed(kind, count),
2006        StreamResult::Cancelled => ReturnCode::Cancelled(count),
2007    }
2008}
2009
2010impl StoreOpaque {
2011    fn pipe_from_guest(
2012        &mut self,
2013        kind: TransmitKind,
2014        id: TableId<TransmitState>,
2015        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2016    ) {
2017        let future = async move {
2018            let stream_state = future.await?;
2019            tls::get(|store| {
2020                let state = store.concurrent_state_mut();
2021                let transmit = state.get_mut(id)?;
2022                let ReadState::HostReady {
2023                    consume,
2024                    guest_offset,
2025                    ..
2026                } = mem::replace(&mut transmit.read, ReadState::Open)
2027                else {
2028                    unreachable!();
2029                };
2030                let code = return_code(kind, stream_state, guest_offset);
2031                transmit.read = match stream_state {
2032                    StreamResult::Dropped => ReadState::Dropped,
2033                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
2034                        consume,
2035                        guest_offset: 0,
2036                        cancel: false,
2037                        cancel_waker: None,
2038                    },
2039                };
2040                let WriteState::GuestReady { ty, handle, .. } =
2041                    mem::replace(&mut transmit.write, WriteState::Open)
2042                else {
2043                    unreachable!();
2044                };
2045                state.send_write_result(ty, id, handle, code)?;
2046                Ok(())
2047            })
2048        };
2049
2050        self.concurrent_state_mut().push_future(future.boxed());
2051    }
2052
2053    fn pipe_to_guest(
2054        &mut self,
2055        kind: TransmitKind,
2056        id: TableId<TransmitState>,
2057        future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
2058    ) {
2059        let future = async move {
2060            let stream_state = future.await?;
2061            tls::get(|store| {
2062                let state = store.concurrent_state_mut();
2063                let transmit = state.get_mut(id)?;
2064                let WriteState::HostReady {
2065                    produce,
2066                    try_into,
2067                    guest_offset,
2068                    ..
2069                } = mem::replace(&mut transmit.write, WriteState::Open)
2070                else {
2071                    unreachable!();
2072                };
2073                let code = return_code(kind, stream_state, guest_offset);
2074                transmit.write = match stream_state {
2075                    StreamResult::Dropped => WriteState::Dropped,
2076                    StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
2077                        produce,
2078                        try_into,
2079                        guest_offset: 0,
2080                        cancel: false,
2081                        cancel_waker: None,
2082                    },
2083                };
2084                let ReadState::GuestReady { ty, handle, .. } =
2085                    mem::replace(&mut transmit.read, ReadState::Open)
2086                else {
2087                    unreachable!();
2088                };
2089                state.send_read_result(ty, id, handle, code)?;
2090                Ok(())
2091            })
2092        };
2093
2094        self.concurrent_state_mut().push_future(future.boxed());
2095    }
2096
2097    /// Drop the read end of a stream or future read from the host.
2098    fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
2099        let state = self.concurrent_state_mut();
2100        let transmit_id = state.get_mut(id)?.state;
2101        let transmit = state
2102            .get_mut(transmit_id)
2103            .with_context(|| format!("error closing reader {transmit_id:?}"))?;
2104        log::trace!(
2105            "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
2106            transmit.read,
2107            transmit.write
2108        );
2109
2110        transmit.read = ReadState::Dropped;
2111
2112        // If the write end is already dropped, it should stay dropped,
2113        // otherwise, it should be opened.
2114        let new_state = if let WriteState::Dropped = &transmit.write {
2115            WriteState::Dropped
2116        } else {
2117            WriteState::Open
2118        };
2119
2120        let write_handle = transmit.write_handle;
2121
2122        match mem::replace(&mut transmit.write, new_state) {
2123            // If a guest is waiting to write, notify it that the read end has
2124            // been dropped.
2125            WriteState::GuestReady { ty, handle, .. } => {
2126                state.update_event(
2127                    write_handle.rep(),
2128                    match ty {
2129                        TransmitIndex::Future(ty) => Event::FutureWrite {
2130                            code: ReturnCode::Dropped(0),
2131                            pending: Some((ty, handle)),
2132                        },
2133                        TransmitIndex::Stream(ty) => Event::StreamWrite {
2134                            code: ReturnCode::Dropped(0),
2135                            pending: Some((ty, handle)),
2136                        },
2137                    },
2138                )?;
2139            }
2140
2141            WriteState::HostReady { .. } => {}
2142
2143            WriteState::Open => {
2144                state.update_event(
2145                    write_handle.rep(),
2146                    match kind {
2147                        TransmitKind::Future => Event::FutureWrite {
2148                            code: ReturnCode::Dropped(0),
2149                            pending: None,
2150                        },
2151                        TransmitKind::Stream => Event::StreamWrite {
2152                            code: ReturnCode::Dropped(0),
2153                            pending: None,
2154                        },
2155                    },
2156                )?;
2157            }
2158
2159            WriteState::Dropped => {
2160                log::trace!("host_drop_reader delete {transmit_id:?}");
2161                state.delete_transmit(transmit_id)?;
2162            }
2163        }
2164        Ok(())
2165    }
2166
2167    /// Drop the write end of a stream or future read from the host.
2168    fn host_drop_writer(
2169        &mut self,
2170        id: TableId<TransmitHandle>,
2171        on_drop_open: Option<fn() -> Result<()>>,
2172    ) -> Result<()> {
2173        let state = self.concurrent_state_mut();
2174        let transmit_id = state.get_mut(id)?.state;
2175        let transmit = state
2176            .get_mut(transmit_id)
2177            .with_context(|| format!("error closing writer {transmit_id:?}"))?;
2178        log::trace!(
2179            "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
2180            transmit.read,
2181            transmit.write
2182        );
2183
2184        // Existing queued transmits must be updated with information for the impending writer closure
2185        match &mut transmit.write {
2186            WriteState::GuestReady { .. } => {
2187                unreachable!("can't call `host_drop_writer` on a guest-owned writer");
2188            }
2189            WriteState::HostReady { .. } => {}
2190            v @ WriteState::Open => {
2191                if let (Some(on_drop_open), false) = (
2192                    on_drop_open,
2193                    transmit.done || matches!(transmit.read, ReadState::Dropped),
2194                ) {
2195                    on_drop_open()?;
2196                } else {
2197                    *v = WriteState::Dropped;
2198                }
2199            }
2200            WriteState::Dropped => unreachable!("write state is already dropped"),
2201        }
2202
2203        let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
2204
2205        // If the existing read state is dropped, then there's nothing to read
2206        // and we can keep it that way.
2207        //
2208        // If the read state was any other state, then we must set the new state to open
2209        // to indicate that there *is* data to be read
2210        let new_state = if let ReadState::Dropped = &transmit.read {
2211            ReadState::Dropped
2212        } else {
2213            ReadState::Open
2214        };
2215
2216        let read_handle = transmit.read_handle;
2217
2218        // Swap in the new read state
2219        match mem::replace(&mut transmit.read, new_state) {
2220            // If the guest was ready to read, then we cannot drop the reader (or writer);
2221            // we must deliver the event, and update the state associated with the handle to
2222            // represent that a read must be performed
2223            ReadState::GuestReady { ty, handle, .. } => {
2224                // Ensure the final read of the guest is queued, with appropriate closure indicator
2225                self.concurrent_state_mut().update_event(
2226                    read_handle.rep(),
2227                    match ty {
2228                        TransmitIndex::Future(ty) => Event::FutureRead {
2229                            code: ReturnCode::Dropped(0),
2230                            pending: Some((ty, handle)),
2231                        },
2232                        TransmitIndex::Stream(ty) => Event::StreamRead {
2233                            code: ReturnCode::Dropped(0),
2234                            pending: Some((ty, handle)),
2235                        },
2236                    },
2237                )?;
2238            }
2239
2240            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}
2241
2242            // If the read state is open, then there are no registered readers of the stream/future
2243            ReadState::Open => {
2244                self.concurrent_state_mut().update_event(
2245                    read_handle.rep(),
2246                    match on_drop_open {
2247                        Some(_) => Event::FutureRead {
2248                            code: ReturnCode::Dropped(0),
2249                            pending: None,
2250                        },
2251                        None => Event::StreamRead {
2252                            code: ReturnCode::Dropped(0),
2253                            pending: None,
2254                        },
2255                    },
2256                )?;
2257            }
2258
2259            // If the read state was already dropped, then we can remove the transmit state completely
2260            // (both writer and reader have been dropped)
2261            ReadState::Dropped => {
2262                log::trace!("host_drop_writer delete {transmit_id:?}");
2263                self.concurrent_state_mut().delete_transmit(transmit_id)?;
2264            }
2265        }
2266        Ok(())
2267    }
2268}
2269
2270impl<T> StoreContextMut<'_, T> {
2271    fn new_transmit<P: StreamProducer<T>>(
2272        mut self,
2273        kind: TransmitKind,
2274        producer: P,
2275    ) -> TableId<TransmitHandle>
2276    where
2277        P::Item: func::Lower,
2278    {
2279        let token = StoreToken::new(self.as_context_mut());
2280        let state = self.0.concurrent_state_mut();
2281        let (_, read) = state.new_transmit().unwrap();
2282        let producer = Arc::new(Mutex::new(Some((Box::pin(producer), P::Buffer::default()))));
2283        let id = state.get_mut(read).unwrap().state;
2284        let mut dropped = false;
2285        let produce = Box::new({
2286            let producer = producer.clone();
2287            move || {
2288                let producer = producer.clone();
2289                async move {
2290                    let (mut mine, mut buffer) = producer.lock().unwrap().take().unwrap();
2291
2292                    let (result, cancelled) = if buffer.remaining().is_empty() {
2293                        future::poll_fn(|cx| {
2294                            tls::get(|store| {
2295                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2296
2297                                let &WriteState::HostReady { cancel, .. } = &transmit.write else {
2298                                    unreachable!();
2299                                };
2300
2301                                let mut host_buffer =
2302                                    if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
2303                                        Some(Cursor::new(mem::take(buffer)))
2304                                    } else {
2305                                        None
2306                                    };
2307
2308                                let poll = mine.as_mut().poll_produce(
2309                                    cx,
2310                                    token.as_context_mut(store),
2311                                    Destination {
2312                                        id,
2313                                        buffer: &mut buffer,
2314                                        host_buffer: host_buffer.as_mut(),
2315                                        _phantom: PhantomData,
2316                                    },
2317                                    cancel,
2318                                );
2319
2320                                let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2321
2322                                let host_offset = if let (
2323                                    Some(host_buffer),
2324                                    ReadState::HostToHost { buffer, limit, .. },
2325                                ) = (host_buffer, &mut transmit.read)
2326                                {
2327                                    *limit = usize::try_from(host_buffer.position()).unwrap();
2328                                    *buffer = host_buffer.into_inner();
2329                                    *limit
2330                                } else {
2331                                    0
2332                                };
2333
2334                                {
2335                                    let WriteState::HostReady {
2336                                        guest_offset,
2337                                        cancel,
2338                                        cancel_waker,
2339                                        ..
2340                                    } = &mut transmit.write
2341                                    else {
2342                                        unreachable!();
2343                                    };
2344
2345                                    if poll.is_pending() {
2346                                        if !buffer.remaining().is_empty()
2347                                            || *guest_offset > 0
2348                                            || host_offset > 0
2349                                        {
2350                                            return Poll::Ready(Err(anyhow!(
2351                                                "StreamProducer::poll_produce returned Poll::Pending \
2352                                                 after producing at least one item"
2353                                            )));
2354                                        }
2355                                        *cancel_waker = Some(cx.waker().clone());
2356                                    } else {
2357                                        *cancel_waker = None;
2358                                        *cancel = false;
2359                                    }
2360                                }
2361
2362                                poll.map(|v| v.map(|result| (result, cancel)))
2363                            })
2364                        })
2365                            .await?
2366                    } else {
2367                        (StreamResult::Completed, false)
2368                    };
2369
2370                    let (guest_offset, host_offset, count) = tls::get(|store| {
2371                        let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2372                        let (count, host_offset) = match &transmit.read {
2373                            &ReadState::GuestReady { count, .. } => (count, 0),
2374                            &ReadState::HostToHost { limit, .. } => (1, limit),
2375                            _ => unreachable!(),
2376                        };
2377                        let guest_offset = match &transmit.write {
2378                            &WriteState::HostReady { guest_offset, .. } => guest_offset,
2379                            _ => unreachable!(),
2380                        };
2381                        (guest_offset, host_offset, count)
2382                    });
2383
2384                    match result {
2385                        StreamResult::Completed => {
2386                            if count > 1
2387                                && buffer.remaining().is_empty()
2388                                && guest_offset == 0
2389                                && host_offset == 0
2390                            {
2391                                bail!(
2392                                    "StreamProducer::poll_produce returned StreamResult::Completed \
2393                                     without producing any items"
2394                                );
2395                            }
2396                        }
2397                        StreamResult::Cancelled => {
2398                            if !cancelled {
2399                                bail!(
2400                                    "StreamProducer::poll_produce returned StreamResult::Cancelled \
2401                                     without being given a `finish` parameter value of true"
2402                                );
2403                            }
2404                        }
2405                        StreamResult::Dropped => {
2406                            dropped = true;
2407                        }
2408                    }
2409
2410                    let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
2411
2412                    *producer.lock().unwrap() = Some((mine, buffer));
2413
2414                    if write_buffer {
2415                        write( token, id, producer.clone(), kind).await?;
2416                    }
2417
2418                    Ok(if dropped {
2419                        if producer.lock().unwrap().as_ref().unwrap().1.remaining().is_empty()
2420                        {
2421                            StreamResult::Dropped
2422                        } else {
2423                            StreamResult::Completed
2424                        }
2425                    } else {
2426                        result
2427                    })
2428                }
2429                .boxed()
2430            }
2431        });
2432        let try_into = Box::new(move |ty| {
2433            let (mine, buffer) = producer.lock().unwrap().take().unwrap();
2434            match P::try_into(mine, ty) {
2435                Ok(value) => Some(value),
2436                Err(mine) => {
2437                    *producer.lock().unwrap() = Some((mine, buffer));
2438                    None
2439                }
2440            }
2441        });
2442        state.get_mut(id).unwrap().write = WriteState::HostReady {
2443            produce,
2444            try_into,
2445            guest_offset: 0,
2446            cancel: false,
2447            cancel_waker: None,
2448        };
2449        read
2450    }
2451
2452    fn set_consumer<C: StreamConsumer<T>>(
2453        mut self,
2454        id: TableId<TransmitHandle>,
2455        kind: TransmitKind,
2456        consumer: C,
2457    ) {
2458        let token = StoreToken::new(self.as_context_mut());
2459        let state = self.0.concurrent_state_mut();
2460        let id = state.get_mut(id).unwrap().state;
2461        let transmit = state.get_mut(id).unwrap();
2462        let consumer = Arc::new(Mutex::new(Some(Box::pin(consumer))));
2463        let consume_with_buffer = {
2464            let consumer = consumer.clone();
2465            async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
2466                let mut mine = consumer.lock().unwrap().take().unwrap();
2467
2468                let host_buffer_remaining_before =
2469                    host_buffer.as_deref_mut().map(|v| v.remaining().len());
2470
2471                let (result, cancelled) = future::poll_fn(|cx| {
2472                    tls::get(|store| {
2473                        let cancel = match &store.concurrent_state_mut().get_mut(id).unwrap().read {
2474                            &ReadState::HostReady { cancel, .. } => cancel,
2475                            ReadState::Open => false,
2476                            _ => unreachable!(),
2477                        };
2478
2479                        let poll = mine.as_mut().poll_consume(
2480                            cx,
2481                            token.as_context_mut(store),
2482                            Source {
2483                                id,
2484                                host_buffer: host_buffer.as_deref_mut(),
2485                            },
2486                            cancel,
2487                        );
2488
2489                        if let ReadState::HostReady {
2490                            cancel_waker,
2491                            cancel,
2492                            ..
2493                        } = &mut store.concurrent_state_mut().get_mut(id).unwrap().read
2494                        {
2495                            if poll.is_pending() {
2496                                *cancel_waker = Some(cx.waker().clone());
2497                            } else {
2498                                *cancel_waker = None;
2499                                *cancel = false;
2500                            }
2501                        }
2502
2503                        poll.map(|v| v.map(|result| (result, cancel)))
2504                    })
2505                })
2506                .await?;
2507
2508                let (guest_offset, count) = tls::get(|store| {
2509                    let transmit = store.concurrent_state_mut().get_mut(id).unwrap();
2510                    (
2511                        match &transmit.read {
2512                            &ReadState::HostReady { guest_offset, .. } => guest_offset,
2513                            ReadState::Open => 0,
2514                            _ => unreachable!(),
2515                        },
2516                        match &transmit.write {
2517                            &WriteState::GuestReady { count, .. } => count,
2518                            WriteState::HostReady { .. } => host_buffer_remaining_before.unwrap(),
2519                            _ => unreachable!(),
2520                        },
2521                    )
2522                });
2523
2524                match result {
2525                    StreamResult::Completed => {
2526                        if count > 0
2527                            && guest_offset == 0
2528                            && host_buffer_remaining_before
2529                                .zip(host_buffer.map(|v| v.remaining().len()))
2530                                .map(|(before, after)| before == after)
2531                                .unwrap_or(false)
2532                        {
2533                            bail!(
2534                                "StreamConsumer::poll_consume returned StreamResult::Completed \
2535                                 without consuming any items"
2536                            );
2537                        }
2538
2539                        if let TransmitKind::Future = kind {
2540                            tls::get(|store| {
2541                                store.concurrent_state_mut().get_mut(id).unwrap().done = true;
2542                            });
2543                        }
2544                    }
2545                    StreamResult::Cancelled => {
2546                        if !cancelled {
2547                            bail!(
2548                                "StreamConsumer::poll_consume returned StreamResult::Cancelled \
2549                                 without being given a `finish` parameter value of true"
2550                            );
2551                        }
2552                    }
2553                    StreamResult::Dropped => {}
2554                }
2555
2556                *consumer.lock().unwrap() = Some(mine);
2557
2558                Ok(result)
2559            }
2560        };
2561        let consume = {
2562            let consume = consume_with_buffer.clone();
2563            Box::new(move || {
2564                let consume = consume.clone();
2565                async move { consume(None).await }.boxed()
2566            })
2567        };
2568
2569        match &transmit.write {
2570            WriteState::Open => {
2571                transmit.read = ReadState::HostReady {
2572                    consume,
2573                    guest_offset: 0,
2574                    cancel: false,
2575                    cancel_waker: None,
2576                };
2577            }
2578            &WriteState::GuestReady { .. } => {
2579                let future = consume();
2580                transmit.read = ReadState::HostReady {
2581                    consume,
2582                    guest_offset: 0,
2583                    cancel: false,
2584                    cancel_waker: None,
2585                };
2586                self.0.pipe_from_guest(kind, id, future);
2587            }
2588            WriteState::HostReady { .. } => {
2589                let WriteState::HostReady { produce, .. } = mem::replace(
2590                    &mut transmit.write,
2591                    WriteState::HostReady {
2592                        produce: Box::new(|| unreachable!()),
2593                        try_into: Box::new(|_| unreachable!()),
2594                        guest_offset: 0,
2595                        cancel: false,
2596                        cancel_waker: None,
2597                    },
2598                ) else {
2599                    unreachable!();
2600                };
2601
2602                transmit.read = ReadState::HostToHost {
2603                    accept: Box::new(move |input| {
2604                        let consume = consume_with_buffer.clone();
2605                        async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
2606                    }),
2607                    buffer: Vec::new(),
2608                    limit: 0,
2609                };
2610
2611                let future = async move {
2612                    loop {
2613                        if tls::get(|store| {
2614                            anyhow::Ok(matches!(
2615                                store.concurrent_state_mut().get_mut(id)?.read,
2616                                ReadState::Dropped
2617                            ))
2618                        })? {
2619                            break Ok(());
2620                        }
2621
2622                        match produce().await? {
2623                            StreamResult::Completed | StreamResult::Cancelled => {}
2624                            StreamResult::Dropped => break Ok(()),
2625                        }
2626
2627                        if let TransmitKind::Future = kind {
2628                            break Ok(());
2629                        }
2630                    }
2631                }
2632                .map(move |result| {
2633                    tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
2634                    result
2635                });
2636
2637                state.push_future(Box::pin(future));
2638            }
2639            WriteState::Dropped => {
2640                let reader = transmit.read_handle;
2641                self.0.host_drop_reader(reader, kind).unwrap();
2642            }
2643        }
2644    }
2645}
2646
2647async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
2648    token: StoreToken<D>,
2649    id: TableId<TransmitState>,
2650    pair: Arc<Mutex<Option<(P, B)>>>,
2651    kind: TransmitKind,
2652) -> Result<()> {
2653    let (read, guest_offset) = tls::get(|store| {
2654        let transmit = store.concurrent_state_mut().get_mut(id)?;
2655
2656        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
2657            Some(guest_offset)
2658        } else {
2659            None
2660        };
2661
2662        anyhow::Ok((
2663            mem::replace(&mut transmit.read, ReadState::Open),
2664            guest_offset,
2665        ))
2666    })?;
2667
2668    match read {
2669        ReadState::GuestReady {
2670            ty,
2671            flat_abi,
2672            options,
2673            address,
2674            count,
2675            handle,
2676            instance,
2677            caller,
2678        } => {
2679            let guest_offset = guest_offset.unwrap();
2680
2681            if let TransmitKind::Future = kind {
2682                tls::get(|store| {
2683                    store.concurrent_state_mut().get_mut(id)?.done = true;
2684                    anyhow::Ok(())
2685                })?;
2686            }
2687
2688            let old_remaining = pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2689            let accept = {
2690                let pair = pair.clone();
2691                move |mut store: StoreContextMut<D>| {
2692                    lower::<T, B, D>(
2693                        store.as_context_mut(),
2694                        instance,
2695                        options,
2696                        ty,
2697                        address + (T::SIZE32 * guest_offset),
2698                        count - guest_offset,
2699                        &mut pair.lock().unwrap().as_mut().unwrap().1,
2700                    )?;
2701                    anyhow::Ok(())
2702                }
2703            };
2704
2705            if guest_offset < count {
2706                if T::MAY_REQUIRE_REALLOC {
2707                    // For payloads which may require a realloc call, use a
2708                    // oneshot::channel and background task.  This is
2709                    // necessary because calling the guest while there are
2710                    // host embedder frames on the stack is unsound.
2711                    let (tx, rx) = oneshot::channel();
2712                    tls::get(move |store| {
2713                        store
2714                            .concurrent_state_mut()
2715                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2716                                move |store| {
2717                                    _ = tx.send(accept(token.as_context_mut(store))?);
2718                                    Ok(())
2719                                },
2720                            ))))
2721                    });
2722                    rx.await?
2723                } else {
2724                    // Optimize flat payloads (i.e. those which do not
2725                    // require calling the guest's realloc function) by
2726                    // lowering directly instead of using a oneshot::channel
2727                    // and background task.
2728                    tls::get(|store| accept(token.as_context_mut(store)))?
2729                };
2730            }
2731
2732            tls::get(|store| {
2733                let count =
2734                    old_remaining - pair.lock().unwrap().as_mut().unwrap().1.remaining().len();
2735
2736                let transmit = store.concurrent_state_mut().get_mut(id)?;
2737
2738                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2739                    unreachable!();
2740                };
2741
2742                *guest_offset += count;
2743
2744                transmit.read = ReadState::GuestReady {
2745                    ty,
2746                    flat_abi,
2747                    options,
2748                    address,
2749                    count,
2750                    handle,
2751                    instance,
2752                    caller,
2753                };
2754
2755                anyhow::Ok(())
2756            })?;
2757
2758            Ok(())
2759        }
2760
2761        ReadState::HostToHost {
2762            accept,
2763            mut buffer,
2764            limit,
2765        } => {
2766            let mut state = StreamResult::Completed;
2767            let mut position = 0;
2768
2769            while !matches!(state, StreamResult::Dropped) && position < limit {
2770                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
2771                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
2772                (buffer, position, _) = slice_buffer.into_parts();
2773            }
2774
2775            {
2776                let (mine, mut buffer) = pair.lock().unwrap().take().unwrap();
2777
2778                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
2779                    state = accept(&mut UntypedWriteBuffer::new(&mut buffer)).await?;
2780                }
2781
2782                *pair.lock().unwrap() = Some((mine, buffer));
2783            }
2784
2785            tls::get(|store| {
2786                store.concurrent_state_mut().get_mut(id)?.read = match state {
2787                    StreamResult::Dropped => ReadState::Dropped,
2788                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
2789                        accept,
2790                        buffer,
2791                        limit: 0,
2792                    },
2793                };
2794
2795                anyhow::Ok(())
2796            })?;
2797            Ok(())
2798        }
2799
2800        _ => unreachable!(),
2801    }
2802}
2803
2804impl Instance {
2805    /// Handle a host- or guest-initiated write by delivering the item(s) to the
2806    /// `StreamConsumer` for the specified stream or future.
2807    fn consume(
2808        self,
2809        store: &mut dyn VMStore,
2810        kind: TransmitKind,
2811        transmit_id: TableId<TransmitState>,
2812        consume: PollStream,
2813        guest_offset: usize,
2814        cancel: bool,
2815    ) -> Result<ReturnCode> {
2816        let mut future = consume();
2817        store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
2818            consume,
2819            guest_offset,
2820            cancel,
2821            cancel_waker: None,
2822        };
2823        let poll = tls::set(store, || {
2824            future
2825                .as_mut()
2826                .poll(&mut Context::from_waker(&Waker::noop()))
2827        });
2828
2829        Ok(match poll {
2830            Poll::Ready(state) => {
2831                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2832                let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
2833                    unreachable!();
2834                };
2835                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2836                transmit.write = WriteState::Open;
2837                code
2838            }
2839            Poll::Pending => {
2840                store.pipe_from_guest(kind, transmit_id, future);
2841                ReturnCode::Blocked
2842            }
2843        })
2844    }
2845
2846    /// Handle a host- or guest-initiated read by polling the `StreamProducer`
2847    /// for the specified stream or future for items.
2848    fn produce(
2849        self,
2850        store: &mut dyn VMStore,
2851        kind: TransmitKind,
2852        transmit_id: TableId<TransmitState>,
2853        produce: PollStream,
2854        try_into: TryInto,
2855        guest_offset: usize,
2856        cancel: bool,
2857    ) -> Result<ReturnCode> {
2858        let mut future = produce();
2859        store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
2860            produce,
2861            try_into,
2862            guest_offset,
2863            cancel,
2864            cancel_waker: None,
2865        };
2866        let poll = tls::set(store, || {
2867            future
2868                .as_mut()
2869                .poll(&mut Context::from_waker(&Waker::noop()))
2870        });
2871
2872        Ok(match poll {
2873            Poll::Ready(state) => {
2874                let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
2875                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
2876                    unreachable!();
2877                };
2878                let code = return_code(kind, state?, mem::replace(guest_offset, 0));
2879                transmit.read = ReadState::Open;
2880                code
2881            }
2882            Poll::Pending => {
2883                store.pipe_to_guest(kind, transmit_id, future);
2884                ReturnCode::Blocked
2885            }
2886        })
2887    }
2888
2889    /// Drop the writable end of the specified stream or future from the guest.
2890    pub(super) fn guest_drop_writable(
2891        self,
2892        store: &mut StoreOpaque,
2893        ty: TransmitIndex,
2894        writer: u32,
2895    ) -> Result<()> {
2896        let table = self.id().get_mut(store).table_for_transmit(ty);
2897        let transmit_rep = match ty {
2898            TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
2899            TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
2900        };
2901
2902        let id = TableId::<TransmitHandle>::new(transmit_rep);
2903        log::trace!("guest_drop_writable: drop writer {id:?}");
2904        match ty {
2905            TransmitIndex::Stream(_) => store.host_drop_writer(id, None),
2906            TransmitIndex::Future(_) => store.host_drop_writer(
2907                id,
2908                Some(|| {
2909                    Err(anyhow!(
2910                        "cannot drop future write end without first writing a value"
2911                    ))
2912                }),
2913            ),
2914        }
2915    }
2916
2917    /// Copy `count` items from `read_address` to `write_address` for the
2918    /// specified stream or future.
2919    fn copy<T: 'static>(
2920        self,
2921        mut store: StoreContextMut<T>,
2922        flat_abi: Option<FlatAbi>,
2923        write_caller: RuntimeComponentInstanceIndex,
2924        write_ty: TransmitIndex,
2925        write_options: OptionsIndex,
2926        write_address: usize,
2927        read_caller: RuntimeComponentInstanceIndex,
2928        read_ty: TransmitIndex,
2929        read_options: OptionsIndex,
2930        read_address: usize,
2931        count: usize,
2932        rep: u32,
2933    ) -> Result<()> {
2934        let types = self.id().get(store.0).component().types();
2935        match (write_ty, read_ty) {
2936            (TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
2937                assert_eq!(count, 1);
2938
2939                let payload = types[types[write_ty].ty].payload;
2940
2941                if write_caller == read_caller && payload.is_some() {
2942                    bail!(
2943                        "cannot read from and write to intra-component future with non-unit payload"
2944                    )
2945                }
2946
2947                let val = payload
2948                    .map(|ty| {
2949                        let lift =
2950                            &mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
2951
2952                        let abi = lift.types.canonical_abi(&ty);
2953                        // FIXME: needs to read an i64 for memory64
2954                        if write_address % usize::try_from(abi.align32)? != 0 {
2955                            bail!("write pointer not aligned");
2956                        }
2957
2958                        let bytes = lift
2959                            .memory()
2960                            .get(write_address..)
2961                            .and_then(|b| b.get(..usize::try_from(abi.size32).unwrap()))
2962                            .ok_or_else(|| {
2963                                anyhow::anyhow!("write pointer out of bounds of memory")
2964                            })?;
2965
2966                        Val::load(lift, ty, bytes)
2967                    })
2968                    .transpose()?;
2969
2970                if let Some(val) = val {
2971                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
2972                    let types = lower.types;
2973                    let ty = types[types[read_ty].ty].payload.unwrap();
2974                    let ptr = func::validate_inbounds_dynamic(
2975                        types.canonical_abi(&ty),
2976                        lower.as_slice_mut(),
2977                        &ValRaw::u32(read_address.try_into().unwrap()),
2978                    )?;
2979                    val.store(lower, ty, ptr)?;
2980                }
2981            }
2982            (TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
2983                if let Some(flat_abi) = flat_abi {
2984                    if write_caller == read_caller && types[types[write_ty].ty].payload.is_some() {
2985                        bail!(
2986                            "cannot read from and write to intra-component stream with non-unit payload"
2987                        )
2988                    }
2989
2990                    // Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
2991                    let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
2992                    if length_in_bytes > 0 {
2993                        if write_address % usize::try_from(flat_abi.align)? != 0 {
2994                            bail!("write pointer not aligned");
2995                        }
2996                        if read_address % usize::try_from(flat_abi.align)? != 0 {
2997                            bail!("read pointer not aligned");
2998                        }
2999
3000                        let store_opaque = store.0.store_opaque_mut();
3001
3002                        {
3003                            let src = self
3004                                .options_memory(store_opaque, write_options)
3005                                .get(write_address..)
3006                                .and_then(|b| b.get(..length_in_bytes))
3007                                .ok_or_else(|| {
3008                                    anyhow::anyhow!("write pointer out of bounds of memory")
3009                                })?
3010                                .as_ptr();
3011                            let dst = self
3012                                .options_memory_mut(store_opaque, read_options)
3013                                .get_mut(read_address..)
3014                                .and_then(|b| b.get_mut(..length_in_bytes))
3015                                .ok_or_else(|| {
3016                                    anyhow::anyhow!("read pointer out of bounds of memory")
3017                                })?
3018                                .as_mut_ptr();
3019                            // SAFETY: Both `src` and `dst` have been validated
3020                            // above.
3021                            unsafe { src.copy_to(dst, length_in_bytes) };
3022                        }
3023                    }
3024                } else {
3025                    let store_opaque = store.0.store_opaque_mut();
3026                    let lift = &mut LiftContext::new(store_opaque, write_options, self);
3027                    let ty = lift.types[lift.types[write_ty].ty].payload.unwrap();
3028                    let abi = lift.types.canonical_abi(&ty);
3029                    let size = usize::try_from(abi.size32).unwrap();
3030                    if write_address % usize::try_from(abi.align32)? != 0 {
3031                        bail!("write pointer not aligned");
3032                    }
3033                    let bytes = lift
3034                        .memory()
3035                        .get(write_address..)
3036                        .and_then(|b| b.get(..size * count))
3037                        .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?;
3038
3039                    let values = (0..count)
3040                        .map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
3041                        .collect::<Result<Vec<_>>>()?;
3042
3043                    let id = TableId::<TransmitHandle>::new(rep);
3044                    log::trace!("copy values {values:?} for {id:?}");
3045
3046                    let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3047                    let ty = lower.types[lower.types[read_ty].ty].payload.unwrap();
3048                    let abi = lower.types.canonical_abi(&ty);
3049                    if read_address % usize::try_from(abi.align32)? != 0 {
3050                        bail!("read pointer not aligned");
3051                    }
3052                    let size = usize::try_from(abi.size32).unwrap();
3053                    lower
3054                        .as_slice_mut()
3055                        .get_mut(read_address..)
3056                        .and_then(|b| b.get_mut(..size * count))
3057                        .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?;
3058                    let mut ptr = read_address;
3059                    for value in values {
3060                        value.store(lower, ty, ptr)?;
3061                        ptr += size
3062                    }
3063                }
3064            }
3065            _ => unreachable!(),
3066        }
3067
3068        Ok(())
3069    }
3070
3071    fn check_bounds(
3072        self,
3073        store: &StoreOpaque,
3074        options: OptionsIndex,
3075        ty: TransmitIndex,
3076        address: usize,
3077        count: usize,
3078    ) -> Result<()> {
3079        let types = self.id().get(store).component().types();
3080        let size = usize::try_from(
3081            match ty {
3082                TransmitIndex::Future(ty) => types[types[ty].ty]
3083                    .payload
3084                    .map(|ty| types.canonical_abi(&ty).size32),
3085                TransmitIndex::Stream(ty) => types[types[ty].ty]
3086                    .payload
3087                    .map(|ty| types.canonical_abi(&ty).size32),
3088            }
3089            .unwrap_or(0),
3090        )
3091        .unwrap();
3092
3093        if count > 0 && size > 0 {
3094            self.options_memory(store, options)
3095                .get(address..)
3096                .and_then(|b| b.get(..(size * count)))
3097                .map(drop)
3098                .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))
3099        } else {
3100            Ok(())
3101        }
3102    }
3103
3104    /// Write to the specified stream or future from the guest.
3105    pub(super) fn guest_write<T: 'static>(
3106        self,
3107        mut store: StoreContextMut<T>,
3108        caller: RuntimeComponentInstanceIndex,
3109        ty: TransmitIndex,
3110        options: OptionsIndex,
3111        flat_abi: Option<FlatAbi>,
3112        handle: u32,
3113        address: u32,
3114        count: u32,
3115    ) -> Result<ReturnCode> {
3116        let address = usize::try_from(address).unwrap();
3117        let count = usize::try_from(count).unwrap();
3118        self.check_bounds(store.0, options, ty, address, count)?;
3119        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3120        let TransmitLocalState::Write { done } = *state else {
3121            bail!(
3122                "invalid handle {handle}; expected `Write`; got {:?}",
3123                *state
3124            );
3125        };
3126
3127        if done {
3128            bail!("cannot write to stream after being notified that the readable end dropped");
3129        }
3130
3131        *state = TransmitLocalState::Busy;
3132        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3133        let concurrent_state = store.0.concurrent_state_mut();
3134        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3135        let transmit = concurrent_state.get_mut(transmit_id)?;
3136        log::trace!(
3137            "guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3138            transmit.read
3139        );
3140
3141        if transmit.done {
3142            bail!("cannot write to future after previous write succeeded or readable end dropped");
3143        }
3144
3145        let new_state = if let ReadState::Dropped = &transmit.read {
3146            ReadState::Dropped
3147        } else {
3148            ReadState::Open
3149        };
3150
3151        let set_guest_ready = |me: &mut ConcurrentState| {
3152            let transmit = me.get_mut(transmit_id)?;
3153            assert!(
3154                matches!(&transmit.write, WriteState::Open),
3155                "expected `WriteState::Open`; got `{:?}`",
3156                transmit.write
3157            );
3158            transmit.write = WriteState::GuestReady {
3159                instance: self,
3160                caller,
3161                ty,
3162                flat_abi,
3163                options,
3164                address,
3165                count,
3166                handle,
3167            };
3168            Ok::<_, crate::Error>(())
3169        };
3170
3171        let mut result = match mem::replace(&mut transmit.read, new_state) {
3172            ReadState::GuestReady {
3173                ty: read_ty,
3174                flat_abi: read_flat_abi,
3175                options: read_options,
3176                address: read_address,
3177                count: read_count,
3178                handle: read_handle,
3179                instance: read_instance,
3180                caller: read_caller,
3181            } => {
3182                assert_eq!(flat_abi, read_flat_abi);
3183
3184                if let TransmitIndex::Future(_) = ty {
3185                    transmit.done = true;
3186                }
3187
3188                // Note that zero-length reads and writes are handling specially
3189                // by the spec to allow each end to signal readiness to the
3190                // other.  Quoting the spec:
3191                //
3192                // ```
3193                // The meaning of a read or write when the length is 0 is that
3194                // the caller is querying the "readiness" of the other
3195                // side. When a 0-length read/write rendezvous with a
3196                // non-0-length read/write, only the 0-length read/write
3197                // completes; the non-0-length read/write is kept pending (and
3198                // ready for a subsequent rendezvous).
3199                //
3200                // In the corner case where a 0-length read and write
3201                // rendezvous, only the writer is notified of readiness. To
3202                // avoid livelock, the Canonical ABI requires that a writer must
3203                // (eventually) follow a completed 0-length write with a
3204                // non-0-length write that is allowed to block (allowing the
3205                // reader end to run and rendezvous with its own non-0-length
3206                // read).
3207                // ```
3208
3209                let write_complete = count == 0 || read_count > 0;
3210                let read_complete = count > 0;
3211                let read_buffer_remaining = count < read_count;
3212
3213                let read_handle_rep = transmit.read_handle.rep();
3214
3215                let count = count.min(read_count);
3216
3217                self.copy(
3218                    store.as_context_mut(),
3219                    flat_abi,
3220                    caller,
3221                    ty,
3222                    options,
3223                    address,
3224                    read_caller,
3225                    read_ty,
3226                    read_options,
3227                    read_address,
3228                    count,
3229                    rep,
3230                )?;
3231
3232                let instance = self.id().get_mut(store.0);
3233                let types = instance.component().types();
3234                let item_size = payload(ty, types)
3235                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3236                    .unwrap_or(0);
3237                let concurrent_state = store.0.concurrent_state_mut();
3238                if read_complete {
3239                    let count = u32::try_from(count).unwrap();
3240                    let total = if let Some(Event::StreamRead {
3241                        code: ReturnCode::Completed(old_total),
3242                        ..
3243                    }) = concurrent_state.take_event(read_handle_rep)?
3244                    {
3245                        count + old_total
3246                    } else {
3247                        count
3248                    };
3249
3250                    let code = ReturnCode::completed(ty.kind(), total);
3251
3252                    concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
3253                }
3254
3255                if read_buffer_remaining {
3256                    let transmit = concurrent_state.get_mut(transmit_id)?;
3257                    transmit.read = ReadState::GuestReady {
3258                        ty: read_ty,
3259                        flat_abi: read_flat_abi,
3260                        options: read_options,
3261                        address: read_address + (count * item_size),
3262                        count: read_count - count,
3263                        handle: read_handle,
3264                        instance: read_instance,
3265                        caller: read_caller,
3266                    };
3267                }
3268
3269                if write_complete {
3270                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3271                } else {
3272                    set_guest_ready(concurrent_state)?;
3273                    ReturnCode::Blocked
3274                }
3275            }
3276
3277            ReadState::HostReady {
3278                consume,
3279                guest_offset,
3280                cancel,
3281                cancel_waker,
3282            } => {
3283                assert!(cancel_waker.is_none());
3284                assert!(!cancel);
3285                assert_eq!(0, guest_offset);
3286
3287                if let TransmitIndex::Future(_) = ty {
3288                    transmit.done = true;
3289                }
3290
3291                set_guest_ready(concurrent_state)?;
3292                self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)?
3293            }
3294
3295            ReadState::HostToHost { .. } => unreachable!(),
3296
3297            ReadState::Open => {
3298                set_guest_ready(concurrent_state)?;
3299                ReturnCode::Blocked
3300            }
3301
3302            ReadState::Dropped => {
3303                if let TransmitIndex::Future(_) = ty {
3304                    transmit.done = true;
3305                }
3306
3307                ReturnCode::Dropped(0)
3308            }
3309        };
3310
3311        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3312            result = self.wait_for_write(store.0, transmit_handle)?;
3313        }
3314
3315        if result != ReturnCode::Blocked {
3316            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3317                TransmitLocalState::Write {
3318                    done: matches!(
3319                        (result, ty),
3320                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3321                    ),
3322                };
3323        }
3324
3325        log::trace!(
3326            "guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3327        );
3328
3329        Ok(result)
3330    }
3331
3332    /// Read from the specified stream or future from the guest.
3333    pub(super) fn guest_read<T: 'static>(
3334        self,
3335        mut store: StoreContextMut<T>,
3336        caller: RuntimeComponentInstanceIndex,
3337        ty: TransmitIndex,
3338        options: OptionsIndex,
3339        flat_abi: Option<FlatAbi>,
3340        handle: u32,
3341        address: u32,
3342        count: u32,
3343    ) -> Result<ReturnCode> {
3344        let address = usize::try_from(address).unwrap();
3345        let count = usize::try_from(count).unwrap();
3346        self.check_bounds(store.0, options, ty, address, count)?;
3347        let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
3348        let TransmitLocalState::Read { done } = *state else {
3349            bail!("invalid handle {handle}; expected `Read`; got {:?}", *state);
3350        };
3351
3352        if done {
3353            bail!("cannot read from stream after being notified that the writable end dropped");
3354        }
3355
3356        *state = TransmitLocalState::Busy;
3357        let transmit_handle = TableId::<TransmitHandle>::new(rep);
3358        let concurrent_state = store.0.concurrent_state_mut();
3359        let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
3360        let transmit = concurrent_state.get_mut(transmit_id)?;
3361        log::trace!(
3362            "guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
3363            transmit.write
3364        );
3365
3366        if transmit.done {
3367            bail!("cannot read from future after previous read succeeded");
3368        }
3369
3370        let new_state = if let WriteState::Dropped = &transmit.write {
3371            WriteState::Dropped
3372        } else {
3373            WriteState::Open
3374        };
3375
3376        let set_guest_ready = |me: &mut ConcurrentState| {
3377            let transmit = me.get_mut(transmit_id)?;
3378            assert!(
3379                matches!(&transmit.read, ReadState::Open),
3380                "expected `ReadState::Open`; got `{:?}`",
3381                transmit.read
3382            );
3383            transmit.read = ReadState::GuestReady {
3384                ty,
3385                flat_abi,
3386                options,
3387                address,
3388                count,
3389                handle,
3390                instance: self,
3391                caller,
3392            };
3393            Ok::<_, crate::Error>(())
3394        };
3395
3396        let mut result = match mem::replace(&mut transmit.write, new_state) {
3397            WriteState::GuestReady {
3398                instance: _,
3399                ty: write_ty,
3400                flat_abi: write_flat_abi,
3401                options: write_options,
3402                address: write_address,
3403                count: write_count,
3404                handle: write_handle,
3405                caller: write_caller,
3406            } => {
3407                assert_eq!(flat_abi, write_flat_abi);
3408
3409                if let TransmitIndex::Future(_) = ty {
3410                    transmit.done = true;
3411                }
3412
3413                let write_handle_rep = transmit.write_handle.rep();
3414
3415                // See the comment in `guest_write` for the
3416                // `ReadState::GuestReady` case concerning zero-length reads and
3417                // writes.
3418
3419                let write_complete = write_count == 0 || count > 0;
3420                let read_complete = write_count > 0;
3421                let write_buffer_remaining = count < write_count;
3422
3423                let count = count.min(write_count);
3424
3425                self.copy(
3426                    store.as_context_mut(),
3427                    flat_abi,
3428                    write_caller,
3429                    write_ty,
3430                    write_options,
3431                    write_address,
3432                    caller,
3433                    ty,
3434                    options,
3435                    address,
3436                    count,
3437                    rep,
3438                )?;
3439
3440                let instance = self.id().get_mut(store.0);
3441                let types = instance.component().types();
3442                let item_size = payload(ty, types)
3443                    .map(|ty| usize::try_from(types.canonical_abi(&ty).size32).unwrap())
3444                    .unwrap_or(0);
3445                let concurrent_state = store.0.concurrent_state_mut();
3446
3447                if write_complete {
3448                    let count = u32::try_from(count).unwrap();
3449                    let total = if let Some(Event::StreamWrite {
3450                        code: ReturnCode::Completed(old_total),
3451                        ..
3452                    }) = concurrent_state.take_event(write_handle_rep)?
3453                    {
3454                        count + old_total
3455                    } else {
3456                        count
3457                    };
3458
3459                    let code = ReturnCode::completed(ty.kind(), total);
3460
3461                    concurrent_state.send_write_result(
3462                        write_ty,
3463                        transmit_id,
3464                        write_handle,
3465                        code,
3466                    )?;
3467                }
3468
3469                if write_buffer_remaining {
3470                    let transmit = concurrent_state.get_mut(transmit_id)?;
3471                    transmit.write = WriteState::GuestReady {
3472                        instance: self,
3473                        caller: write_caller,
3474                        ty: write_ty,
3475                        flat_abi: write_flat_abi,
3476                        options: write_options,
3477                        address: write_address + (count * item_size),
3478                        count: write_count - count,
3479                        handle: write_handle,
3480                    };
3481                }
3482
3483                if read_complete {
3484                    ReturnCode::completed(ty.kind(), count.try_into().unwrap())
3485                } else {
3486                    set_guest_ready(concurrent_state)?;
3487                    ReturnCode::Blocked
3488                }
3489            }
3490
3491            WriteState::HostReady {
3492                produce,
3493                try_into,
3494                guest_offset,
3495                cancel,
3496                cancel_waker,
3497            } => {
3498                assert!(cancel_waker.is_none());
3499                assert!(!cancel);
3500                assert_eq!(0, guest_offset);
3501
3502                set_guest_ready(concurrent_state)?;
3503
3504                let code =
3505                    self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?;
3506
3507                if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
3508                    store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
3509                }
3510
3511                code
3512            }
3513
3514            WriteState::Open => {
3515                set_guest_ready(concurrent_state)?;
3516                ReturnCode::Blocked
3517            }
3518
3519            WriteState::Dropped => ReturnCode::Dropped(0),
3520        };
3521
3522        if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
3523            result = self.wait_for_read(store.0, transmit_handle)?;
3524        }
3525
3526        if result != ReturnCode::Blocked {
3527            *self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
3528                TransmitLocalState::Read {
3529                    done: matches!(
3530                        (result, ty),
3531                        (ReturnCode::Dropped(_), TransmitIndex::Stream(_))
3532                    ),
3533                };
3534        }
3535
3536        log::trace!(
3537            "guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3538        );
3539
3540        Ok(result)
3541    }
3542
3543    fn wait_for_write(
3544        self,
3545        store: &mut StoreOpaque,
3546        handle: TableId<TransmitHandle>,
3547    ) -> Result<ReturnCode> {
3548        let waitable = Waitable::Transmit(handle);
3549        store.wait_for_event(waitable)?;
3550        let event = waitable.take_event(store.concurrent_state_mut())?;
3551        if let Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) =
3552            event
3553        {
3554            waitable.on_delivery(store, self, event);
3555            Ok(code)
3556        } else {
3557            unreachable!()
3558        }
3559    }
3560
3561    /// Cancel a pending stream or future write.
3562    fn cancel_write(
3563        self,
3564        store: &mut StoreOpaque,
3565        transmit_id: TableId<TransmitState>,
3566        async_: bool,
3567    ) -> Result<ReturnCode> {
3568        let state = store.concurrent_state_mut();
3569        let transmit = state.get_mut(transmit_id)?;
3570        log::trace!(
3571            "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
3572            transmit.read,
3573            transmit.write
3574        );
3575
3576        let code = if let Some(event) =
3577            Waitable::Transmit(transmit.write_handle).take_event(state)?
3578        {
3579            let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
3580                unreachable!();
3581            };
3582            match (code, event) {
3583                (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
3584                    ReturnCode::Cancelled(count)
3585                }
3586                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3587                _ => unreachable!(),
3588            }
3589        } else if let ReadState::HostReady {
3590            cancel,
3591            cancel_waker,
3592            ..
3593        } = &mut state.get_mut(transmit_id)?.read
3594        {
3595            *cancel = true;
3596            if let Some(waker) = cancel_waker.take() {
3597                waker.wake();
3598            }
3599
3600            if async_ {
3601                ReturnCode::Blocked
3602            } else {
3603                let handle = store
3604                    .concurrent_state_mut()
3605                    .get_mut(transmit_id)?
3606                    .write_handle;
3607                self.wait_for_write(store, handle)?
3608            }
3609        } else {
3610            ReturnCode::Cancelled(0)
3611        };
3612
3613        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3614
3615        match &transmit.write {
3616            WriteState::GuestReady { .. } => {
3617                transmit.write = WriteState::Open;
3618            }
3619            WriteState::HostReady { .. } => todo!("support host write cancellation"),
3620            WriteState::Open | WriteState::Dropped => {}
3621        }
3622
3623        log::trace!("cancelled write {transmit_id:?}: {code:?}");
3624
3625        Ok(code)
3626    }
3627
3628    fn wait_for_read(
3629        self,
3630        store: &mut StoreOpaque,
3631        handle: TableId<TransmitHandle>,
3632    ) -> Result<ReturnCode> {
3633        let waitable = Waitable::Transmit(handle);
3634        store.wait_for_event(waitable)?;
3635        let event = waitable.take_event(store.concurrent_state_mut())?;
3636        if let Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) =
3637            event
3638        {
3639            waitable.on_delivery(store, self, event);
3640            Ok(code)
3641        } else {
3642            unreachable!()
3643        }
3644    }
3645
3646    /// Cancel a pending stream or future read.
3647    fn cancel_read(
3648        self,
3649        store: &mut StoreOpaque,
3650        transmit_id: TableId<TransmitState>,
3651        async_: bool,
3652    ) -> Result<ReturnCode> {
3653        let state = store.concurrent_state_mut();
3654        let transmit = state.get_mut(transmit_id)?;
3655        log::trace!(
3656            "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
3657            transmit.read,
3658            transmit.write
3659        );
3660
3661        let code = if let Some(event) =
3662            Waitable::Transmit(transmit.read_handle).take_event(state)?
3663        {
3664            let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
3665                unreachable!();
3666            };
3667            match (code, event) {
3668                (ReturnCode::Completed(count), Event::StreamRead { .. }) => {
3669                    ReturnCode::Cancelled(count)
3670                }
3671                (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
3672                _ => unreachable!(),
3673            }
3674        } else if let WriteState::HostReady {
3675            cancel,
3676            cancel_waker,
3677            ..
3678        } = &mut state.get_mut(transmit_id)?.write
3679        {
3680            *cancel = true;
3681            if let Some(waker) = cancel_waker.take() {
3682                waker.wake();
3683            }
3684
3685            if async_ {
3686                ReturnCode::Blocked
3687            } else {
3688                let handle = store
3689                    .concurrent_state_mut()
3690                    .get_mut(transmit_id)?
3691                    .read_handle;
3692                self.wait_for_read(store, handle)?
3693            }
3694        } else {
3695            ReturnCode::Cancelled(0)
3696        };
3697
3698        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3699
3700        match &transmit.read {
3701            ReadState::GuestReady { .. } => {
3702                transmit.read = ReadState::Open;
3703            }
3704            ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
3705                todo!("support host read cancellation")
3706            }
3707            ReadState::Open | ReadState::Dropped => {}
3708        }
3709
3710        log::trace!("cancelled read {transmit_id:?}: {code:?}");
3711
3712        Ok(code)
3713    }
3714
3715    /// Cancel a pending write for the specified stream or future from the guest.
3716    fn guest_cancel_write(
3717        self,
3718        store: &mut StoreOpaque,
3719        ty: TransmitIndex,
3720        async_: bool,
3721        writer: u32,
3722    ) -> Result<ReturnCode> {
3723        let (rep, state) =
3724            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
3725        let id = TableId::<TransmitHandle>::new(rep);
3726        log::trace!("guest cancel write {id:?} (handle {writer})");
3727        match state {
3728            TransmitLocalState::Write { .. } => {
3729                bail!("stream or future write cancelled when no write is pending")
3730            }
3731            TransmitLocalState::Read { .. } => {
3732                bail!("passed read end to `{{stream|future}}.cancel-write`")
3733            }
3734            TransmitLocalState::Busy => {}
3735        }
3736        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3737        let code = self.cancel_write(store, transmit_id, async_)?;
3738        if !matches!(code, ReturnCode::Blocked) {
3739            let state =
3740                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?
3741                    .1;
3742            if let TransmitLocalState::Busy = state {
3743                *state = TransmitLocalState::Write { done: false };
3744            }
3745        }
3746        Ok(code)
3747    }
3748
3749    /// Cancel a pending read for the specified stream or future from the guest.
3750    fn guest_cancel_read(
3751        self,
3752        store: &mut StoreOpaque,
3753        ty: TransmitIndex,
3754        async_: bool,
3755        reader: u32,
3756    ) -> Result<ReturnCode> {
3757        let (rep, state) =
3758            get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
3759        let id = TableId::<TransmitHandle>::new(rep);
3760        log::trace!("guest cancel read {id:?} (handle {reader})");
3761        match state {
3762            TransmitLocalState::Read { .. } => {
3763                bail!("stream or future read cancelled when no read is pending")
3764            }
3765            TransmitLocalState::Write { .. } => {
3766                bail!("passed write end to `{{stream|future}}.cancel-read`")
3767            }
3768            TransmitLocalState::Busy => {}
3769        }
3770        let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
3771        let code = self.cancel_read(store, transmit_id, async_)?;
3772        if !matches!(code, ReturnCode::Blocked) {
3773            let state =
3774                get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?
3775                    .1;
3776            if let TransmitLocalState::Busy = state {
3777                *state = TransmitLocalState::Read { done: false };
3778            }
3779        }
3780        Ok(code)
3781    }
3782
3783    /// Drop the readable end of the specified stream or future from the guest.
3784    fn guest_drop_readable(
3785        self,
3786        store: &mut StoreOpaque,
3787        ty: TransmitIndex,
3788        reader: u32,
3789    ) -> Result<()> {
3790        let table = self.id().get_mut(store).table_for_transmit(ty);
3791        let (rep, _is_done) = match ty {
3792            TransmitIndex::Stream(ty) => table.stream_remove_readable(ty, reader)?,
3793            TransmitIndex::Future(ty) => table.future_remove_readable(ty, reader)?,
3794        };
3795        let kind = match ty {
3796            TransmitIndex::Stream(_) => TransmitKind::Stream,
3797            TransmitIndex::Future(_) => TransmitKind::Future,
3798        };
3799        let id = TableId::<TransmitHandle>::new(rep);
3800        log::trace!("guest_drop_readable: drop reader {id:?}");
3801        store.host_drop_reader(id, kind)
3802    }
3803
3804    /// Create a new error context for the given component.
3805    pub(crate) fn error_context_new(
3806        self,
3807        store: &mut StoreOpaque,
3808        caller: RuntimeComponentInstanceIndex,
3809        ty: TypeComponentLocalErrorContextTableIndex,
3810        options: OptionsIndex,
3811        debug_msg_address: u32,
3812        debug_msg_len: u32,
3813    ) -> Result<u32> {
3814        self.id().get(store).check_may_leave(caller)?;
3815        let lift_ctx = &mut LiftContext::new(store, options, self);
3816        let debug_msg = String::linear_lift_from_flat(
3817            lift_ctx,
3818            InterfaceType::String,
3819            &[ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)],
3820        )?;
3821
3822        // Create a new ErrorContext that is tracked along with other concurrent state
3823        let err_ctx = ErrorContextState { debug_msg };
3824        let state = store.concurrent_state_mut();
3825        let table_id = state.push(err_ctx)?;
3826        let global_ref_count_idx =
3827            TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
3828
3829        // Add to the global error context ref counts
3830        let _ = state
3831            .global_error_context_ref_counts
3832            .insert(global_ref_count_idx, GlobalErrorContextRefCount(1));
3833
3834        // Error context are tracked both locally (to a single component instance) and globally
3835        // the counts for both must stay in sync.
3836        //
3837        // Here we reflect the newly created global concurrent error context state into the
3838        // component instance's locally tracked count, along with the appropriate key into the global
3839        // ref tracking data structures to enable later lookup
3840        let local_idx = self
3841            .id()
3842            .get_mut(store)
3843            .table_for_error_context(ty)
3844            .error_context_insert(table_id.rep())?;
3845
3846        Ok(local_idx)
3847    }
3848
3849    /// Retrieve the debug message from the specified error context.
3850    pub(super) fn error_context_debug_message<T>(
3851        self,
3852        store: StoreContextMut<T>,
3853        ty: TypeComponentLocalErrorContextTableIndex,
3854        options: OptionsIndex,
3855        err_ctx_handle: u32,
3856        debug_msg_address: u32,
3857    ) -> Result<()> {
3858        // Retrieve the error context and internal debug message
3859        let handle_table_id_rep = self
3860            .id()
3861            .get_mut(store.0)
3862            .table_for_error_context(ty)
3863            .error_context_rep(err_ctx_handle)?;
3864
3865        let state = store.0.concurrent_state_mut();
3866        // Get the state associated with the error context
3867        let ErrorContextState { debug_msg } =
3868            state.get_mut(TableId::<ErrorContextState>::new(handle_table_id_rep))?;
3869        let debug_msg = debug_msg.clone();
3870
3871        let lower_cx = &mut LowerContext::new(store, options, self);
3872        let debug_msg_address = usize::try_from(debug_msg_address)?;
3873        // Lower the string into the component's memory
3874        let offset = lower_cx
3875            .as_slice_mut()
3876            .get(debug_msg_address..)
3877            .and_then(|b| b.get(..debug_msg.bytes().len()))
3878            .map(|_| debug_msg_address)
3879            .ok_or_else(|| anyhow::anyhow!("invalid debug message pointer: out of bounds"))?;
3880        debug_msg
3881            .as_str()
3882            .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
3883
3884        Ok(())
3885    }
3886
3887    /// Implements the `future.cancel-read` intrinsic.
3888    pub(crate) fn future_cancel_read(
3889        self,
3890        store: &mut StoreOpaque,
3891        caller: RuntimeComponentInstanceIndex,
3892        ty: TypeFutureTableIndex,
3893        async_: bool,
3894        reader: u32,
3895    ) -> Result<u32> {
3896        self.id().get(store).check_may_leave(caller)?;
3897        self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)
3898            .map(|v| v.encode())
3899    }
3900
3901    /// Implements the `future.cancel-write` intrinsic.
3902    pub(crate) fn future_cancel_write(
3903        self,
3904        store: &mut StoreOpaque,
3905        caller: RuntimeComponentInstanceIndex,
3906        ty: TypeFutureTableIndex,
3907        async_: bool,
3908        writer: u32,
3909    ) -> Result<u32> {
3910        self.id().get(store).check_may_leave(caller)?;
3911        self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)
3912            .map(|v| v.encode())
3913    }
3914
3915    /// Implements the `stream.cancel-read` intrinsic.
3916    pub(crate) fn stream_cancel_read(
3917        self,
3918        store: &mut StoreOpaque,
3919        caller: RuntimeComponentInstanceIndex,
3920        ty: TypeStreamTableIndex,
3921        async_: bool,
3922        reader: u32,
3923    ) -> Result<u32> {
3924        self.id().get(store).check_may_leave(caller)?;
3925        self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)
3926            .map(|v| v.encode())
3927    }
3928
3929    /// Implements the `stream.cancel-write` intrinsic.
3930    pub(crate) fn stream_cancel_write(
3931        self,
3932        store: &mut StoreOpaque,
3933        caller: RuntimeComponentInstanceIndex,
3934        ty: TypeStreamTableIndex,
3935        async_: bool,
3936        writer: u32,
3937    ) -> Result<u32> {
3938        self.id().get(store).check_may_leave(caller)?;
3939        self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)
3940            .map(|v| v.encode())
3941    }
3942
3943    /// Implements the `future.drop-readable` intrinsic.
3944    pub(crate) fn future_drop_readable(
3945        self,
3946        store: &mut StoreOpaque,
3947        caller: RuntimeComponentInstanceIndex,
3948        ty: TypeFutureTableIndex,
3949        reader: u32,
3950    ) -> Result<()> {
3951        self.id().get(store).check_may_leave(caller)?;
3952        self.guest_drop_readable(store, TransmitIndex::Future(ty), reader)
3953    }
3954
3955    /// Implements the `stream.drop-readable` intrinsic.
3956    pub(crate) fn stream_drop_readable(
3957        self,
3958        store: &mut StoreOpaque,
3959        caller: RuntimeComponentInstanceIndex,
3960        ty: TypeStreamTableIndex,
3961        reader: u32,
3962    ) -> Result<()> {
3963        self.id().get(store).check_may_leave(caller)?;
3964        self.guest_drop_readable(store, TransmitIndex::Stream(ty), reader)
3965    }
3966
3967    /// Allocate a new future or stream and grant ownership of both the read and
3968    /// write ends to the (sub-)component instance to which the specified
3969    /// `TransmitIndex` belongs.
3970    fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
3971        let (write, read) = store.concurrent_state_mut().new_transmit()?;
3972
3973        let table = self.id().get_mut(store).table_for_transmit(ty);
3974        let (read_handle, write_handle) = match ty {
3975            TransmitIndex::Future(ty) => (
3976                table.future_insert_read(ty, read.rep())?,
3977                table.future_insert_write(ty, write.rep())?,
3978            ),
3979            TransmitIndex::Stream(ty) => (
3980                table.stream_insert_read(ty, read.rep())?,
3981                table.stream_insert_write(ty, write.rep())?,
3982            ),
3983        };
3984
3985        let state = store.concurrent_state_mut();
3986        state.get_mut(read)?.common.handle = Some(read_handle);
3987        state.get_mut(write)?.common.handle = Some(write_handle);
3988
3989        Ok(ResourcePair {
3990            write: write_handle,
3991            read: read_handle,
3992        })
3993    }
3994
3995    /// Drop the specified error context.
3996    pub(crate) fn error_context_drop(
3997        self,
3998        store: &mut StoreOpaque,
3999        caller: RuntimeComponentInstanceIndex,
4000        ty: TypeComponentLocalErrorContextTableIndex,
4001        error_context: u32,
4002    ) -> Result<()> {
4003        let instance = self.id().get_mut(store);
4004        instance.check_may_leave(caller)?;
4005
4006        let local_handle_table = instance.table_for_error_context(ty);
4007
4008        let rep = local_handle_table.error_context_drop(error_context)?;
4009
4010        let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
4011
4012        let state = store.concurrent_state_mut();
4013        let GlobalErrorContextRefCount(global_ref_count) = state
4014            .global_error_context_ref_counts
4015            .get_mut(&global_ref_count_idx)
4016            .expect("retrieve concurrent state for error context during drop");
4017
4018        // Reduce the component-global ref count, removing tracking if necessary
4019        assert!(*global_ref_count >= 1);
4020        *global_ref_count -= 1;
4021        if *global_ref_count == 0 {
4022            state
4023                .global_error_context_ref_counts
4024                .remove(&global_ref_count_idx);
4025
4026            state
4027                .delete(TableId::<ErrorContextState>::new(rep))
4028                .context("deleting component-global error context data")?;
4029        }
4030
4031        Ok(())
4032    }
4033
4034    /// Transfer ownership of the specified stream or future read end from one
4035    /// guest to another.
4036    fn guest_transfer(
4037        self,
4038        store: &mut StoreOpaque,
4039        src_idx: u32,
4040        src: TransmitIndex,
4041        dst: TransmitIndex,
4042    ) -> Result<u32> {
4043        let mut instance = self.id().get_mut(store);
4044        let src_table = instance.as_mut().table_for_transmit(src);
4045        let (rep, is_done) = match src {
4046            TransmitIndex::Future(idx) => src_table.future_remove_readable(idx, src_idx)?,
4047            TransmitIndex::Stream(idx) => src_table.stream_remove_readable(idx, src_idx)?,
4048        };
4049        if is_done {
4050            bail!("cannot lift after being notified that the writable end dropped");
4051        }
4052        let dst_table = instance.table_for_transmit(dst);
4053        let handle = match dst {
4054            TransmitIndex::Future(idx) => dst_table.future_insert_read(idx, rep),
4055            TransmitIndex::Stream(idx) => dst_table.stream_insert_read(idx, rep),
4056        }?;
4057        store
4058            .concurrent_state_mut()
4059            .get_mut(TableId::<TransmitHandle>::new(rep))?
4060            .common
4061            .handle = Some(handle);
4062        Ok(handle)
4063    }
4064
4065    /// Implements the `future.new` intrinsic.
4066    pub(crate) fn future_new(
4067        self,
4068        store: &mut StoreOpaque,
4069        caller: RuntimeComponentInstanceIndex,
4070        ty: TypeFutureTableIndex,
4071    ) -> Result<ResourcePair> {
4072        self.id().get(store).check_may_leave(caller)?;
4073        self.guest_new(store, TransmitIndex::Future(ty))
4074    }
4075
4076    /// Implements the `stream.new` intrinsic.
4077    pub(crate) fn stream_new(
4078        self,
4079        store: &mut StoreOpaque,
4080        caller: RuntimeComponentInstanceIndex,
4081        ty: TypeStreamTableIndex,
4082    ) -> Result<ResourcePair> {
4083        self.id().get(store).check_may_leave(caller)?;
4084        self.guest_new(store, TransmitIndex::Stream(ty))
4085    }
4086
4087    /// Transfer ownership of the specified future read end from one guest to
4088    /// another.
4089    pub(crate) fn future_transfer(
4090        self,
4091        store: &mut StoreOpaque,
4092        src_idx: u32,
4093        src: TypeFutureTableIndex,
4094        dst: TypeFutureTableIndex,
4095    ) -> Result<u32> {
4096        self.guest_transfer(
4097            store,
4098            src_idx,
4099            TransmitIndex::Future(src),
4100            TransmitIndex::Future(dst),
4101        )
4102    }
4103
4104    /// Transfer ownership of the specified stream read end from one guest to
4105    /// another.
4106    pub(crate) fn stream_transfer(
4107        self,
4108        store: &mut StoreOpaque,
4109        src_idx: u32,
4110        src: TypeStreamTableIndex,
4111        dst: TypeStreamTableIndex,
4112    ) -> Result<u32> {
4113        self.guest_transfer(
4114            store,
4115            src_idx,
4116            TransmitIndex::Stream(src),
4117            TransmitIndex::Stream(dst),
4118        )
4119    }
4120
4121    /// Copy the specified error context from one component to another.
4122    pub(crate) fn error_context_transfer(
4123        self,
4124        store: &mut StoreOpaque,
4125        src_idx: u32,
4126        src: TypeComponentLocalErrorContextTableIndex,
4127        dst: TypeComponentLocalErrorContextTableIndex,
4128    ) -> Result<u32> {
4129        let mut instance = self.id().get_mut(store);
4130        let rep = instance
4131            .as_mut()
4132            .table_for_error_context(src)
4133            .error_context_rep(src_idx)?;
4134        let dst_idx = instance
4135            .table_for_error_context(dst)
4136            .error_context_insert(rep)?;
4137
4138        // Update the global (cross-subcomponent) count for error contexts
4139        // as the new component has essentially created a new reference that will
4140        // be dropped/handled independently
4141        let global_ref_count = store
4142            .concurrent_state_mut()
4143            .global_error_context_ref_counts
4144            .get_mut(&TypeComponentGlobalErrorContextTableIndex::from_u32(rep))
4145            .context("global ref count present for existing (sub)component error context")?;
4146        global_ref_count.0 += 1;
4147
4148        Ok(dst_idx)
4149    }
4150}
4151
4152impl ComponentInstance {
4153    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
4154        let (tables, types) = self.guest_tables();
4155        let runtime_instance = match ty {
4156            TransmitIndex::Stream(ty) => types[ty].instance,
4157            TransmitIndex::Future(ty) => types[ty].instance,
4158        };
4159        &mut tables[runtime_instance]
4160    }
4161
4162    fn table_for_error_context(
4163        self: Pin<&mut Self>,
4164        ty: TypeComponentLocalErrorContextTableIndex,
4165    ) -> &mut HandleTable {
4166        let (tables, types) = self.guest_tables();
4167        let runtime_instance = types[ty].instance;
4168        &mut tables[runtime_instance]
4169    }
4170
4171    fn get_mut_by_index(
4172        self: Pin<&mut Self>,
4173        ty: TransmitIndex,
4174        index: u32,
4175    ) -> Result<(u32, &mut TransmitLocalState)> {
4176        get_mut_by_index_from(self.table_for_transmit(ty), ty, index)
4177    }
4178}
4179
4180impl ConcurrentState {
4181    fn send_write_result(
4182        &mut self,
4183        ty: TransmitIndex,
4184        id: TableId<TransmitState>,
4185        handle: u32,
4186        code: ReturnCode,
4187    ) -> Result<()> {
4188        let write_handle = self.get_mut(id)?.write_handle.rep();
4189        self.set_event(
4190            write_handle,
4191            match ty {
4192                TransmitIndex::Future(ty) => Event::FutureWrite {
4193                    code,
4194                    pending: Some((ty, handle)),
4195                },
4196                TransmitIndex::Stream(ty) => Event::StreamWrite {
4197                    code,
4198                    pending: Some((ty, handle)),
4199                },
4200            },
4201        )
4202    }
4203
4204    fn send_read_result(
4205        &mut self,
4206        ty: TransmitIndex,
4207        id: TableId<TransmitState>,
4208        handle: u32,
4209        code: ReturnCode,
4210    ) -> Result<()> {
4211        let read_handle = self.get_mut(id)?.read_handle.rep();
4212        self.set_event(
4213            read_handle,
4214            match ty {
4215                TransmitIndex::Future(ty) => Event::FutureRead {
4216                    code,
4217                    pending: Some((ty, handle)),
4218                },
4219                TransmitIndex::Stream(ty) => Event::StreamRead {
4220                    code,
4221                    pending: Some((ty, handle)),
4222                },
4223            },
4224        )
4225    }
4226
4227    fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
4228        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
4229    }
4230
4231    fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4232        Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
4233    }
4234
4235    /// Set or update the event for the specified waitable.
4236    ///
4237    /// If there is already an event set for this waitable, we assert that it is
4238    /// of the same variant as the new one and reuse the `ReturnCode` count and
4239    /// the `pending` field if applicable.
4240    // TODO: This is a bit awkward due to how
4241    // `Event::{Stream,Future}{Write,Read}` and
4242    // `ReturnCode::{Completed,Dropped,Cancelled}` are currently represented.
4243    // Consider updating those representations in a way that allows this
4244    // function to be simplified.
4245    fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
4246        let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
4247
4248        fn update_code(old: ReturnCode, new: ReturnCode) -> ReturnCode {
4249            let (ReturnCode::Completed(count)
4250            | ReturnCode::Dropped(count)
4251            | ReturnCode::Cancelled(count)) = old
4252            else {
4253                unreachable!()
4254            };
4255
4256            match new {
4257                ReturnCode::Dropped(0) => ReturnCode::Dropped(count),
4258                ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count),
4259                _ => unreachable!(),
4260            }
4261        }
4262
4263        let event = match (waitable.take_event(self)?, event) {
4264            (None, _) => event,
4265            (Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
4266            (Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
4267            (
4268                Some(Event::StreamWrite {
4269                    code: old_code,
4270                    pending: old_pending,
4271                }),
4272                Event::StreamWrite { code, pending },
4273            ) => Event::StreamWrite {
4274                code: update_code(old_code, code),
4275                pending: old_pending.or(pending),
4276            },
4277            (
4278                Some(Event::StreamRead {
4279                    code: old_code,
4280                    pending: old_pending,
4281                }),
4282                Event::StreamRead { code, pending },
4283            ) => Event::StreamRead {
4284                code: update_code(old_code, code),
4285                pending: old_pending.or(pending),
4286            },
4287            _ => unreachable!(),
4288        };
4289
4290        waitable.set_event(self, Some(event))
4291    }
4292
4293    /// Allocate a new future or stream, including the `TransmitState` and the
4294    /// `TransmitHandle`s corresponding to the read and write ends.
4295    fn new_transmit(&mut self) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
4296        let state_id = self.push(TransmitState::default())?;
4297
4298        let write = self.push(TransmitHandle::new(state_id))?;
4299        let read = self.push(TransmitHandle::new(state_id))?;
4300
4301        let state = self.get_mut(state_id)?;
4302        state.write_handle = write;
4303        state.read_handle = read;
4304
4305        log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
4306
4307        Ok((write, read))
4308    }
4309
4310    /// Delete the specified future or stream, including the read and write ends.
4311    fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
4312        let state = self.delete(state_id)?;
4313        self.delete(state.write_handle)?;
4314        self.delete(state.read_handle)?;
4315
4316        log::trace!(
4317            "delete transmit: state {state_id:?}; write {:?}; read {:?}",
4318            state.write_handle,
4319            state.read_handle,
4320        );
4321
4322        Ok(())
4323    }
4324}
4325
4326pub(crate) struct ResourcePair {
4327    pub(crate) write: u32,
4328    pub(crate) read: u32,
4329}
4330
4331impl Waitable {
4332    /// Handle the imminent delivery of the specified event, e.g. by updating
4333    /// the state of the stream or future.
4334    pub(super) fn on_delivery(&self, store: &mut StoreOpaque, instance: Instance, event: Event) {
4335        match event {
4336            Event::FutureRead {
4337                pending: Some((ty, handle)),
4338                ..
4339            }
4340            | Event::FutureWrite {
4341                pending: Some((ty, handle)),
4342                ..
4343            } => {
4344                let instance = instance.id().get_mut(store);
4345                let runtime_instance = instance.component().types()[ty].instance;
4346                let (rep, state) = instance.guest_tables().0[runtime_instance]
4347                    .future_rep(ty, handle)
4348                    .unwrap();
4349                assert_eq!(rep, self.rep());
4350                assert_eq!(*state, TransmitLocalState::Busy);
4351                *state = match event {
4352                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4353                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4354                    _ => unreachable!(),
4355                };
4356            }
4357            Event::StreamRead {
4358                pending: Some((ty, handle)),
4359                code,
4360            }
4361            | Event::StreamWrite {
4362                pending: Some((ty, handle)),
4363                code,
4364            } => {
4365                let instance = instance.id().get_mut(store);
4366                let runtime_instance = instance.component().types()[ty].instance;
4367                let (rep, state) = instance.guest_tables().0[runtime_instance]
4368                    .stream_rep(ty, handle)
4369                    .unwrap();
4370                assert_eq!(rep, self.rep());
4371                assert_eq!(*state, TransmitLocalState::Busy);
4372                let done = matches!(code, ReturnCode::Dropped(_));
4373                *state = match event {
4374                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
4375                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4376                    _ => unreachable!(),
4377                };
4378
4379                let transmit_handle = TableId::<TransmitHandle>::new(rep);
4380                let state = store.concurrent_state_mut();
4381                let transmit_id = state.get_mut(transmit_handle).unwrap().state;
4382                let transmit = state.get_mut(transmit_id).unwrap();
4383
4384                match event {
4385                    Event::StreamRead { .. } => {
4386                        transmit.read = ReadState::Open;
4387                    }
4388                    Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4389                    _ => unreachable!(),
4390                };
4391            }
4392            _ => {}
4393        }
4394    }
4395}
4396
4397#[cfg(test)]
4398mod tests {
4399    use super::*;
4400    use crate::{Engine, Store};
4401    use core::future::pending;
4402    use core::pin::pin;
4403    use std::sync::LazyLock;
4404
4405    static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
4406
4407    fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
4408    where
4409        T: FutureProducer<()>,
4410    {
4411        rx.poll_produce(
4412            &mut Context::from_waker(Waker::noop()),
4413            Store::new(&ENGINE, ()).as_context_mut(),
4414            finish,
4415        )
4416    }
4417
4418    #[test]
4419    fn future_producer() {
4420        let mut fut = pin!(async { anyhow::Ok(()) });
4421        assert!(matches!(
4422            poll_future_producer(fut.as_mut(), false),
4423            Poll::Ready(Ok(Some(()))),
4424        ));
4425
4426        let mut fut = pin!(async { anyhow::Ok(()) });
4427        assert!(matches!(
4428            poll_future_producer(fut.as_mut(), true),
4429            Poll::Ready(Ok(Some(()))),
4430        ));
4431
4432        let mut fut = pin!(pending::<Result<()>>());
4433        assert!(matches!(
4434            poll_future_producer(fut.as_mut(), false),
4435            Poll::Pending,
4436        ));
4437        assert!(matches!(
4438            poll_future_producer(fut.as_mut(), true),
4439            Poll::Ready(Ok(None)),
4440        ));
4441
4442        let (tx, rx) = oneshot::channel();
4443        let mut rx = pin!(rx);
4444        assert!(matches!(
4445            poll_future_producer(rx.as_mut(), false),
4446            Poll::Pending,
4447        ));
4448        assert!(matches!(
4449            poll_future_producer(rx.as_mut(), true),
4450            Poll::Ready(Ok(None)),
4451        ));
4452        tx.send(()).unwrap();
4453        assert!(matches!(
4454            poll_future_producer(rx.as_mut(), true),
4455            Poll::Ready(Ok(Some(()))),
4456        ));
4457
4458        let (tx, rx) = oneshot::channel();
4459        let mut rx = pin!(rx);
4460        tx.send(()).unwrap();
4461        assert!(matches!(
4462            poll_future_producer(rx.as_mut(), false),
4463            Poll::Ready(Ok(Some(()))),
4464        ));
4465
4466        let (tx, rx) = oneshot::channel::<()>();
4467        let mut rx = pin!(rx);
4468        drop(tx);
4469        assert!(matches!(
4470            poll_future_producer(rx.as_mut(), false),
4471            Poll::Ready(Err(..)),
4472        ));
4473
4474        let (tx, rx) = oneshot::channel::<()>();
4475        let mut rx = pin!(rx);
4476        drop(tx);
4477        assert!(matches!(
4478            poll_future_producer(rx.as_mut(), true),
4479            Poll::Ready(Err(..)),
4480        ));
4481    }
4482}