pub trait StreamConsumer<D>: Send + 'static {
    type Item;

    // Required method
    fn poll_consume(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'_, D>,
        source: Source<'_, Self::Item>,
        finish: bool,
    ) -> Poll<Result<StreamResult>>;
}
Available on crate features runtime and component-model and component-model-async only.
Represents the host-owned read end of a stream.
Required Associated Types§
Required Methods§
fn poll_consume(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    store: StoreContextMut<'_, D>,
    source: Source<'_, Self::Item>,
    finish: bool,
) -> Poll<Result<StreamResult>>
Handle a host- or guest-initiated write by accepting zero or more items from the specified source.
This will be called whenever the writer starts a write.
If the implementation is able to consume one or more items immediately,
it should take them from source and return either
Poll::Ready(Ok(StreamResult::Completed)) if it expects to be able to
consume more items, or Poll::Ready(Ok(StreamResult::Dropped)) if it cannot
accept any more items. Alternatively, it may return Poll::Pending to
indicate that the caller should delay sending a COMPLETED event to the
writer until a later call to this function returns Poll::Ready(_).
For more about that, see the Backpressure section below.
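As a minimal sketch of the immediate-consumption case, the following uses stand-in types rather than the real API: MockSource, BoundedCollector, the local StreamResult enum, and the String error type are all hypothetical, and the store and cx parameters are omitted. It only illustrates choosing between Completed and Dropped after taking items.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Local stand-in mirroring the three outcomes named in the text.
#[derive(Debug, PartialEq)]
pub enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

/// Hypothetical stand-in for `Source`: items the writer has made available.
pub struct MockSource {
    pub items: VecDeque<u32>,
}

/// Hypothetical consumer that accepts at most `limit` items in total.
pub struct BoundedCollector {
    pub received: Vec<u32>,
    pub limit: usize,
}

impl BoundedCollector {
    pub fn poll_consume(
        &mut self,
        source: &mut MockSource,
    ) -> Poll<Result<StreamResult, String>> {
        // Take items only while there is still room for them.
        while self.received.len() < self.limit {
            match source.items.pop_front() {
                Some(item) => self.received.push(item),
                None => break,
            }
        }
        if self.received.len() < self.limit {
            // More items can be accepted on a later call.
            Poll::Ready(Ok(StreamResult::Completed))
        } else {
            // The limit has been reached; nothing more will be accepted.
            Poll::Ready(Ok(StreamResult::Dropped))
        }
    }
}
```

Items left in the stand-in source after a Dropped result are simply never taken, so they remain the writer's responsibility.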
If the implementation cannot consume any items immediately and finish
is false, it should store the waker from cx for later and return
Poll::Pending without taking anything from source. Later, it
should alert the waker when either (1) it becomes able to accept items,
(2) the stream has ended, or (3) an error occurs.
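The waker handling for the finish == false case might look roughly like the sketch below. GatedConsumer, capacity_freed, and CountingWaker are hypothetical names introduced for illustration, the source and store parameters are left out, and String stands in for the crate's error type.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in mirroring the outcomes named in the text.
#[derive(Debug, PartialEq)]
pub enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

/// Hypothetical consumer whose sink may be temporarily full.
pub struct GatedConsumer {
    pub has_capacity: bool,
    pub waker: Option<Waker>,
}

impl GatedConsumer {
    /// Sketch of the `finish == false` path only.
    pub fn poll_consume(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<StreamResult, String>> {
        if self.has_capacity {
            // A real implementation would take items from the source here.
            return Poll::Ready(Ok(StreamResult::Completed));
        }
        // No room: remember the waker and take nothing.
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }

    /// Called by the (hypothetical) sink once room frees up.
    pub fn capacity_freed(&mut self) {
        self.has_capacity = true;
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Helper that counts how often it is woken, used to drive the sketch.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```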
If the implementation cannot consume any items immediately and finish
is true, it should, if possible, return
Poll::Ready(Ok(StreamResult::Cancelled)) immediately without taking
anything from source. However, that might not be possible if an
earlier call to poll_consume kicked off an asynchronous operation
which needs to be completed (and possibly interrupted) gracefully, in
which case the implementation may return Poll::Pending and later alert
the waker as described above. In other words, when finish is true,
the implementation should prioritize returning a result to the writer
(even if no items can be consumed) rather than wait indefinitely for
capacity to free up.
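One way to picture how finish changes the decision is a small table over the consumer's internal state. The State enum and decide function below are hypothetical and only sketch the choice described above; waker registration and the actual item transfer are omitted.

```rust
use std::task::Poll;

/// Local stand-in mirroring the outcomes named in the text.
#[derive(Debug, PartialEq)]
pub enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

/// Hypothetical internal state of a consumer that has no room right now.
#[derive(Debug, PartialEq)]
pub enum State {
    /// No room, and no asynchronous operation in progress.
    Idle,
    /// An earlier call started an operation that is still running.
    InFlight,
    /// That earlier operation has finished.
    Flushed,
}

pub fn decide(state: &State, finish: bool) -> Poll<Result<StreamResult, String>> {
    match (state, finish) {
        // Items taken by an earlier call have been dealt with.
        (State::Flushed, _) => Poll::Ready(Ok(StreamResult::Completed)),
        // Nothing in progress and the write is being cancelled: answer now.
        (State::Idle, true) => Poll::Ready(Ok(StreamResult::Cancelled)),
        // Must wind the operation down gracefully before answering.
        (State::InFlight, true) => Poll::Pending,
        // Not cancelled: wait (a real implementation stores the waker).
        (_, false) => Poll::Pending,
    }
}
```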
In all of the above cases, the implementation may alternatively choose
to return Err(_) to indicate an unrecoverable error. This will cause
the guest (if any) to trap and render the component instance (if any)
unusable. Recoverable errors should instead be reported by other means
(e.g. by writing to a future), with the implementation returning
Poll::Ready(Ok(StreamResult::Dropped)).
Note that the implementation should only return
Poll::Ready(Ok(StreamResult::Cancelled)) without having taken any
items from source if called with finish set to true. If it does so
when finish is false, the caller will trap. Additionally, it should
only return Poll::Ready(Ok(StreamResult::Completed)) after taking at
least one item from source if there is an item available; otherwise,
the caller will trap. If poll_consume is called with no items in
source, it should only return Poll::Ready(_) once it is able to
accept at least one item during the next call to poll_consume.
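The two trapping conditions above can be restated as a predicate. This is only a reading of the rules as written here, not the runtime's actual check; would_trap and its parameters are hypothetical names.

```rust
/// Local stand-in mirroring the outcomes named in the text.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

/// Returns true if, per the rules stated above, the caller would trap.
///
/// `available` is the number of items in the source when the call began,
/// and `taken` is how many of them the implementation removed.
pub fn would_trap(
    result: StreamResult,
    finish: bool,
    available: usize,
    taken: usize,
) -> bool {
    match result {
        // Cancelled with nothing taken is only valid when finish is true.
        StreamResult::Cancelled => taken == 0 && !finish,
        // Completed requires progress whenever an item was available.
        StreamResult::Completed => available > 0 && taken == 0,
        // The text states no trapping condition for Dropped.
        StreamResult::Dropped => false,
    }
}
```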
Note that any items which the implementation of this trait takes from
source become the responsibility of that implementation. For that
reason, an implementation which forwards items to an upstream sink
should reserve capacity in that sink before taking items out of
source, if possible. Alternatively, it might buffer items which can’t
be forwarded immediately and send them once capacity is freed up.
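A rough sketch of the reserve-before-taking approach follows, with a VecDeque standing in for the source and a hypothetical BoundedSink for the upstream sink. The point is that the number of items removed never exceeds the room already known to exist.

```rust
use std::collections::VecDeque;

/// Hypothetical upstream sink with a fixed capacity.
pub struct BoundedSink {
    pub buf: Vec<u8>,
    pub capacity: usize,
}

impl BoundedSink {
    /// Room left in the sink.
    pub fn free(&self) -> usize {
        self.capacity.saturating_sub(self.buf.len())
    }
}

/// Moves at most `sink.free()` items out of `source` and returns the count.
/// Items that would not fit are left in `source`, so they never become the
/// consumer's responsibility.
pub fn forward(source: &mut VecDeque<u8>, sink: &mut BoundedSink) -> usize {
    // Decide how many items to take before touching the source.
    let n = sink.free().min(source.len());
    sink.buf.extend(source.drain(..n));
    n
}
```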
§Backpressure
As mentioned above, an implementation might choose to return
Poll::Pending after taking items from source, which tells the caller
to delay sending a COMPLETED event to the writer. This can be used as
a form of backpressure when the items are forwarded to an upstream sink
asynchronously. Note, however, that it’s not possible to “put back”
items into source once they’ve been taken out, so if the upstream sink
is unable to accept all the items, that cannot be communicated to the
writer at this level of abstraction. Just as with application-specific,
recoverable errors, information about which items could be forwarded and
which could not must be communicated out-of-band, e.g. by writing to an
application-specific future.
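The backpressure pattern might be sketched as follows. Forwarder, sink_flushed, and NoopWake are hypothetical, a VecDeque stands in for the source, and String stands in for the crate's error type. The consumer takes the items, returns Poll::Pending while they are in flight, and reports Completed only after the sink has accepted them.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in mirroring the outcomes named in the text.
#[derive(Debug, PartialEq)]
pub enum StreamResult {
    Completed,
    Cancelled,
    Dropped,
}

/// Hypothetical consumer that forwards items to a sink asynchronously.
pub struct Forwarder {
    /// Items taken from the source but not yet accepted by the sink.
    pub in_flight: VecDeque<u32>,
    /// Items the sink has accepted.
    pub delivered: Vec<u32>,
    pub waker: Option<Waker>,
}

impl Forwarder {
    pub fn poll_consume(
        &mut self,
        cx: &mut Context<'_>,
        source: &mut VecDeque<u32>,
    ) -> Poll<Result<StreamResult, String>> {
        // Taking items is irreversible: from here on they belong to us.
        self.in_flight.extend(source.drain(..));
        if self.in_flight.is_empty() {
            Poll::Ready(Ok(StreamResult::Completed))
        } else {
            // Delay the COMPLETED event until the sink catches up.
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    /// Called when the (hypothetical) sink has accepted everything in flight.
    pub fn sink_flushed(&mut self) {
        self.delivered.extend(self.in_flight.drain(..));
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Waker that does nothing, used only to build a Context for the sketch.
pub struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
```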
Similarly, if the writer cancels the write after items have been taken
from source but before the items have all been forwarded to an
upstream sink, poll_consume will be called with finish set to true,
and the implementation may either:
- Interrupt the forwarding process gracefully. This may be preferable if there is an out-of-band channel for communicating to the writer how many items were forwarded before being interrupted.
- Allow the forwarding to complete without interrupting it. This is usually preferable if there’s no out-of-band channel for reporting back to the writer how many items were forwarded.
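The choice between these two options could be sketched as below, assuming a std::sync::mpsc channel as the out-of-band reporting path. Forwarding and handle_cancel are hypothetical names, and a real implementation would still need to return a result from poll_consume afterwards.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::Sender;

/// Hypothetical forwarding state held by a consumer.
pub struct Forwarding {
    /// Items taken from the source but not yet forwarded.
    pub queued: VecDeque<u32>,
    /// Number of items already forwarded to the sink.
    pub forwarded: usize,
}

/// Reacts to a cancelled write. Returns true if forwarding was interrupted.
pub fn handle_cancel(state: &mut Forwarding, report: Option<&Sender<usize>>) -> bool {
    match report {
        Some(tx) => {
            // Option 1: stop forwarding and tell the writer how far we got.
            state.queued.clear();
            // A closed receiver is ignored: the writer may have gone away.
            let _ = tx.send(state.forwarded);
            true
        }
        // Option 2: no way to report progress, so let forwarding finish.
        None => false,
    }
}
```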