wasmtime_wasi_io/streams.rs
1use crate::poll::Pollable;
2use alloc::boxed::Box;
3use anyhow::Result;
4use bytes::Bytes;
5
6/// `Pollable::ready()` for `InputStream` and `OutputStream` may return
7/// prematurely due to `io::ErrorKind::WouldBlock`.
8///
9/// To ensure that `blocking_` functions return a valid non-empty result,
10/// we use a loop with a maximum iteration limit.
11///
12/// This constant defines the maximum number of loop attempts allowed.
13const MAX_BLOCKING_ATTEMPTS: u8 = 10;
14
15/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
16/// bytestream which can be read from.
17#[async_trait::async_trait]
18pub trait InputStream: Pollable {
19 /// Reads up to `size` bytes, returning a buffer holding these bytes on
20 /// success.
21 ///
22 /// This function does not block the current thread and is the equivalent of
23 /// a non-blocking read. On success all bytes read are returned through
24 /// `Bytes`, which is no larger than the `size` provided. If the returned
25 /// list of `Bytes` is empty then no data is ready to be read at this time.
26 ///
27 /// # Errors
28 ///
29 /// The [`StreamError`] return value communicates when this stream is
30 /// closed, when a read fails, or when a trap should be generated.
31 fn read(&mut self, size: usize) -> StreamResult<Bytes>;
32
33 /// Similar to `read`, except that it blocks until at least one byte can be
34 /// read.
35 async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
36 if size == 0 {
37 self.ready().await;
38 return self.read(size);
39 }
40
41 let mut i = 0;
42 loop {
43 // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
44 self.ready().await;
45 let data = self.read(size)?;
46 if !data.is_empty() {
47 return Ok(data);
48 }
49 if i >= MAX_BLOCKING_ATTEMPTS {
50 return Err(StreamError::trap("max blocking attempts exceeded"));
51 }
52 i += 1;
53 }
54 }
55
56 /// Same as the `read` method except that bytes are skipped.
57 ///
58 /// Note that this method is non-blocking like `read` and returns the same
59 /// errors.
60 fn skip(&mut self, nelem: usize) -> StreamResult<usize> {
61 let bs = self.read(nelem)?;
62 Ok(bs.len())
63 }
64
65 /// Similar to `skip`, except that it blocks until at least one byte can be
66 /// skipped.
67 async fn blocking_skip(&mut self, nelem: usize) -> StreamResult<usize> {
68 let bs = self.blocking_read(nelem).await?;
69 Ok(bs.len())
70 }
71
72 /// Cancel any asynchronous work and wait for it to wrap up.
73 async fn cancel(&mut self) {}
74}
75
76/// Representation of the `error` resource type in the `wasi:io/error`
77/// interface.
78///
79/// This is currently `anyhow::Error` to retain full type information for
80/// errors.
81pub type Error = anyhow::Error;
82
83pub type StreamResult<T> = Result<T, StreamError>;
84
85#[derive(Debug)]
86pub enum StreamError {
87 Closed,
88 LastOperationFailed(anyhow::Error),
89 Trap(anyhow::Error),
90}
91
92impl StreamError {
93 pub fn trap(msg: &str) -> StreamError {
94 StreamError::Trap(anyhow::anyhow!("{msg}"))
95 }
96}
97
98impl alloc::fmt::Display for StreamError {
99 fn fmt(&self, f: &mut alloc::fmt::Formatter<'_>) -> alloc::fmt::Result {
100 match self {
101 StreamError::Closed => write!(f, "closed"),
102 StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"),
103 StreamError::Trap(e) => write!(f, "trap: {e}"),
104 }
105 }
106}
107
108impl core::error::Error for StreamError {
109 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
110 match self {
111 StreamError::Closed => None,
112 StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(),
113 }
114 }
115}
116
117impl From<wasmtime::component::ResourceTableError> for StreamError {
118 fn from(error: wasmtime::component::ResourceTableError) -> Self {
119 Self::Trap(error.into())
120 }
121}
122
123/// Host trait for implementing the `wasi:io/streams.output-stream` resource:
124/// A bytestream which can be written to.
125#[async_trait::async_trait]
126pub trait OutputStream: Pollable {
127 /// Write bytes after obtaining a permit to write those bytes
128 ///
129 /// Prior to calling [`write`](Self::write) the caller must call
130 /// [`check_write`](Self::check_write), which resolves to a non-zero permit
131 ///
132 /// This method must never block. The [`check_write`](Self::check_write)
133 /// permit indicates the maximum amount of bytes that are permitted to be
134 /// written in a single [`write`](Self::write) following the
135 /// [`check_write`](Self::check_write) resolution.
136 ///
137 /// # Errors
138 ///
139 /// Returns a [`StreamError`] if:
140 /// - stream is closed
141 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
142 /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
143 fn write(&mut self, bytes: Bytes) -> StreamResult<()>;
144
145 /// Trigger a flush of any bytes buffered in this stream implementation.
146 ///
147 /// This method may be called at any time and must never block.
148 ///
149 /// After this method is called, [`check_write`](Self::check_write) must
150 /// pend until flush is complete.
151 ///
152 /// When [`check_write`](Self::check_write) becomes ready after a flush,
153 /// that guarantees that all prior writes have been flushed from the
154 /// implementation successfully, or that any error associated with those
155 /// writes is reported in the return value of [`flush`](Self::flush) or
156 /// [`check_write`](Self::check_write)
157 ///
158 /// # Errors
159 ///
160 /// Returns a [`StreamError`] if:
161 /// - stream is closed
162 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
163 /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
164 fn flush(&mut self) -> StreamResult<()>;
165
166 /// Returns the number of bytes that are ready to be written to this stream.
167 ///
168 /// Zero bytes indicates that this stream is not currently ready for writing
169 /// and `ready()` must be awaited first.
170 ///
171 /// Note that this method does not block.
172 ///
173 /// # Errors
174 ///
175 /// Returns an [`StreamError`] if:
176 /// - stream is closed
177 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
178 fn check_write(&mut self) -> StreamResult<usize>;
179
180 /// Perform a write of up to 4096 bytes, and then flush the stream. Block
181 /// until all of these operations are complete, or an error occurs.
182 ///
183 /// This is a convenience wrapper around the use of `check-write`,
184 /// `subscribe`, `write`, and `flush`, and is implemented with the
185 /// following pseudo-code:
186 ///
187 /// ```text
188 /// let pollable = this.subscribe();
189 /// while !contents.is_empty() {
190 /// // Wait for the stream to become writable
191 /// pollable.block();
192 /// let Ok(n) = this.check-write(); // eliding error handling
193 /// let len = min(n, contents.len());
194 /// let (chunk, rest) = contents.split_at(len);
195 /// this.write(chunk ); // eliding error handling
196 /// contents = rest;
197 /// }
198 /// this.flush();
199 /// // Wait for completion of `flush`
200 /// pollable.block();
201 /// // Check for any errors that arose during `flush`
202 /// let _ = this.check-write(); // eliding error handling
203 /// ```
204 async fn blocking_write_and_flush(&mut self, mut bytes: Bytes) -> StreamResult<()> {
205 loop {
206 let permit = self.write_ready().await?;
207 let len = bytes.len().min(permit);
208 let chunk = bytes.split_to(len);
209 self.write(chunk)?;
210 if bytes.is_empty() {
211 break;
212 }
213 }
214
215 // If the stream encounters an error, return it, but if the stream
216 // has become closed, do not.
217 match self.flush() {
218 Ok(_) => {}
219 Err(StreamError::Closed) => {}
220 Err(e) => Err(e)?,
221 };
222 match self.write_ready().await {
223 Ok(_) => {}
224 Err(StreamError::Closed) => {}
225 Err(e) => Err(e)?,
226 };
227
228 Ok(())
229 }
230
231 /// Repeatedly write a byte to a stream.
232 /// Important: this write must be non-blocking!
233 /// Returning an Err which downcasts to a [`StreamError`] will be
234 /// reported to Wasm as the empty error result. Otherwise, errors will trap.
235 fn write_zeroes(&mut self, nelem: usize) -> StreamResult<()> {
236 // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
237 // repeatedly from a 'static buffer of zeros.
238 let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
239 self.write(bs)?;
240 Ok(())
241 }
242
243 /// Perform a write of up to 4096 zeroes, and then flush the stream.
244 /// Block until all of these operations are complete, or an error
245 /// occurs.
246 ///
247 /// This is a convenience wrapper around the use of `check-write`,
248 /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with
249 /// the following pseudo-code:
250 ///
251 /// ```text
252 /// let pollable = this.subscribe();
253 /// while num_zeroes != 0 {
254 /// // Wait for the stream to become writable
255 /// pollable.block();
256 /// let Ok(n) = this.check-write(); // eliding error handling
257 /// let len = min(n, num_zeroes);
258 /// this.write-zeroes(len); // eliding error handling
259 /// num_zeroes -= len;
260 /// }
261 /// this.flush();
262 /// // Wait for completion of `flush`
263 /// pollable.block();
264 /// // Check for any errors that arose during `flush`
265 /// let _ = this.check-write(); // eliding error handling
266 /// ```
267 async fn blocking_write_zeroes_and_flush(&mut self, nelem: usize) -> StreamResult<()> {
268 // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
269 // repeatedly from a 'static buffer of zeros.
270 let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
271 self.blocking_write_and_flush(bs).await
272 }
273
274 /// Simultaneously waits for this stream to be writable and then returns how
275 /// much may be written or the last error that happened.
276 async fn write_ready(&mut self) -> StreamResult<usize> {
277 let mut i = 0;
278 loop {
279 // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
280 self.ready().await;
281 let n = self.check_write()?;
282 if n > 0 {
283 return Ok(n);
284 }
285 if i >= MAX_BLOCKING_ATTEMPTS {
286 return Err(StreamError::trap("max blocking attempts exceeded"));
287 }
288 i += 1;
289 }
290 }
291
292 /// Cancel any asynchronous work and wait for it to wrap up.
293 async fn cancel(&mut self) {}
294}
295
296#[async_trait::async_trait]
297impl Pollable for Box<dyn OutputStream> {
298 async fn ready(&mut self) {
299 (**self).ready().await
300 }
301}
302
303#[async_trait::async_trait]
304impl Pollable for Box<dyn InputStream> {
305 async fn ready(&mut self) {
306 (**self).ready().await
307 }
308}
309
310pub type DynInputStream = Box<dyn InputStream>;
311
312pub type DynOutputStream = Box<dyn OutputStream>;