wasmtime_wasi/
cli.rs

1use crate::p2;
2use std::pin::Pin;
3use std::sync::Arc;
4use tokio::io::{AsyncRead, AsyncWrite, empty};
5use wasmtime::component::{HasData, ResourceTable};
6use wasmtime_wasi_io::streams::{InputStream, OutputStream};
7
8mod empty;
9mod file;
10mod locked_async;
11mod mem;
12mod stdout;
13mod worker_thread_stdin;
14
15pub use self::file::{InputFile, OutputFile};
16pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream};
17
18// Convenience reexport for stdio types so tokio doesn't have to be imported
19// itself.
20#[doc(no_inline)]
21pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout};
22
/// A helper struct which implements [`HasData`] for the `wasi:cli` APIs.
///
/// This can be useful when calling `add_to_linker` functions directly, such as
/// [`wasmtime_wasi::p2::bindings::cli::environment::add_to_linker`], where it
/// is passed as the `D` type parameter. See [`HasData`] for more information
/// about the type parameter's purpose.
///
/// When using this type you can skip the [`WasiCliView`] trait, for
/// example.
///
/// # Examples
///
/// ```
/// use wasmtime::component::{Linker, ResourceTable};
/// use wasmtime::{Engine, Result, Config};
/// use wasmtime_wasi::cli::*;
///
/// struct MyStoreState {
///     table: ResourceTable,
///     cli: WasiCliCtx,
/// }
///
/// fn main() -> Result<()> {
///     let mut config = Config::new();
///     config.async_support(true);
///     let engine = Engine::new(&config)?;
///     let mut linker = Linker::new(&engine);
///
///     wasmtime_wasi::p2::bindings::cli::environment::add_to_linker::<MyStoreState, WasiCli>(
///         &mut linker,
///         |state| WasiCliCtxView {
///             table: &mut state.table,
///             ctx: &mut state.cli,
///         },
///     )?;
///     Ok(())
/// }
/// ```
pub struct WasiCli;
62
// `WasiCli` is a unit struct used purely as a type-level marker: the only
// thing it contributes is naming `WasiCliCtxView` as the borrowed data type.
impl HasData for WasiCli {
    type Data<'a> = WasiCliCtxView<'a>;
}
66
/// Provides a "view" of `wasi:cli`-related context used to implement host
/// traits.
///
/// Implementors are required to be [`Send`].
pub trait WasiCliView: Send {
    /// Borrows the `wasi:cli` context and resource table out of `self`,
    /// packaged as a [`WasiCliCtxView`] tied to the lifetime of that borrow.
    fn cli(&mut self) -> WasiCliCtxView<'_>;
}
72
/// A borrowed view pairing a [`WasiCliCtx`] with a [`ResourceTable`].
///
/// This is the value returned by [`WasiCliView::cli`] and the
/// [`HasData::Data`] type named by [`WasiCli`].
pub struct WasiCliCtxView<'a> {
    /// Mutable borrow of the `wasi:cli` context (environment, arguments,
    /// initial working directory and stdio handles).
    pub ctx: &'a mut WasiCliCtx,
    /// Mutable borrow of the resource table supplied alongside `ctx`.
    pub table: &'a mut ResourceTable,
}
77
/// State backing the `wasi:cli` APIs: environment, arguments, initial working
/// directory, and the three standard streams.
///
/// The [`Default`] value has empty collections, no initial working directory,
/// and all three stdio handles backed by `tokio::io::empty()`.
pub struct WasiCliCtx {
    /// Environment entries as pairs of strings.
    // NOTE(review): presumably `(name, value)` pairs — confirm against the
    // code that reads this field.
    pub(crate) environment: Vec<(String, String)>,
    /// Argument strings.
    // NOTE(review): presumably the guest's command-line arguments — confirm.
    pub(crate) arguments: Vec<String>,
    /// Initial working directory, or `None` when there is none.
    pub(crate) initial_cwd: Option<String>,
    /// Handle from which stdin streams are created.
    pub(crate) stdin: Box<dyn StdinStream>,
    /// Handle from which stdout streams are created.
    pub(crate) stdout: Box<dyn StdoutStream>,
    /// Handle from which stderr streams are created; note that it uses the
    /// same [`StdoutStream`] trait as `stdout`.
    pub(crate) stderr: Box<dyn StdoutStream>,
}
86
87impl Default for WasiCliCtx {
88    fn default() -> WasiCliCtx {
89        WasiCliCtx {
90            environment: Vec::new(),
91            arguments: Vec::new(),
92            initial_cwd: None,
93            stdin: Box::new(empty()),
94            stdout: Box::new(empty()),
95            stderr: Box::new(empty()),
96        }
97    }
98}
99
/// Reports whether a stdio handle is attached to a terminal.
///
/// This is a supertrait of both [`StdinStream`] and [`StdoutStream`].
pub trait IsTerminal {
    /// Returns whether this stream is backed by a TTY.
    fn is_terminal(&self) -> bool;
}
104
105/// A trait used to represent the standard input to a guest program.
106///
107/// Note that there are many built-in implementations of this trait for various
108/// types such as [`tokio::io::Stdin`], [`tokio::io::Empty`], and
109/// [`p2::pipe::MemoryInputPipe`].
110pub trait StdinStream: IsTerminal + Send {
111    /// Creates a fresh stream which is reading stdin.
112    ///
113    /// Note that the returned stream must share state with all other streams
114    /// previously created. Guests may create multiple handles to the same stdin
115    /// and they should all be synchronized in their progress through the
116    /// program's input.
117    ///
118    /// Note that this means that if one handle becomes ready for reading they
119    /// all become ready for reading. Subsequently if one is read from it may
120    /// mean that all the others are no longer ready for reading. This is
121    /// basically a consequence of the way the WIT APIs are designed today.
122    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>;
123
124    /// Same as [`Self::async_stream`] except that a WASIp2 [`InputStream`] is
125    /// returned.
126    ///
127    /// Note that this has a default implementation which uses
128    /// [`p2::pipe::AsyncReadStream`] as an adapter, but this can be overridden
129    /// if there's a more specialized implementation available.
130    fn p2_stream(&self) -> Box<dyn InputStream> {
131        Box::new(p2::pipe::AsyncReadStream::new(Pin::from(
132            self.async_stream(),
133        )))
134    }
135}
136
137/// Similar to [`StdinStream`], except for output.
138///
139/// This is used both for a guest stdin and a guest stdout.
140///
141/// Note that there are many built-in implementations of this trait for various
142/// types such as [`tokio::io::Stdout`], [`tokio::io::Empty`], and
143/// [`p2::pipe::MemoryOutputPipe`].
144pub trait StdoutStream: IsTerminal + Send {
145    /// Returns a fresh new stream which can write to this output stream.
146    ///
147    /// Note that all output streams should output to the same logical source.
148    /// This means that it's possible for each independent stream to acquire a
149    /// separate "permit" to write and then act on that permit. Note that
150    /// additionally at this time once a permit is "acquired" there's no way to
151    /// release it, for example you can wait for readiness and then never
152    /// actually write in WASI. This means that acquisition of a permit for one
153    /// stream cannot discount the size of a permit another stream could
154    /// obtain.
155    ///
156    /// Implementations must be able to handle this
157    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>;
158
159    /// Same as [`Self::async_stream`] except that a WASIp2 [`OutputStream`] is
160    /// returned.
161    ///
162    /// Note that this has a default implementation which uses
163    /// [`p2::pipe::AsyncWriteStream`] as an adapter, but this can be overridden
164    /// if there's a more specialized implementation available.
165    fn p2_stream(&self) -> Box<dyn OutputStream> {
166        Box::new(p2::pipe::AsyncWriteStream::new(
167            8192, // FIXME: extract this to a constant.
168            Pin::from(self.async_stream()),
169        ))
170    }
171}
172
173// Forward `&T => T`
174impl<T: ?Sized + IsTerminal> IsTerminal for &T {
175    fn is_terminal(&self) -> bool {
176        T::is_terminal(self)
177    }
178}
179impl<T: ?Sized + StdinStream + Sync> StdinStream for &T {
180    fn p2_stream(&self) -> Box<dyn InputStream> {
181        T::p2_stream(self)
182    }
183    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
184        T::async_stream(self)
185    }
186}
187impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T {
188    fn p2_stream(&self) -> Box<dyn OutputStream> {
189        T::p2_stream(self)
190    }
191    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
192        T::async_stream(self)
193    }
194}
195
196// Forward `&mut T => T`
197impl<T: ?Sized + IsTerminal> IsTerminal for &mut T {
198    fn is_terminal(&self) -> bool {
199        T::is_terminal(self)
200    }
201}
202impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T {
203    fn p2_stream(&self) -> Box<dyn InputStream> {
204        T::p2_stream(self)
205    }
206    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
207        T::async_stream(self)
208    }
209}
210impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T {
211    fn p2_stream(&self) -> Box<dyn OutputStream> {
212        T::p2_stream(self)
213    }
214    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
215        T::async_stream(self)
216    }
217}
218
219// Forward `Box<T> => T`
220impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> {
221    fn is_terminal(&self) -> bool {
222        T::is_terminal(self)
223    }
224}
225impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> {
226    fn p2_stream(&self) -> Box<dyn InputStream> {
227        T::p2_stream(self)
228    }
229    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
230        T::async_stream(self)
231    }
232}
233impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> {
234    fn p2_stream(&self) -> Box<dyn OutputStream> {
235        T::p2_stream(self)
236    }
237    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
238        T::async_stream(self)
239    }
240}
241
242// Forward `Arc<T> => T`
243impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> {
244    fn is_terminal(&self) -> bool {
245        T::is_terminal(self)
246    }
247}
248impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> {
249    fn p2_stream(&self) -> Box<dyn InputStream> {
250        T::p2_stream(self)
251    }
252    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
253        T::async_stream(self)
254    }
255}
256impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> {
257    fn p2_stream(&self) -> Box<dyn OutputStream> {
258        T::p2_stream(self)
259    }
260    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
261        T::async_stream(self)
262    }
263}
264
#[cfg(test)]
mod test {
    use crate::cli::{AsyncStdinStream, AsyncStdoutStream, StdinStream, StdoutStream};
    use crate::p2::{self, OutputStream};
    use anyhow::Result;
    use bytes::Bytes;
    use tokio::io::AsyncReadExt;
    use wasmtime_wasi_io::streams::InputStream;

