Trait StreamConsumer

pub trait StreamConsumer<D>: Send + 'static {
    type Item;

    // Required method
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'_, D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
Available on crate features runtime and component-model and component-model-async only.

Represents the host-owned read end of a stream.

Required Associated Types

type Item

The payload type of this stream.

Required Methods

fn poll_consume(self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<'_, D>, source: Source<'_, Self::Item>, finish: bool) -> Poll<Result<StreamResult>>

Handle a host- or guest-initiated write by accepting zero or more items from the specified source.

This will be called whenever the writer starts a write.

If the implementation is able to consume one or more items immediately, it should take them from source and return either Poll::Ready(Ok(StreamResult::Completed)) if it expects to be able to consume more items, or Poll::Ready(Ok(StreamResult::Dropped)) if it cannot accept any more items. Alternatively, it may return Poll::Pending to indicate that the caller should delay sending a COMPLETED event to the writer until a later call to this function returns Poll::Ready(_). For more about that, see the Backpressure section below.
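The immediate-consumption case can be sketched with standard-library types only. Everything below is a simplified stand-in rather than the real API: the local `StreamResult` enum mirrors the variant names used in the text, a `VecDeque` plays the role of `source`, `String` replaces the real error type, and `Collector` with its `limit` field is hypothetical.

```rust
use std::collections::VecDeque;
use std::task::Poll;

// Stand-in for the `StreamResult` values named above (not the real type).
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

// Hypothetical consumer that collects items until it holds `limit` of them.
struct Collector {
    items: Vec<u32>,
    limit: usize,
}

impl Collector {
    // `source` models the items the writer has made available.
    fn poll_consume(
        &mut self,
        source: &mut VecDeque<u32>,
    ) -> Poll<Result<StreamResult, String>> {
        // Take as many items as there is room for; the rest stay in `source`.
        let room = self.limit - self.items.len();
        let n = room.min(source.len());
        self.items.extend(source.drain(..n));

        if self.items.len() < self.limit {
            // More items can be accepted on a later call.
            Poll::Ready(Ok(StreamResult::Completed))
        } else {
            // The collector is full and will not accept another item.
            Poll::Ready(Ok(StreamResult::Dropped))
        }
    }
}
```

In this sketch the consumer never returns `Poll::Pending`, because collecting into a `Vec` finishes synchronously.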

If the implementation cannot consume any items immediately and finish is false, it should store the waker from cx for later and return Poll::Pending without taking anything from source. Later, it should alert the waker when either (1) it becomes able to consume items, (2) the stream has ended, or (3) an error occurs.

If the implementation cannot consume any items immediately and finish is true, it should, if possible, return Poll::Ready(Ok(StreamResult::Cancelled)) immediately without taking anything from source. However, that might not be possible if an earlier call to poll_consume kicked off an asynchronous operation which needs to be completed (and possibly interrupted) gracefully, in which case the implementation may return Poll::Pending and later alert the waker as described above. In other words, when finish is true, the implementation should prioritize returning a result to the writer (even if no items can be consumed) rather than wait indefinitely for capacity to free up.
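The two no-capacity cases (finish false and finish true) might look roughly like this for a consumer backed by a bounded buffer. This is a sketch under stated assumptions, not the real API: `Buffered`, `drain_one`, and `CountingWaker` are hypothetical, the `store` parameter is omitted, a `VecDeque` stands in for `source`, and `String` replaces the real error type.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for the `StreamResult` values named above (not the real type).
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

// Hypothetical consumer backed by a fixed-capacity buffer that another task drains.
struct Buffered {
    buf: VecDeque<u8>,
    capacity: usize,
    waker: Option<Waker>,
}

impl Buffered {
    fn poll_consume(
        &mut self,
        cx: &mut Context<'_>,
        source: &mut VecDeque<u8>,
        finish: bool,
    ) -> Poll<Result<StreamResult, String>> {
        let room = self.capacity - self.buf.len();
        if room == 0 {
            if finish {
                // The write is being cancelled: answer now, taking nothing.
                return Poll::Ready(Ok(StreamResult::Cancelled));
            }
            // Nothing can be consumed yet: keep the waker, leave `source` untouched.
            self.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        let n = room.min(source.len());
        self.buf.extend(source.drain(..n));
        Poll::Ready(Ok(StreamResult::Completed))
    }

    // Called by whatever drains the buffer; frees capacity and alerts the stored waker.
    fn drain_one(&mut self) -> Option<u8> {
        let item = self.buf.pop_front();
        if item.is_some() {
            if let Some(w) = self.waker.take() {
                w.wake();
            }
        }
        item
    }
}

// Helper for exercising the sketch: a waker that counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

A `Waker` for trying this out can be built with `Waker::from(Arc::new(CountingWaker(AtomicUsize::new(0))))` and wrapped with `Context::from_waker`.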

In all of the above cases, the implementation may alternatively choose to return Err(_) to indicate an unrecoverable error. This will cause the guest (if any) to trap and render the component instance (if any) unusable. Recoverable errors should instead be reported by other means (e.g. by writing to a future), after which the implementation should return Poll::Ready(Ok(StreamResult::Dropped)).
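The split between unrecoverable and recoverable errors could be sketched as follows. The classification is an assumption made purely for illustration: this hypothetical `Parser` treats a malformed item as recoverable and an arithmetic overflow as unrecoverable. A `std::sync::mpsc` channel stands in for "writing to a future", a `VecDeque` stands in for `source`, and `String` replaces the real error type.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::Sender;
use std::task::Poll;

// Stand-in for the `StreamResult` values named above (not the real type).
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

// Hypothetical consumer that sums decimal text items.
struct Parser {
    total: u64,
    // Out-of-band channel for recoverable errors (stand-in for a future).
    errors: Sender<String>,
}

impl Parser {
    fn poll_consume(
        &mut self,
        source: &mut VecDeque<String>,
    ) -> Poll<Result<StreamResult, String>> {
        while let Some(item) = source.pop_front() {
            match item.parse::<u64>() {
                Ok(n) => match self.total.checked_add(n) {
                    Some(t) => self.total = t,
                    // Treated as unrecoverable in this sketch: return `Err`.
                    None => return Poll::Ready(Err("total overflowed".to_string())),
                },
                Err(e) => {
                    // Recoverable: report by other means, then stop accepting items.
                    let _ = self.errors.send(format!("bad item {:?}: {}", item, e));
                    return Poll::Ready(Ok(StreamResult::Dropped));
                }
            }
        }
        Poll::Ready(Ok(StreamResult::Completed))
    }
}
```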

Note that the implementation may return Poll::Ready(Ok(StreamResult::Cancelled)) without having taken any items from source only when called with finish set to true. If it does so when finish is false, the caller will trap. Additionally, if an item is available in source, the implementation may return Poll::Ready(Ok(StreamResult::Completed)) only after taking at least one item; otherwise, the caller will trap. If poll_consume is called with no items in source, it should only return Poll::Ready(_) once it is able to accept at least one item during the next call to poll_consume.

Note that any items which the implementation of this trait takes from source become the responsibility of that implementation. For that reason, an implementation which forwards items to an upstream sink should reserve capacity in that sink before taking items out of source, if possible. Alternatively, it might buffer items which can’t be forwarded immediately and send them once capacity is freed up.
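The "reserve before taking" approach can be illustrated with a toy sink. The `Sink` type and its `reserve` and `send_reserved` methods are hypothetical, and a `VecDeque` stands in for `source`. The point of the sketch is only the ordering: capacity is reserved first, and only that many items are removed from the source.

```rust
use std::collections::VecDeque;

// Hypothetical upstream sink with a fixed number of free slots.
struct Sink {
    free: usize,
    received: Vec<u32>,
}

impl Sink {
    // Reserve up to `want` slots and return how many were actually reserved.
    fn reserve(&mut self, want: usize) -> usize {
        let got = want.min(self.free);
        self.free -= got;
        got
    }

    // Deliver an item into a previously reserved slot.
    fn send_reserved(&mut self, item: u32) {
        self.received.push(item);
    }
}

// Take from `source` only as many items as the sink has promised to accept.
// Returns the number of items forwarded.
fn forward(source: &mut VecDeque<u32>, sink: &mut Sink) -> usize {
    let reserved = sink.reserve(source.len());
    for item in source.drain(..reserved) {
        sink.send_reserved(item);
    }
    reserved
}
```

Items the sink has no room for are never removed from `source`, so the consumer never holds an item it cannot deliver.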

Backpressure

As mentioned above, an implementation might choose to return Poll::Pending after taking items from source, which tells the caller to delay sending a COMPLETED event to the writer. This can be used as a form of backpressure when the items are forwarded to an upstream sink asynchronously. Note, however, that it’s not possible to “put back” items into source once they’ve been taken out, so if the upstream sink is unable to accept all the items, that cannot be communicated to the writer at this level of abstraction. Just as with application-specific, recoverable errors, information about which items could be forwarded and which could not must be communicated out-of-band, e.g. by writing to an application-specific future.

Similarly, if the writer cancels the write after items have been taken from source but before the items have all been forwarded to an upstream sink, poll_consume will be called with finish set to true, and the implementation may either:

  • Interrupt the forwarding process gracefully. This may be preferable if there is an out-of-band channel for communicating to the writer how many items were forwarded before being interrupted.

  • Allow the forwarding to complete without interrupting it. This is usually preferable if there’s no out-of-band channel for reporting back to the writer how many items were forwarded.
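The backpressure pattern and the two cancellation options above could be modelled as below. This is a rough sketch, not the real API: `Forwarder`, `OnCancel`, and `step` are hypothetical, a `VecDeque` stands in for `source`, `String` replaces the real error type, and waker handling is left out. Which `StreamResult` to return after an interrupted forward is not specified in the text; returning `Cancelled` here is an assumption of the sketch.

```rust
use std::collections::VecDeque;
use std::task::Poll;

// Stand-in for the `StreamResult` values named above (not the real type).
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

// The two cancellation policies listed above.
#[allow(dead_code)]
enum OnCancel {
    Interrupt,
    LetFinish,
}

// Hypothetical consumer that forwards items to an upstream sink one at a time.
struct Forwarder {
    in_flight: VecDeque<u32>, // taken from the source, not yet forwarded
    forwarded: Vec<u32>,      // stand-in for the upstream sink
    on_cancel: OnCancel,
}

impl Forwarder {
    // Waker handling is omitted for brevity; a real implementation would store
    // the waker before returning `Poll::Pending`.
    fn poll_consume(
        &mut self,
        source: &mut VecDeque<u32>,
        finish: bool,
    ) -> Poll<Result<StreamResult, String>> {
        if !finish {
            // Items taken here become this consumer's responsibility.
            self.in_flight.extend(source.drain(..));
        }
        if self.in_flight.is_empty() {
            // Everything taken so far has been forwarded.
            return Poll::Ready(Ok(StreamResult::Completed));
        }
        if finish {
            if let OnCancel::Interrupt = self.on_cancel {
                // Stop forwarding; `forwarded.len()` is what an out-of-band
                // channel would report back to the writer.
                self.in_flight.clear();
                return Poll::Ready(Ok(StreamResult::Cancelled));
            }
        }
        // Backpressure: hold back the writer until forwarding catches up.
        Poll::Pending
    }

    // Models the upstream sink accepting one item.
    fn step(&mut self) {
        if let Some(item) = self.in_flight.pop_front() {
            self.forwarded.push(item);
        }
    }
}
```

With `OnCancel::LetFinish`, a cancelled write keeps returning `Poll::Pending` until every taken item has been forwarded. With `OnCancel::Interrupt`, it returns at once and the unforwarded items are discarded.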

Implementors§