wasmtime_wasi_io/streams.rs
1use crate::poll::Pollable;
2use alloc::boxed::Box;
3use anyhow::Result;
4use bytes::Bytes;
5
/// `Pollable::ready()` for `InputStream` and `OutputStream` may return
/// prematurely due to `io::ErrorKind::WouldBlock`.
///
/// To ensure that the `blocking_` functions return a valid non-empty result,
/// they re-poll the stream in a loop with a maximum iteration limit instead
/// of trusting a single `ready()` wake-up.
///
/// This constant defines the maximum number of loop attempts allowed. It is
/// consulted by `InputStream::blocking_read` and `OutputStream::write_ready`,
/// both of which report a trap with the message
/// `"max blocking attempts exceeded"` once the limit is hit.
const MAX_BLOCKING_ATTEMPTS: u8 = 10;
14
15/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
16/// bytestream which can be read from.
17#[async_trait::async_trait]
18pub trait InputStream: Pollable {
19 /// Reads up to `size` bytes, returning a buffer holding these bytes on
20 /// success.
21 ///
22 /// This function does not block the current thread and is the equivalent of
23 /// a non-blocking read. On success all bytes read are returned through
24 /// `Bytes`, which is no larger than the `size` provided. If the returned
25 /// list of `Bytes` is empty then no data is ready to be read at this time.
26 ///
27 /// # Errors
28 ///
29 /// The [`StreamError`] return value communicates when this stream is
30 /// closed, when a read fails, or when a trap should be generated.
31 fn read(&mut self, size: usize) -> StreamResult<Bytes>;
32
33 /// Similar to `read`, except that it blocks until at least one byte can be
34 /// read.
35 async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
36 if size == 0 {
37 self.ready().await;
38 return self.read(size);
39 }
40
41 let mut i = 0;
42 loop {
43 // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
44 self.ready().await;
45 let data = self.read(size)?;
46 if !data.is_empty() {
47 return Ok(data);
48 }
49 if i >= MAX_BLOCKING_ATTEMPTS {
50 return Err(StreamError::trap("max blocking attempts exceeded"));
51 }
52 i += 1;
53 }
54 }
55
56 /// Same as the `read` method except that bytes are skipped.
57 ///
58 /// Note that this method is non-blocking like `read` and returns the same
59 /// errors.
60 fn skip(&mut self, nelem: usize) -> StreamResult<usize> {
61 let bs = self.read(nelem)?;
62 Ok(bs.len())
63 }
64
65 /// Similar to `skip`, except that it blocks until at least one byte can be
66 /// skipped.
67 async fn blocking_skip(&mut self, nelem: usize) -> StreamResult<usize> {
68 let bs = self.blocking_read(nelem).await?;
69 Ok(bs.len())
70 }
71
72 /// Cancel any asynchronous work and wait for it to wrap up.
73 async fn cancel(&mut self) {}
74}
75
/// Representation of the `error` resource type in the `wasi:io/error`
/// interface.
///
/// This is currently an alias for `anyhow::Error` to retain full type
/// information for errors.
pub type Error = anyhow::Error;
82
/// Result type returned by stream operations: `T` on success, or a
/// [`StreamError`] on failure.
pub type StreamResult<T> = Result<T, StreamError>;
84
/// Error returned by [`InputStream`] and [`OutputStream`] operations.
#[derive(Debug)]
pub enum StreamError {
    /// The stream is closed.
    Closed,
    /// A prior operation on the stream failed; carries the underlying error.
    LastOperationFailed(anyhow::Error),
    /// A trap should be generated; carries the error describing the trap.
    Trap(anyhow::Error),
}
91
92impl StreamError {
93 pub fn trap(msg: &str) -> StreamError {
94 StreamError::Trap(anyhow::anyhow!("{msg}"))
95 }
96}
97
98impl alloc::fmt::Display for StreamError {
99 fn fmt(&self, f: &mut alloc::fmt::Formatter<'_>) -> alloc::fmt::Result {
100 match self {
101 StreamError::Closed => write!(f, "closed"),
102 StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"),
103 StreamError::Trap(e) => write!(f, "trap: {e}"),
104 }
105 }
106}
107
108impl core::error::Error for StreamError {
109 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
110 match self {
111 StreamError::Closed => None,
112 StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(),
113 }
114 }
115}
116
117impl From<wasmtime::component::ResourceTableError> for StreamError {
118 fn from(error: wasmtime::component::ResourceTableError) -> Self {
119 Self::Trap(error.into())
120 }
121}
122
123/// Host trait for implementing the `wasi:io/streams.output-stream` resource:
124/// A bytestream which can be written to.
125#[async_trait::async_trait]
126pub trait OutputStream: Pollable {
127 /// Write bytes after obtaining a permit to write those bytes
128 ///
129 /// Prior to calling [`write`](Self::write) the caller must call
130 /// [`check_write`](Self::check_write), which resolves to a non-zero permit
131 ///
132 /// This method must never block. The [`check_write`](Self::check_write)
133 /// permit indicates the maximum amount of bytes that are permitted to be
134 /// written in a single [`write`](Self::write) following the
135 /// [`check_write`](Self::check_write) resolution.
136 ///
137 /// # Errors
138 ///
139 /// Returns a [`StreamError`] if:
140 /// - stream is closed
141 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
142 /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
143 fn write(&mut self, bytes: Bytes) -> StreamResult<()>;
144
145 /// Trigger a flush of any bytes buffered in this stream implementation.
146 ///
147 /// This method may be called at any time and must never block.
148 ///
149 /// After this method is called, [`check_write`](Self::check_write) must
150 /// pend until flush is complete.
151 ///
152 /// When [`check_write`](Self::check_write) becomes ready after a flush,
153 /// that guarantees that all prior writes have been flushed from the
154 /// implementation successfully, or that any error associated with those
155 /// writes is reported in the return value of [`flush`](Self::flush) or
156 /// [`check_write`](Self::check_write)
157 ///
158 /// # Errors
159 ///
160 /// Returns a [`StreamError`] if:
161 /// - stream is closed
162 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
163 /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
164 fn flush(&mut self) -> StreamResult<()>;
165
166 /// Returns the number of bytes that are ready to be written to this stream.
167 ///
168 /// Zero bytes indicates that this stream is not currently ready for writing
169 /// and `ready()` must be awaited first.
170 ///
171 /// Note that this method does not block.
172 ///
173 /// # Errors
174 ///
175 /// Returns an [`StreamError`] if:
176 /// - stream is closed
177 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
178 fn check_write(&mut self) -> StreamResult<usize>;
179
180 /// Perform a write of up to 4096 bytes, and then flush the stream. Block
181 /// until all of these operations are complete, or an error occurs.
182 ///
183 /// This is a convenience wrapper around the use of `check-write`,
184 /// `subscribe`, `write`, and `flush`, and is implemented with the
185 /// following pseudo-code:
186 ///
187 /// ```text
188 /// let pollable = this.subscribe();
189 /// while !contents.is_empty() {
190 /// // Wait for the stream to become writable
191 /// pollable.block();
192 /// let Ok(n) = this.check-write(); // eliding error handling
193 /// let len = min(n, contents.len());
194 /// let (chunk, rest) = contents.split_at(len);
195 /// this.write(chunk ); // eliding error handling
196 /// contents = rest;
197 /// }
198 /// this.flush();
199 /// // Wait for completion of `flush`
200 /// pollable.block();
201 /// // Check for any errors that arose during `flush`
202 /// let _ = this.check-write(); // eliding error handling
203 /// ```
204 async fn blocking_write_and_flush(&mut self, mut bytes: Bytes) -> StreamResult<()> {
205 loop {
206 let permit = self.write_ready().await?;
207 let len = bytes.len().min(permit);
208 let chunk = bytes.split_to(len);
209 self.write(chunk)?;
210 if bytes.is_empty() {
211 break;
212 }
213 }
214
215 self.flush()?;
216 self.write_ready().await?;
217
218 Ok(())
219 }
220
221 /// Repeatedly write a byte to a stream.
222 /// Important: this write must be non-blocking!
223 /// Returning an Err which downcasts to a [`StreamError`] will be
224 /// reported to Wasm as the empty error result. Otherwise, errors will trap.
225 fn write_zeroes(&mut self, nelem: usize) -> StreamResult<()> {
226 // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
227 // repeatedly from a 'static buffer of zeros.
228 let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
229 self.write(bs)?;
230 Ok(())
231 }
232
233 /// Perform a write of up to 4096 zeroes, and then flush the stream.
234 /// Block until all of these operations are complete, or an error
235 /// occurs.
236 ///
237 /// This is a convenience wrapper around the use of `check-write`,
238 /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with
239 /// the following pseudo-code:
240 ///
241 /// ```text
242 /// let pollable = this.subscribe();
243 /// while num_zeroes != 0 {
244 /// // Wait for the stream to become writable
245 /// pollable.block();
246 /// let Ok(n) = this.check-write(); // eliding error handling
247 /// let len = min(n, num_zeroes);
248 /// this.write-zeroes(len); // eliding error handling
249 /// num_zeroes -= len;
250 /// }
251 /// this.flush();
252 /// // Wait for completion of `flush`
253 /// pollable.block();
254 /// // Check for any errors that arose during `flush`
255 /// let _ = this.check-write(); // eliding error handling
256 /// ```
257 async fn blocking_write_zeroes_and_flush(&mut self, nelem: usize) -> StreamResult<()> {
258 // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
259 // repeatedly from a 'static buffer of zeros.
260 let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
261 self.blocking_write_and_flush(bs).await
262 }
263
264 /// Simultaneously waits for this stream to be writable and then returns how
265 /// much may be written or the last error that happened.
266 async fn write_ready(&mut self) -> StreamResult<usize> {
267 let mut i = 0;
268 loop {
269 // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
270 self.ready().await;
271 let n = self.check_write()?;
272 if n > 0 {
273 return Ok(n);
274 }
275 if i >= MAX_BLOCKING_ATTEMPTS {
276 return Err(StreamError::trap("max blocking attempts exceeded"));
277 }
278 i += 1;
279 }
280 }
281
282 /// Cancel any asynchronous work and wait for it to wrap up.
283 async fn cancel(&mut self) {}
284}
285
286#[async_trait::async_trait]
287impl Pollable for Box<dyn OutputStream> {
288 async fn ready(&mut self) {
289 (**self).ready().await
290 }
291}
292
293#[async_trait::async_trait]
294impl Pollable for Box<dyn InputStream> {
295 async fn ready(&mut self) {
296 (**self).ready().await
297 }
298}
299
/// A boxed, dynamically dispatched [`InputStream`] trait object.
pub type DynInputStream = Box<dyn InputStream>;

/// A boxed, dynamically dispatched [`OutputStream`] trait object.
pub type DynOutputStream = Box<dyn OutputStream>;