    /// Input fed to every stdin implementation under test.
    const INPUT: &str = "the quick brown fox jumped over the three lazy dogs";

    /// Alternates 10-byte reads between two views of the same stdin and checks
    /// that each read picks up where the previous one (on the *other* view)
    /// left off, i.e. that both views share one cursor through [`INPUT`].
    fn assert_views_share_progress(view1: &mut dyn InputStream, view2: &mut dyn InputStream) {
        let read1 = view1.read(10).expect("read first 10 bytes");
        assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
        let read2 = view2.read(10).expect("read second 10 bytes");
        assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
        let read3 = view1.read(10).expect("read third 10 bytes");
        assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
        let read4 = view2.read(10).expect("read fourth 10 bytes");
        assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
    }

    #[test]
    fn memory_stdin_stream() {
        // A StdinStream has the property that there are multiple
        // InputStreams created, using the p2_stream() method, which are each
        // views on the same shared state underneath. Consuming input on one
        // stream results in consuming that input on all streams.
        //
        // The simplest way to measure this is to check if the MemoryInputPipe
        // impl of StdinStream follows this property.

        let pipe = p2::pipe::MemoryInputPipe::new(INPUT);

        let mut view1 = pipe.p2_stream();
        let mut view2 = pipe.p2_stream();

        assert_views_share_progress(&mut *view1, &mut *view2);
    }

