wasmtime_wasi/
cli.rs

1use crate::p2;
2use std::pin::Pin;
3use std::sync::Arc;
4use tokio::io::{AsyncRead, AsyncWrite, empty};
5use wasmtime::component::{HasData, ResourceTable};
6use wasmtime_wasi_io::streams::{InputStream, OutputStream};
7
8mod empty;
9mod file;
10mod locked_async;
11mod mem;
12mod stdout;
13mod worker_thread_stdin;
14
15pub use self::file::{InputFile, OutputFile};
16pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream};
17
18// Convenience reexport for stdio types so tokio doesn't have to be imported
19// itself.
20#[doc(no_inline)]
21pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout};
22
23pub(crate) struct WasiCli;
24
25impl HasData for WasiCli {
26    type Data<'a> = WasiCliCtxView<'a>;
27}
28
29/// Provides a "view" of `wasi:cli`-related context used to implement host
30/// traits.
31pub trait WasiCliView: Send {
32    fn cli(&mut self) -> WasiCliCtxView<'_>;
33}
34
35pub struct WasiCliCtxView<'a> {
36    pub ctx: &'a mut WasiCliCtx,
37    pub table: &'a mut ResourceTable,
38}
39
40pub struct WasiCliCtx {
41    pub(crate) environment: Vec<(String, String)>,
42    pub(crate) arguments: Vec<String>,
43    pub(crate) initial_cwd: Option<String>,
44    pub(crate) stdin: Box<dyn StdinStream>,
45    pub(crate) stdout: Box<dyn StdoutStream>,
46    pub(crate) stderr: Box<dyn StdoutStream>,
47}
48
49impl Default for WasiCliCtx {
50    fn default() -> WasiCliCtx {
51        WasiCliCtx {
52            environment: Vec::new(),
53            arguments: Vec::new(),
54            initial_cwd: None,
55            stdin: Box::new(empty()),
56            stdout: Box::new(empty()),
57            stderr: Box::new(empty()),
58        }
59    }
60}
61
62pub trait IsTerminal {
63    /// Returns whether this stream is backed by a TTY.
64    fn is_terminal(&self) -> bool;
65}
66
67/// A trait used to represent the standard input to a guest program.
68///
69/// Note that there are many built-in implementations of this trait for various
70/// types such as [`tokio::io::Stdin`], [`tokio::io::Empty`], and
71/// [`p2::pipe::MemoryInputPipe`].
72pub trait StdinStream: IsTerminal + Send {
73    /// Creates a fresh stream which is reading stdin.
74    ///
75    /// Note that the returned stream must share state with all other streams
76    /// previously created. Guests may create multiple handles to the same stdin
77    /// and they should all be synchronized in their progress through the
78    /// program's input.
79    ///
80    /// Note that this means that if one handle becomes ready for reading they
81    /// all become ready for reading. Subsequently if one is read from it may
82    /// mean that all the others are no longer ready for reading. This is
83    /// basically a consequence of the way the WIT APIs are designed today.
84    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>;
85
86    /// Same as [`Self::async_stream`] except that a WASIp2 [`InputStream`] is
87    /// returned.
88    ///
89    /// Note that this has a default implementation which uses
90    /// [`p2::pipe::AsyncReadStream`] as an adapter, but this can be overridden
91    /// if there's a more specialized implementation available.
92    fn p2_stream(&self) -> Box<dyn InputStream> {
93        Box::new(p2::pipe::AsyncReadStream::new(Pin::from(
94            self.async_stream(),
95        )))
96    }
97}
98
99/// Similar to [`StdinStream`], except for output.
100///
101/// This is used both for a guest stdin and a guest stdout.
102///
103/// Note that there are many built-in implementations of this trait for various
104/// types such as [`tokio::io::Stdout`], [`tokio::io::Empty`], and
105/// [`p2::pipe::MemoryOutputPipe`].
106pub trait StdoutStream: IsTerminal + Send {
107    /// Returns a fresh new stream which can write to this output stream.
108    ///
109    /// Note that all output streams should output to the same logical source.
110    /// This means that it's possible for each independent stream to acquire a
111    /// separate "permit" to write and then act on that permit. Note that
112    /// additionally at this time once a permit is "acquired" there's no way to
113    /// release it, for example you can wait for readiness and then never
114    /// actually write in WASI. This means that acquisition of a permit for one
115    /// stream cannot discount the size of a permit another stream could
116    /// obtain.
117    ///
118    /// Implementations must be able to handle this
119    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>;
120
121    /// Same as [`Self::async_stream`] except that a WASIp2 [`OutputStream`] is
122    /// returned.
123    ///
124    /// Note that this has a default implementation which uses
125    /// [`p2::pipe::AsyncWriteStream`] as an adapter, but this can be overridden
126    /// if there's a more specialized implementation available.
127    fn p2_stream(&self) -> Box<dyn OutputStream> {
128        Box::new(p2::pipe::AsyncWriteStream::new(
129            8192, // FIXME: extract this to a constant.
130            Pin::from(self.async_stream()),
131        ))
132    }
133}
134
135// Forward `&T => T`
136impl<T: ?Sized + IsTerminal> IsTerminal for &T {
137    fn is_terminal(&self) -> bool {
138        T::is_terminal(self)
139    }
140}
141impl<T: ?Sized + StdinStream + Sync> StdinStream for &T {
142    fn p2_stream(&self) -> Box<dyn InputStream> {
143        T::p2_stream(self)
144    }
145    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
146        T::async_stream(self)
147    }
148}
149impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T {
150    fn p2_stream(&self) -> Box<dyn OutputStream> {
151        T::p2_stream(self)
152    }
153    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
154        T::async_stream(self)
155    }
156}
157
158// Forward `&mut T => T`
159impl<T: ?Sized + IsTerminal> IsTerminal for &mut T {
160    fn is_terminal(&self) -> bool {
161        T::is_terminal(self)
162    }
163}
164impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T {
165    fn p2_stream(&self) -> Box<dyn InputStream> {
166        T::p2_stream(self)
167    }
168    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
169        T::async_stream(self)
170    }
171}
172impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T {
173    fn p2_stream(&self) -> Box<dyn OutputStream> {
174        T::p2_stream(self)
175    }
176    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
177        T::async_stream(self)
178    }
179}
180
181// Forward `Box<T> => T`
182impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> {
183    fn is_terminal(&self) -> bool {
184        T::is_terminal(self)
185    }
186}
187impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> {
188    fn p2_stream(&self) -> Box<dyn InputStream> {
189        T::p2_stream(self)
190    }
191    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
192        T::async_stream(self)
193    }
194}
195impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> {
196    fn p2_stream(&self) -> Box<dyn OutputStream> {
197        T::p2_stream(self)
198    }
199    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
200        T::async_stream(self)
201    }
202}
203
204// Forward `Arc<T> => T`
205impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> {
206    fn is_terminal(&self) -> bool {
207        T::is_terminal(self)
208    }
209}
210impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> {
211    fn p2_stream(&self) -> Box<dyn InputStream> {
212        T::p2_stream(self)
213    }
214    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
215        T::async_stream(self)
216    }
217}
218impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> {
219    fn p2_stream(&self) -> Box<dyn OutputStream> {
220        T::p2_stream(self)
221    }
222    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
223        T::async_stream(self)
224    }
225}
226
227#[cfg(test)]
228mod test {
229    use crate::cli::{AsyncStdoutStream, StdinStream, StdoutStream};
230    use crate::p2::{self, OutputStream};
231    use anyhow::Result;
232    use bytes::Bytes;
233    use tokio::io::AsyncReadExt;
234
235    #[test]
236    fn memory_stdin_stream() {
237        // A StdinStream has the property that there are multiple
238        // InputStreams created, using the stream() method which are each
239        // views on the same shared state underneath. Consuming input on one
240        // stream results in consuming that input on all streams.
241        //
242        // The simplest way to measure this is to check if the MemoryInputPipe
243        // impl of StdinStream follows this property.
244
245        let pipe =
246            p2::pipe::MemoryInputPipe::new("the quick brown fox jumped over the three lazy dogs");
247
248        let mut view1 = pipe.p2_stream();
249        let mut view2 = pipe.p2_stream();
250
251        let read1 = view1.read(10).expect("read first 10 bytes");
252        assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
253        let read2 = view2.read(10).expect("read second 10 bytes");
254        assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
255        let read3 = view1.read(10).expect("read third 10 bytes");
256        assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
257        let read4 = view2.read(10).expect("read fourth 10 bytes");
258        assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
259    }
260
261    #[tokio::test]
262    async fn async_stdin_stream() {
263        // A StdinStream has the property that there are multiple
264        // InputStreams created, using the stream() method which are each
265        // views on the same shared state underneath. Consuming input on one
266        // stream results in consuming that input on all streams.
267        //
268        // AsyncStdinStream is a slightly more complex impl of StdinStream
269        // than the MemoryInputPipe above. We can create an AsyncReadStream
270        // from a file on the disk, and an AsyncStdinStream from that common
271        // stream, then check that the same property holds as above.
272
273        let dir = tempfile::tempdir().unwrap();
274        let mut path = std::path::PathBuf::from(dir.path());
275        path.push("file");
276        std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
277
278        let file = tokio::fs::File::open(&path)
279            .await
280            .expect("open created file");
281        let stdin_stream = super::AsyncStdinStream::new(file);
282
283        use super::StdinStream;
284
285        let mut view1 = stdin_stream.p2_stream();
286        let mut view2 = stdin_stream.p2_stream();
287
288        view1.ready().await;
289
290        let read1 = view1.read(10).expect("read first 10 bytes");
291        assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
292        let read2 = view2.read(10).expect("read second 10 bytes");
293        assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
294        let read3 = view1.read(10).expect("read third 10 bytes");
295        assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
296        let read4 = view2.read(10).expect("read fourth 10 bytes");
297        assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
298    }
299
300    #[tokio::test]
301    async fn async_stdout_stream_unblocks() {
302        let (mut read, write) = tokio::io::duplex(32);
303        let stdout = AsyncStdoutStream::new(32, write);
304
305        let task = tokio::task::spawn(async move {
306            let mut stream = stdout.p2_stream();
307            blocking_write_and_flush(&mut *stream, "x".into())
308                .await
309                .unwrap();
310        });
311
312        let mut buf = [0; 100];
313        let n = read.read(&mut buf).await.unwrap();
314        assert_eq!(&buf[..n], b"x");
315
316        task.await.unwrap();
317    }
318
319    async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
320        while !bytes.is_empty() {
321            let permit = s.write_ready().await?;
322            let len = bytes.len().min(permit);
323            let chunk = bytes.split_to(len);
324            s.write(chunk)?;
325        }
326
327        s.flush()?;
328        s.write_ready().await?;
329        Ok(())
330    }
331}