wasmtime_wasi_io/
streams.rs

1use crate::poll::Pollable;
2use alloc::boxed::Box;
3use anyhow::Result;
4use bytes::Bytes;
5
/// Retry budget for the default blocking helpers of `InputStream` and
/// `OutputStream`.
///
/// `Pollable::ready()` for these streams may return prematurely due to
/// `io::ErrorKind::WouldBlock`, so the non-blocking call that follows it can
/// still report "nothing available". The blocking helpers therefore wait and
/// retry in a loop instead of trusting a single `ready()`.
///
/// The loops count unproductive attempts starting from zero and compare the
/// counter against this constant after each one; once the counter has reached
/// this value the helper gives up with a
/// `"max blocking attempts exceeded"` trap rather than spinning forever.
const MAX_BLOCKING_ATTEMPTS: u8 = 10;
14
15/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
16/// bytestream which can be read from.
17#[async_trait::async_trait]
18pub trait InputStream: Pollable {
19    /// Reads up to `size` bytes, returning a buffer holding these bytes on
20    /// success.
21    ///
22    /// This function does not block the current thread and is the equivalent of
23    /// a non-blocking read. On success all bytes read are returned through
24    /// `Bytes`, which is no larger than the `size` provided. If the returned
25    /// list of `Bytes` is empty then no data is ready to be read at this time.
26    ///
27    /// # Errors
28    ///
29    /// The [`StreamError`] return value communicates when this stream is
30    /// closed, when a read fails, or when a trap should be generated.
31    fn read(&mut self, size: usize) -> StreamResult<Bytes>;
32
33    /// Similar to `read`, except that it blocks until at least one byte can be
34    /// read.
35    async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
36        if size == 0 {
37            self.ready().await;
38            return self.read(size);
39        }
40
41        let mut i = 0;
42        loop {
43            // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
44            self.ready().await;
45            let data = self.read(size)?;
46            if !data.is_empty() {
47                return Ok(data);
48            }
49            if i >= MAX_BLOCKING_ATTEMPTS {
50                return Err(StreamError::trap("max blocking attempts exceeded"));
51            }
52            i += 1;
53        }
54    }
55
56    /// Same as the `read` method except that bytes are skipped.
57    ///
58    /// Note that this method is non-blocking like `read` and returns the same
59    /// errors.
60    fn skip(&mut self, nelem: usize) -> StreamResult<usize> {
61        let bs = self.read(nelem)?;
62        Ok(bs.len())
63    }
64
65    /// Similar to `skip`, except that it blocks until at least one byte can be
66    /// skipped.
67    async fn blocking_skip(&mut self, nelem: usize) -> StreamResult<usize> {
68        let bs = self.blocking_read(nelem).await?;
69        Ok(bs.len())
70    }
71
72    /// Cancel any asynchronous work and wait for it to wrap up.
73    async fn cancel(&mut self) {}
74}
75
76/// Representation of the `error` resource type in the `wasi:io/error`
77/// interface.
78///
79/// This is currently `anyhow::Error` to retain full type information for
80/// errors.
81pub type Error = anyhow::Error;
82
83pub type StreamResult<T> = Result<T, StreamError>;
84
85#[derive(Debug)]
86pub enum StreamError {
87    Closed,
88    LastOperationFailed(anyhow::Error),
89    Trap(anyhow::Error),
90}
91
92impl StreamError {
93    pub fn trap(msg: &str) -> StreamError {
94        StreamError::Trap(anyhow::anyhow!("{msg}"))
95    }
96}
97
98impl alloc::fmt::Display for StreamError {
99    fn fmt(&self, f: &mut alloc::fmt::Formatter<'_>) -> alloc::fmt::Result {
100        match self {
101            StreamError::Closed => write!(f, "closed"),
102            StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"),
103            StreamError::Trap(e) => write!(f, "trap: {e}"),
104        }
105    }
106}
107
108impl core::error::Error for StreamError {
109    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
110        match self {
111            StreamError::Closed => None,
112            StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(),
113        }
114    }
115}
116
117impl From<wasmtime::component::ResourceTableError> for StreamError {
118    fn from(error: wasmtime::component::ResourceTableError) -> Self {
119        Self::Trap(error.into())
120    }
121}
122
123/// Host trait for implementing the `wasi:io/streams.output-stream` resource:
124/// A bytestream which can be written to.
125#[async_trait::async_trait]
126pub trait OutputStream: Pollable {
127    /// Write bytes after obtaining a permit to write those bytes
128    ///
129    /// Prior to calling [`write`](Self::write) the caller must call
130    /// [`check_write`](Self::check_write), which resolves to a non-zero permit
131    ///
132    /// This method must never block.  The [`check_write`](Self::check_write)
133    /// permit indicates the maximum amount of bytes that are permitted to be
134    /// written in a single [`write`](Self::write) following the
135    /// [`check_write`](Self::check_write) resolution.
136    ///
137    /// # Errors
138    ///
139    /// Returns a [`StreamError`] if:
140    /// - stream is closed
141    /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
142    /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
143    fn write(&mut self, bytes: Bytes) -> StreamResult<()>;
144
145    /// Trigger a flush of any bytes buffered in this stream implementation.
146    ///
147    /// This method may be called at any time and must never block.
148    ///
149    /// After this method is called, [`check_write`](Self::check_write) must
150    /// pend until flush is complete.
151    ///
152    /// When [`check_write`](Self::check_write) becomes ready after a flush,
153    /// that guarantees that all prior writes have been flushed from the
154    /// implementation successfully, or that any error associated with those
155    /// writes is reported in the return value of [`flush`](Self::flush) or
156    /// [`check_write`](Self::check_write)
157    ///
158    /// # Errors
159    ///
160    /// Returns a [`StreamError`] if:
161    /// - stream is closed
162    /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
163    /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
164    fn flush(&mut self) -> StreamResult<()>;
165
166    /// Returns the number of bytes that are ready to be written to this stream.
167    ///
168    /// Zero bytes indicates that this stream is not currently ready for writing
169    /// and `ready()` must be awaited first.
170    ///
171    /// Note that this method does not block.
172    ///
173    /// # Errors
174    ///
175    /// Returns an [`StreamError`] if:
176    /// - stream is closed
177    /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
178    fn check_write(&mut self) -> StreamResult<usize>;
179
180    /// Perform a write of up to 4096 bytes, and then flush the stream. Block
181    /// until all of these operations are complete, or an error occurs.
182    ///
183    /// This is a convenience wrapper around the use of `check-write`,
184    /// `subscribe`, `write`, and `flush`, and is implemented with the
185    /// following pseudo-code:
186    ///
187    /// ```text
188    /// let pollable = this.subscribe();
189    /// while !contents.is_empty() {
190    ///     // Wait for the stream to become writable
191    ///     pollable.block();
192    ///     let Ok(n) = this.check-write(); // eliding error handling
193    ///     let len = min(n, contents.len());
194    ///     let (chunk, rest) = contents.split_at(len);
195    ///     this.write(chunk  );            // eliding error handling
196    ///     contents = rest;
197    /// }
198    /// this.flush();
199    /// // Wait for completion of `flush`
200    /// pollable.block();
201    /// // Check for any errors that arose during `flush`
202    /// let _ = this.check-write();         // eliding error handling
203    /// ```
204    async fn blocking_write_and_flush(&mut self, mut bytes: Bytes) -> StreamResult<()> {
205        loop {
206            let permit = self.write_ready().await?;
207            let len = bytes.len().min(permit);
208            let chunk = bytes.split_to(len);
209            self.write(chunk)?;
210            if bytes.is_empty() {
211                break;
212            }
213        }
214
215        self.flush()?;
216        self.write_ready().await?;
217
218        Ok(())
219    }
220
221    /// Repeatedly write a byte to a stream.
222    /// Important: this write must be non-blocking!
223    /// Returning an Err which downcasts to a [`StreamError`] will be
224    /// reported to Wasm as the empty error result. Otherwise, errors will trap.
225    fn write_zeroes(&mut self, nelem: usize) -> StreamResult<()> {
226        // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
227        // repeatedly from a 'static buffer of zeros.
228        let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
229        self.write(bs)?;
230        Ok(())
231    }
232
233    /// Perform a write of up to 4096 zeroes, and then flush the stream.
234    /// Block until all of these operations are complete, or an error
235    /// occurs.
236    ///
237    /// This is a convenience wrapper around the use of `check-write`,
238    /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with
239    /// the following pseudo-code:
240    ///
241    /// ```text
242    /// let pollable = this.subscribe();
243    /// while num_zeroes != 0 {
244    ///     // Wait for the stream to become writable
245    ///     pollable.block();
246    ///     let Ok(n) = this.check-write(); // eliding error handling
247    ///     let len = min(n, num_zeroes);
248    ///     this.write-zeroes(len);         // eliding error handling
249    ///     num_zeroes -= len;
250    /// }
251    /// this.flush();
252    /// // Wait for completion of `flush`
253    /// pollable.block();
254    /// // Check for any errors that arose during `flush`
255    /// let _ = this.check-write();         // eliding error handling
256    /// ```
257    async fn blocking_write_zeroes_and_flush(&mut self, nelem: usize) -> StreamResult<()> {
258        // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
259        // repeatedly from a 'static buffer of zeros.
260        let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
261        self.blocking_write_and_flush(bs).await
262    }
263
264    /// Simultaneously waits for this stream to be writable and then returns how
265    /// much may be written or the last error that happened.
266    async fn write_ready(&mut self) -> StreamResult<usize> {
267        let mut i = 0;
268        loop {
269            // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
270            self.ready().await;
271            let n = self.check_write()?;
272            if n > 0 {
273                return Ok(n);
274            }
275            if i >= MAX_BLOCKING_ATTEMPTS {
276                return Err(StreamError::trap("max blocking attempts exceeded"));
277            }
278            i += 1;
279        }
280    }
281
282    /// Cancel any asynchronous work and wait for it to wrap up.
283    async fn cancel(&mut self) {}
284}
285
286#[async_trait::async_trait]
287impl Pollable for Box<dyn OutputStream> {
288    async fn ready(&mut self) {
289        (**self).ready().await
290    }
291}
292
293#[async_trait::async_trait]
294impl Pollable for Box<dyn InputStream> {
295    async fn ready(&mut self) {
296        (**self).ready().await
297    }
298}
299
300pub type DynInputStream = Box<dyn InputStream>;
301
302pub type DynOutputStream = Box<dyn OutputStream>;