    #[tokio::test]
    async fn async_stdin_stream() {
        // A StdinStream has the property that there are multiple
        // InputStreams created, using the p2_stream() method, which are each
        // views on the same shared state underneath. Consuming input on one
        // stream results in consuming that input on all streams.
        //
        // AsyncStdinStream is a slightly more complex impl of StdinStream
        // than the MemoryInputPipe above. We can create an AsyncStdinStream
        // from a file on the disk, then check that the same property holds as
        // above.

        // `dir` must stay alive for the duration of the test so the temporary
        // directory is not removed while the file is in use.
        let dir = tempfile::tempdir().unwrap();
        let mut path = std::path::PathBuf::from(dir.path());
        path.push("file");
        std::fs::write(&path, INPUT).unwrap();

        let file = tokio::fs::File::open(&path)
            .await
            .expect("open created file");
        let stdin_stream = AsyncStdinStream::new(file);

        let mut view1 = stdin_stream.p2_stream();
        let mut view2 = stdin_stream.p2_stream();

        // Wait for readiness on one view before issuing the non-blocking
        // reads below.
        view1.ready().await;

        assert_views_share_progress(&mut *view1, &mut *view2);
    }

    #[tokio::test]
    async fn async_stdout_stream_unblocks() {
        let (mut read, write) = tokio::io::duplex(32);
        let stdout = AsyncStdoutStream::new(32, write);

        // Write and flush from a separate task while this task reads the other
        // end of the duplex pipe; the writer task must then run to completion.
        let task = tokio::task::spawn(async move {
            let mut stream = stdout.p2_stream();
            blocking_write_and_flush(&mut *stream, "x".into())
                .await
                .unwrap();
        });

        let mut buf = [0; 100];
        let n = read.read(&mut buf).await.unwrap();
        assert_eq!(&buf[..n], b"x");

        task.await.unwrap();
    }

    /// Writes all of `bytes` to `s` in chunks no larger than the permit
    /// returned by each `write_ready` call, then flushes and waits for the
    /// stream to report write-readiness once more.
    async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
        while !bytes.is_empty() {
            let permit = s.write_ready().await?;
            let len = bytes.len().min(permit);
            let chunk = bytes.split_to(len);
            s.write(chunk)?;
        }

        s.flush()?;
        s.write_ready().await?;
        Ok(())
    }
}