StreamProducer

Trait StreamProducer 

pub trait StreamProducer<D>: Send + 'static {
    type Item;
    type Buffer: WriteBuffer<Self::Item> + Default;

    // Required method
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, D>,
        destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;

    // Provided method
    fn try_into(
        me: Pin<Box<Self>>,
        _ty: TypeId,
    ) -> Result<Box<dyn Any>, Pin<Box<Self>>> { ... }
}
Available on crate features runtime and component-model and component-model-async only.

Represents the host-owned write end of a stream.

Required Associated Types§

type Item

The payload type of this stream.

type Buffer: WriteBuffer<Self::Item> + Default

The WriteBuffer type to use when delivering items.

Required Methods§

fn poll_produce<'a>( self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<'a, D>, destination: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<Result<StreamResult>>

Handle a host- or guest-initiated read by delivering zero or more items to the specified destination.

This will be called whenever the reader starts a read.

§Arguments
  • self - a Pin’d version of self to perform Rust-level future-related operations on.
  • cx - a Rust-related Context which is passed to other future-related operations or used to acquire a waker.
  • store - the Wasmtime store that this operation is happening within. Used, for example, to consult the state D associated with the store.
  • destination - the location that items are to be written to.
  • finish - a flag indicating whether the host should strive to immediately complete/cancel any pending operation. See below for more details.
§Behavior

If the implementation is able to produce one or more items immediately, it should write them to destination and return either Poll::Ready(Ok(StreamResult::Completed)) if it expects to produce more items, or Poll::Ready(Ok(StreamResult::Dropped)) if it cannot produce any more items.

If the implementation is unable to produce any items immediately, but expects to do so later, and finish is false, it should store the waker from cx for later and return Poll::Pending without writing anything to destination. Later, it should wake that waker when items arrive, the stream ends, or an error occurs.

If more items are written to destination than the reader has immediate capacity to accept, they will be retained in memory by the caller and used to satisfy future reads, in which case poll_produce will only be called again once all those items have been delivered.
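
The cases above can be sketched with simplified stand-in types. Everything in this example (`MiniResult`, `QueueProducer`, `CountingWaker`, and the plain `Vec<u32>` used as the destination) is hypothetical and only mirrors the shape of the contract; it is not Wasmtime's API, and the real `Destination` and `StreamResult` types differ in detail.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `StreamResult` (not the Wasmtime type).
#[derive(Debug, PartialEq)]
pub enum MiniResult {
    Completed,
    Dropped,
}

/// Hypothetical producer backed by an in-memory queue.
pub struct QueueProducer {
    pub items: VecDeque<u32>,
    /// Set once the upstream source has nothing more to offer.
    pub closed: bool,
    /// Waker saved by the most recent poll that returned `Pending`.
    pub waker: Option<Waker>,
}

impl QueueProducer {
    /// Mirrors the documented behavior, with a `Vec` standing in for `destination`.
    pub fn poll_produce(
        &mut self,
        cx: &mut Context<'_>,
        destination: &mut Vec<u32>,
    ) -> Poll<MiniResult> {
        if !self.items.is_empty() {
            // Items are available now: deliver them and report whether more may follow.
            destination.extend(self.items.drain(..));
            let result = if self.closed { MiniResult::Dropped } else { MiniResult::Completed };
            return Poll::Ready(result);
        }
        if self.closed {
            // Nothing left and nothing more expected: end of stream.
            return Poll::Ready(MiniResult::Dropped);
        }
        // Nothing yet: remember the waker and write nothing.
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }

    /// Called by the (hypothetical) upstream source when an item arrives.
    pub fn push(&mut self, item: u32) {
        self.items.push_back(item);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Waker that counts how often it is woken (used by the demo only).
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls once with no items, pushes an item, then polls again.
pub fn demo() -> (bool, usize, Vec<u32>, MiniResult) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut producer = QueueProducer { items: VecDeque::new(), closed: false, waker: None };
    let mut destination = Vec::new();

    let first_was_pending = producer.poll_produce(&mut cx, &mut destination).is_pending();
    producer.push(7);
    let wakes = counter.0.load(Ordering::SeqCst);
    let second = match producer.poll_produce(&mut cx, &mut destination) {
        Poll::Ready(result) => result,
        Poll::Pending => panic!("an item was queued, so the poll should be ready"),
    };
    (first_was_pending, wakes, destination, second)
}
```

In this sketch the first poll returns `Poll::Pending` without touching the destination, and the later `push` is what wakes the stored waker so that the caller knows to poll again.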

§Zero-length reads

This function may be called with a zero-length capacity buffer (i.e. Destination::remaining returns Some(0)). This indicates that the guest wants to wait to see if an item is ready without actually reading the item. For example, think of a UNIX poll call on a TCP stream, which checks whether the stream is readable without actually reading from it.

In this situation the host is allowed to either return immediately or wait for readiness. Note that waiting for readiness is not always possible. For example it’s impossible to test if a Rust-native Future is ready without actually reading the item. Stream-specific optimizations, such as testing if a TCP stream is readable, may be possible however.

For a zero-length read, the host is allowed to:

  • Return Poll::Ready(Ok(StreamResult::Completed)) without writing anything if it expects to be able to produce items immediately (i.e. without first returning Poll::Pending) the next time poll_produce is called with non-zero capacity. This is the best-case scenario of fulfilling the guest’s desire – items aren’t read/buffered but the host is saying it’s ready when the guest is.

  • Return Poll::Ready(Ok(StreamResult::Completed)) without actually testing for readiness. The guest doesn’t know this yet, but the guest will realize that zero-length reads won’t work on this stream when a subsequent nonzero read attempt is made which returns Poll::Pending here.

  • Return Poll::Pending if the host has performed necessary async work to wait for this stream to be readable without actually reading anything. This is also a best-case scenario where the host is letting the guest know that nothing is ready yet. Later the zero-length read will complete and then the guest will attempt a nonzero-length read to actually read some bytes.

  • Return Poll::Ready(Ok(StreamResult::Completed)) after calling Destination::set_buffer with one or more items. Note, however, that this creates the hazard that the items will never be received by the guest if it decides not to do another non-zero-length read before closing the stream. Moreover, if Self::Item is e.g. a Resource<_>, they may end up leaking in that scenario. It is not recommended to do this and it’s better to return StreamResult::Completed without buffering anything instead.

For more discussion on zero-length reads see the documentation in the component-model repo itself.
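
As a rough illustration of the first and third options above, the sketch below reports readiness on a zero-length read without consuming anything. All names are hypothetical stand-ins: `remaining: Option<usize>` models what `Destination::remaining` would report, `ReadySource` models a source that can test readiness cheaply (such as a socket), and a `Vec<u8>` stands in for the destination.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `StreamResult` (not the Wasmtime type).
#[derive(Debug, PartialEq)]
pub enum MiniResult {
    Completed,
}

/// Hypothetical source whose readiness can be checked without reading.
pub struct ReadySource {
    pub available: Vec<u8>,
    pub waker: Option<Waker>,
}

impl ReadySource {
    pub fn poll_produce(
        &mut self,
        cx: &mut Context<'_>,
        remaining: Option<usize>,
        destination: &mut Vec<u8>,
    ) -> Poll<MiniResult> {
        if self.available.is_empty() {
            // Not readable yet, for zero-length and ordinary reads alike.
            self.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        if remaining == Some(0) {
            // Zero-length read: signal readiness but leave every item unread.
            return Poll::Ready(MiniResult::Completed);
        }
        // Ordinary read: deliver at most `remaining` items.
        let n = remaining.unwrap_or(self.available.len()).min(self.available.len());
        destination.extend(self.available.drain(..n));
        Poll::Ready(MiniResult::Completed)
    }
}

/// Waker that does nothing (used by the demo only).
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Zero-length read while empty, zero-length read while ready, then a real read.
pub fn demo() -> (bool, Poll<MiniResult>, usize, Vec<u8>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut source = ReadySource { available: Vec::new(), waker: None };
    let mut destination = Vec::new();

    let pending = source.poll_produce(&mut cx, Some(0), &mut destination).is_pending();
    source.available = vec![1, 2, 3];
    let zero_result = source.poll_produce(&mut cx, Some(0), &mut destination);
    let len_after_zero = destination.len();
    let _ = source.poll_produce(&mut cx, Some(2), &mut destination);
    (pending, zero_result, len_after_zero, destination)
}
```

The sketch never buffers items during a zero-length read, which sidesteps the hazard described in the last option.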

§Return

This function can return one of the following:

  • Poll::Pending - this operation cannot complete at this time. The Rust-level Future::poll contract applies here where a waker should be stored from the cx argument and be arranged to receive a notification when this implementation can make progress. For example if you call Future::poll on a sub-future, that’s enough. If items were written to destination then a trap in the guest will be raised.

    Note that implementations should strive to avoid this return value when finish is true. In such a situation the guest is attempting to, for example, cancel a previous operation, and returning Poll::Pending blocks the guest for the duration of the cancellation request. If finish is true then StreamResult::Cancelled is favored to indicate that no items were read. If a short read happened, however, it’s ok to return StreamResult::Completed indicating some items were read.

  • Poll::Ready(Ok(StreamResult::Completed)) - items, if applicable, were written to the destination.

  • Poll::Ready(Ok(StreamResult::Cancelled)) - used when finish is true and the implementation was able to successfully cancel any async work that a previous read kicked off, if any. The host should not buffer values received after returning Cancelled because the guest will not be aware of these values and the guest could close the stream after cancelling a read. Hosts should only return Cancelled when there are no more async operations in flight for a previous read.

    If items were written to destination then a trap in the guest will be raised. If finish is false then this return value will raise a trap in the guest.

  • Poll::Ready(Ok(StreamResult::Dropped)) - end-of-stream marker, indicating that this producer should not be polled again. Note that items may still be written to destination.
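
The interaction between finish and these return values can be sketched as follows. The types are hypothetical stand-ins rather than Wasmtime's: `MiniResult` models `StreamResult`, a `Vec<u8>` models the destination, and clearing the saved waker stands in for cancelling whatever async work an earlier read started.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `StreamResult` (not the Wasmtime type).
#[derive(Debug, PartialEq)]
pub enum MiniResult {
    Completed,
    Cancelled,
}

/// Hypothetical producer that may have a read in flight.
pub struct CancellableProducer {
    pub ready: Vec<u8>,
    pub waker: Option<Waker>,
}

impl CancellableProducer {
    pub fn poll_produce(
        &mut self,
        cx: &mut Context<'_>,
        destination: &mut Vec<u8>,
        finish: bool,
    ) -> Poll<MiniResult> {
        if !self.ready.is_empty() {
            // A short read is acceptable even when `finish` is set.
            destination.append(&mut self.ready);
            return Poll::Ready(MiniResult::Completed);
        }
        if finish {
            // Stand-in for cancelling in-flight work; nothing is written.
            self.waker = None;
            return Poll::Ready(MiniResult::Cancelled);
        }
        // Ordinary read with nothing available: wait for a wakeup.
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that does nothing (used by the demo only).
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Starts a read, cancels it, then finishes again once an item is ready.
pub fn demo() -> (bool, Poll<MiniResult>, bool, Poll<MiniResult>, Vec<u8>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut producer = CancellableProducer { ready: Vec::new(), waker: None };
    let mut destination = Vec::new();

    let started_pending = producer.poll_produce(&mut cx, &mut destination, false).is_pending();
    let cancelled = producer.poll_produce(&mut cx, &mut destination, true);
    let empty_after_cancel = destination.is_empty();
    producer.ready = vec![9];
    let short_read = producer.poll_produce(&mut cx, &mut destination, true);
    (started_pending, cancelled, empty_after_cancel, short_read, destination)
}
```

Note that the sketch only returns `Cancelled` when finish is true and nothing was written, matching the trap conditions listed above.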

§Errors

The implementation may alternatively choose to return Err(_) to indicate an unrecoverable error. This will cause the guest (if any) to trap and render the component instance (if any) unusable. Recoverable errors should instead be reported by other means (e.g. by writing to a future), after which the implementation returns Poll::Ready(Ok(StreamResult::Dropped)).
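
One way to picture this split is the sketch below. It is illustrative only: `SourceError`, `MiniResult`, and `finish_with_error` are hypothetical names, a `String` stands in for the error type, and a standard-library channel stands in for the "other means" (such as a future) through which a recoverable error would be reported.

```rust
use std::sync::mpsc::{channel, Sender};
use std::task::Poll;

/// Hypothetical stand-in for `StreamResult` (not the Wasmtime type).
#[derive(Debug, PartialEq)]
pub enum MiniResult {
    Dropped,
}

/// Hypothetical classification of failures seen by a producer.
#[derive(Debug, PartialEq)]
pub enum SourceError {
    Recoverable(String),
    Fatal(String),
}

pub type ProduceOutcome = Poll<Result<MiniResult, String>>;

/// Decides what a producer would return after its source failed.
pub fn finish_with_error(error: SourceError, side_channel: &Sender<String>) -> ProduceOutcome {
    match error {
        SourceError::Recoverable(message) => {
            // Report out of band, then end the stream normally.
            let _ = side_channel.send(message);
            Poll::Ready(Ok(MiniResult::Dropped))
        }
        // Unrecoverable: surface the error itself, which would trap the guest.
        SourceError::Fatal(message) => Poll::Ready(Err(message)),
    }
}

/// Runs both branches and returns what was observed.
pub fn demo() -> (ProduceOutcome, Option<String>, ProduceOutcome) {
    let (sender, receiver) = channel();
    let recoverable =
        finish_with_error(SourceError::Recoverable("connection reset".to_string()), &sender);
    let reported = receiver.try_recv().ok();
    let fatal = finish_with_error(SourceError::Fatal("corrupt state".to_string()), &sender);
    (recoverable, reported, fatal)
}
```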

Provided Methods§

fn try_into( me: Pin<Box<Self>>, _ty: TypeId, ) -> Result<Box<dyn Any>, Pin<Box<Self>>>

Attempt to convert the specified object into a Box<dyn Any> which may be downcast to the specified type.

The implementation must ensure that, if it returns Ok(_), a downcast to the specified type is guaranteed to succeed.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementations on Foreign Types§

impl<D> StreamProducer<D> for Bytes

Available on crate feature component-model-async-bytes only.

type Item = u8

type Buffer = Cursor<Bytes>

fn poll_produce<'a>( self: Pin<&mut Self>, _: &mut Context<'_>, store: StoreContextMut<'a, D>, dst: Destination<'a, Self::Item, Self::Buffer>, _: bool, ) -> Poll<Result<StreamResult>>

impl<D> StreamProducer<D> for BytesMut

Available on crate feature component-model-async-bytes only.

type Item = u8

type Buffer = Cursor<BytesMut>

fn poll_produce<'a>( self: Pin<&mut Self>, _: &mut Context<'_>, store: StoreContextMut<'a, D>, dst: Destination<'a, Self::Item, Self::Buffer>, _: bool, ) -> Poll<Result<StreamResult>>

impl<T, D> StreamProducer<D> for Box<[T]>
where T: Unpin + Send + Sync + 'static,

type Item = T

type Buffer = VecBuffer<T>

fn poll_produce<'a>( self: Pin<&mut Self>, _: &mut Context<'_>, _: StoreContextMut<'a, D>, dst: Destination<'a, Self::Item, Self::Buffer>, _: bool, ) -> Poll<Result<StreamResult>>

impl<T, D> StreamProducer<D> for Vec<T>
where T: Unpin + Send + Sync + 'static,

type Item = T

type Buffer = VecBuffer<T>

fn poll_produce<'a>( self: Pin<&mut Self>, _: &mut Context<'_>, _: StoreContextMut<'a, D>, dst: Destination<'a, Self::Item, Self::Buffer>, _: bool, ) -> Poll<Result<StreamResult>>

impl<T, D> StreamProducer<D> for Empty<T>
where T: Send + Sync + 'static,

type Item = T

type Buffer = Option<<Empty<T> as StreamProducer<D>>::Item>

fn poll_produce<'a>( self: Pin<&mut Self>, _: &mut Context<'_>, _: StoreContextMut<'a, D>, _: Destination<'a, Self::Item, Self::Buffer>, _: bool, ) -> Poll<Result<StreamResult>>

impl<T, D> StreamProducer<D> for Empty<T>
where T: Send + Sync + 'static,

type Item = T

type Buffer = Option<<Empty<T> as StreamProducer<D>>::Item>

fn poll_produce<'a>( self: Pin<&mut Self>, _: &mut Context<'_>, _: StoreContextMut<'a, D>, _: Destination<'a, Self::Item, Self::Buffer>, _: bool, ) -> Poll<Result<StreamResult>>

Implementors§