wasmtime_wasi_io/streams.rs
1use crate::poll::Pollable;
2use alloc::boxed::Box;
3use bytes::Bytes;
4use wasmtime::Result;
5
6/// `Pollable::ready()` for `InputStream` and `OutputStream` may return
7/// prematurely due to `io::ErrorKind::WouldBlock`.
8///
9/// To ensure that `blocking_` functions return a valid non-empty result,
10/// we use a loop with a maximum iteration limit.
11///
12/// This constant defines the maximum number of loop attempts allowed.
13const MAX_BLOCKING_ATTEMPTS: u8 = 10;
14
15/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
16/// bytestream which can be read from.
17#[async_trait::async_trait]
18pub trait InputStream: Pollable {
19 /// Reads up to `size` bytes, returning a buffer holding these bytes on
20 /// success.
21 ///
22 /// This function does not block the current thread and is the equivalent of
23 /// a non-blocking read. On success all bytes read are returned through
24 /// `Bytes`, which is no larger than the `size` provided. If the returned
25 /// list of `Bytes` is empty then no data is ready to be read at this time.
26 ///
27 /// # Errors
28 ///
29 /// The [`StreamError`] return value communicates when this stream is
30 /// closed, when a read fails, or when a trap should be generated.
31 fn read(&mut self, size: usize) -> StreamResult<Bytes>;
32
33 /// Similar to `read`, except that it blocks until at least one byte can be
34 /// read.
35 async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
36 if size == 0 {
37 self.ready().await;
38 return self.read(size);
39 }
40
41 let mut i = 0;
42 loop {
43 // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
44 self.ready().await;
45 let data = self.read(size)?;
46 if !data.is_empty() {
47 return Ok(data);
48 }
49 if i >= MAX_BLOCKING_ATTEMPTS {
50 return Err(StreamError::trap("max blocking attempts exceeded"));
51 }
52 i += 1;
53 }
54 }
55
56 /// Same as the `read` method except that bytes are skipped.
57 ///
58 /// Note that this method is non-blocking like `read` and returns the same
59 /// errors.
60 fn skip(&mut self, nelem: usize) -> StreamResult<usize> {
61 let bs = self.read(nelem)?;
62 Ok(bs.len())
63 }
64
65 /// Similar to `skip`, except that it blocks until at least one byte can be
66 /// skipped.
67 async fn blocking_skip(&mut self, nelem: usize) -> StreamResult<usize> {
68 let bs = self.blocking_read(nelem).await?;
69 Ok(bs.len())
70 }
71
72 /// Cancel any asynchronous work and wait for it to wrap up.
73 async fn cancel(&mut self) {}
74}
75
76/// Representation of the `error` resource type in the `wasi:io/error`
77/// interface.
78///
79/// This is currently `wasmtime::Error` to retain full type information for
80/// errors.
81pub type Error = wasmtime::Error;
82
83pub type StreamResult<T> = Result<T, StreamError>;
84
85#[derive(Debug)]
86pub enum StreamError {
87 Closed,
88 LastOperationFailed(wasmtime::Error),
89 Trap(wasmtime::Error),
90}
91
92impl StreamError {
93 pub fn trap(msg: &str) -> StreamError {
94 StreamError::Trap(wasmtime::format_err!("{msg}"))
95 }
96}
97
98impl alloc::fmt::Display for StreamError {
99 fn fmt(&self, f: &mut alloc::fmt::Formatter<'_>) -> alloc::fmt::Result {
100 match self {
101 StreamError::Closed => write!(f, "closed"),
102 StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"),
103 StreamError::Trap(e) => write!(f, "trap: {e}"),
104 }
105 }
106}
107
108impl core::error::Error for StreamError {
109 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
110 match self {
111 StreamError::Closed => None,
112 StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(),
113 }
114 }
115}
116
117impl From<wasmtime::component::ResourceTableError> for StreamError {
118 fn from(error: wasmtime::component::ResourceTableError) -> Self {
119 Self::Trap(error.into())
120 }
121}
122
123/// Host trait for implementing the `wasi:io/streams.output-stream` resource:
124/// A bytestream which can be written to.
125#[async_trait::async_trait]
126pub trait OutputStream: Pollable {
127 /// Write bytes after obtaining a permit to write those bytes
128 ///
129 /// Prior to calling [`write`](Self::write) the caller must call
130 /// [`check_write`](Self::check_write), which resolves to a non-zero permit
131 ///
132 /// This method must never block. The [`check_write`](Self::check_write)
133 /// permit indicates the maximum amount of bytes that are permitted to be
134 /// written in a single [`write`](Self::write) following the
135 /// [`check_write`](Self::check_write) resolution.
136 ///
137 /// # Errors
138 ///
139 /// Returns a [`StreamError`] if:
140 /// - stream is closed
141 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
142 /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
143 fn write(&mut self, bytes: Bytes) -> StreamResult<()>;
144
145 /// Trigger a flush of any bytes buffered in this stream implementation.
146 ///
147 /// This method may be called at any time and must never block.
148 ///
149 /// After this method is called, [`check_write`](Self::check_write) must
150 /// pend until flush is complete.
151 ///
152 /// When [`check_write`](Self::check_write) becomes ready after a flush,
153 /// that guarantees that all prior writes have been flushed from the
154 /// implementation successfully, or that any error associated with those
155 /// writes is reported in the return value of [`flush`](Self::flush) or
156 /// [`check_write`](Self::check_write)
157 ///
158 /// # Errors
159 ///
160 /// Returns a [`StreamError`] if:
161 /// - stream is closed
162 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
163 /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
164 fn flush(&mut self) -> StreamResult<()>;
165
166 /// Returns the number of bytes that are ready to be written to this stream.
167 ///
168 /// Zero bytes indicates that this stream is not currently ready for writing
169 /// and `ready()` must be awaited first.
170 ///
171 /// Note that this method does not block.
172 ///
173 /// # Errors
174 ///
175 /// Returns an [`StreamError`] if:
176 /// - stream is closed
177 /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
178 fn check_write(&mut self) -> StreamResult<usize>;
179
180 /// Perform a write of up to 4096 bytes, and then flush the stream. Block
181 /// until all of these operations are complete, or an error occurs.
182 ///
183 /// This is a convenience wrapper around the use of `check-write`,
184 /// `subscribe`, `write`, and `flush`, and is implemented with the
185 /// following pseudo-code:
186 ///
187 /// ```text
188 /// let pollable = this.subscribe();
189 /// while !contents.is_empty() {
190 /// // Wait for the stream to become writable
191 /// pollable.block();
192 /// let Ok(n) = this.check-write(); // eliding error handling
193 /// let len = min(n, contents.len());
194 /// let (chunk, rest) = contents.split_at(len);
195 /// this.write(chunk ); // eliding error handling
196 /// contents = rest;
197 /// }
198 /// this.flush();
199 /// // Wait for completion of `flush`
200 /// pollable.block();
201 /// // Check for any errors that arose during `flush`
202 /// let _ = this.check-write(); // eliding error handling
203 /// ```
204 async fn blocking_write_and_flush(&mut self, mut bytes: Bytes) -> StreamResult<()> {
205 loop {
206 let permit = self.write_ready().await?;
207 let len = bytes.len().min(permit);
208 let chunk = bytes.split_to(len);
209 self.write(chunk)?;
210 if bytes.is_empty() {
211 break;
212 }
213 }
214
215 // If the stream encounters an error, return it, but if the stream
216 // has become closed, do not.
217 match self.flush() {
218 Ok(_) => {}
219 Err(StreamError::Closed) => {}
220 Err(e) => Err(e)?,
221 };
222 match self.write_ready().await {
223 Ok(_) => {}
224 Err(StreamError::Closed) => {}
225 Err(e) => Err(e)?,
226 };
227
228 Ok(())
229 }
230
231 /// Repeatedly write a byte to a stream.
232 /// Important: this write must be non-blocking!
233 /// Returning an Err which downcasts to a [`StreamError`] will be
234 /// reported to Wasm as the empty error result. Otherwise, errors will trap.
235 fn write_zeroes(&mut self, nelem: usize) -> StreamResult<()> {
236 let n = self.check_write()?;
237 if nelem > n {
238 return Err(StreamError::trap(
239 "cannot write more zeroes than `check_write` allows",
240 ));
241 };
242 // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
243 // repeatedly from a 'static buffer of zeros.
244 let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
245 self.write(bs)?;
246 Ok(())
247 }
248
249 /// Simultaneously waits for this stream to be writable and then returns how
250 /// much may be written or the last error that happened.
251 async fn write_ready(&mut self) -> StreamResult<usize> {
252 let mut i = 0;
253 loop {
254 // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
255 self.ready().await;
256 let n = self.check_write()?;
257 if n > 0 {
258 return Ok(n);
259 }
260 if i >= MAX_BLOCKING_ATTEMPTS {
261 return Err(StreamError::trap("max blocking attempts exceeded"));
262 }
263 i += 1;
264 }
265 }
266
267 /// Cancel any asynchronous work and wait for it to wrap up.
268 async fn cancel(&mut self) {}
269}
270
271#[async_trait::async_trait]
272impl Pollable for Box<dyn OutputStream> {
273 async fn ready(&mut self) {
274 (**self).ready().await
275 }
276}
277
278#[async_trait::async_trait]
279impl Pollable for Box<dyn InputStream> {
280 async fn ready(&mut self) {
281 (**self).ready().await
282 }
283}
284
285pub type DynInputStream = Box<dyn InputStream>;
286
287pub type DynOutputStream = Box<dyn OutputStream>;