wasmtime_wasi/p2/
stdio.rs

1use crate::p2::bindings::cli::{
2    stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin,
3    terminal_stdout,
4};
5use crate::p2::pipe;
6use crate::p2::{
7    InputStream, IoView, OutputStream, Pollable, StreamError, StreamResult, WasiImpl, WasiView,
8};
9use bytes::Bytes;
10use std::io::IsTerminal;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::streams;
15
/// A trait used to represent the standard input to a guest program.
///
/// This is used to implement various WASI APIs via the method implementations
/// below.
///
/// Built-in implementations are provided for [`Stdin`],
/// [`pipe::MemoryInputPipe`], [`pipe::ClosedInputStream`], [`InputFile`], and
/// [`AsyncStdinStream`].
pub trait StdinStream: Send {
    /// Creates a fresh stream which is reading stdin.
    ///
    /// Note that the returned stream must share state with all other streams
    /// previously created. Guests may create multiple handles to the same stdin
    /// and they should all be synchronized in their progress through the
    /// program's input.
    ///
    /// Note that this means that if one handle becomes ready for reading they
    /// all become ready for reading. Subsequently if one is read from it may
    /// mean that all the others are no longer ready for reading. This is
    /// basically a consequence of the way the WIT APIs are designed today.
    fn stream(&self) -> Box<dyn InputStream>;

    /// Returns whether this stream is backed by a TTY.
    ///
    /// The `get_terminal_stdin` host function in this module only hands out a
    /// terminal resource when this returns `true`.
    fn isatty(&self) -> bool;
}
40
41impl StdinStream for pipe::MemoryInputPipe {
42    fn stream(&self) -> Box<dyn InputStream> {
43        Box::new(self.clone())
44    }
45
46    fn isatty(&self) -> bool {
47        false
48    }
49}
50
51impl StdinStream for pipe::ClosedInputStream {
52    fn stream(&self) -> Box<dyn InputStream> {
53        Box::new(*self)
54    }
55
56    fn isatty(&self) -> bool {
57        false
58    }
59}
60
/// A [`StdinStream`] that serves input from a host file.
///
/// The streams it hands out read from the file synchronously, so reads
/// block. If truly async input is required, [`AsyncStdinStream`] should be
/// used instead.
pub struct InputFile {
    file: Arc<std::fs::File>,
}

impl InputFile {
    /// Takes ownership of `file` and places it behind an [`Arc`] so that each
    /// stream created later can hold its own reference to the same handle.
    pub fn new(file: std::fs::File) -> Self {
        let file = Arc::new(file);
        Self { file }
    }
}
75
76impl StdinStream for InputFile {
77    fn stream(&self) -> Box<dyn InputStream> {
78        Box::new(InputFileStream {
79            file: Arc::clone(&self.file),
80        })
81    }
82
83    fn isatty(&self) -> bool {
84        false
85    }
86}
87
88struct InputFileStream {
89    file: Arc<std::fs::File>,
90}
91
92#[async_trait::async_trait]
93impl Pollable for InputFileStream {
94    async fn ready(&mut self) {}
95}
96
97impl InputStream for InputFileStream {
98    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
99        use std::io::Read;
100
101        let mut buf = bytes::BytesMut::zeroed(size);
102        let bytes_read = self
103            .file
104            .read(&mut buf)
105            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
106        if bytes_read == 0 {
107            return Err(StreamError::Closed);
108        }
109        buf.truncate(bytes_read);
110        StreamResult::Ok(buf.into())
111    }
112}
113
114/// An impl of [`StdinStream`] built on top of [`crate::p2::pipe::AsyncReadStream`].
115//
116// Note the usage of `tokio::sync::Mutex` here as opposed to a
117// `std::sync::Mutex`. This is intentionally done to implement the `Pollable`
// variant of this trait. Note that in doing so we're left with the quandary of
119// how to implement methods of `InputStream` since those methods are not
120// `async`. They're currently implemented with `try_lock`, which then raises the
121// question of what to do on contention. Currently traps are returned.
122//
123// Why should it be ok to return a trap? In general concurrency/contention
124// shouldn't return a trap since it should be able to happen normally. The
125// current assumption, though, is that WASI stdin/stdout streams are special
126// enough that the contention case should never come up in practice. Currently
// in WASI there is no actual concurrency, there's just the items in a single
128// `Store` and that store owns all of its I/O in a single Tokio task. There's no
129// means to actually spawn multiple Tokio tasks that use the same store. This
130// means at the very least that there's zero parallelism. Due to the lack of
131// multiple tasks that also means that there's no concurrency either.
132//
133// This `AsyncStdinStream` wrapper is only intended to be used by the WASI
134// bindings themselves. It's possible for the host to take this and work with it
135// on its own task, but that's niche enough it's not designed for.
136//
137// Overall that means that the guest is either calling `Pollable` or
138// `InputStream` methods. This means that there should never be contention
139// between the two at this time. This may all change in the future with WASI
140// 0.3, but perhaps we'll have a better story for stdio at that time (see the
141// doc block on the `OutputStream` impl below)
142pub struct AsyncStdinStream(Arc<Mutex<crate::p2::pipe::AsyncReadStream>>);
143
144impl AsyncStdinStream {
145    pub fn new(s: crate::p2::pipe::AsyncReadStream) -> Self {
146        Self(Arc::new(Mutex::new(s)))
147    }
148}
149
150impl StdinStream for AsyncStdinStream {
151    fn stream(&self) -> Box<dyn InputStream> {
152        Box::new(Self(self.0.clone()))
153    }
154    fn isatty(&self) -> bool {
155        false
156    }
157}
158
159#[async_trait::async_trait]
160impl InputStream for AsyncStdinStream {
161    fn read(&mut self, size: usize) -> Result<bytes::Bytes, StreamError> {
162        match self.0.try_lock() {
163            Ok(mut stream) => stream.read(size),
164            Err(_) => Err(StreamError::trap("concurrent reads are not supported")),
165        }
166    }
167    fn skip(&mut self, size: usize) -> Result<usize, StreamError> {
168        match self.0.try_lock() {
169            Ok(mut stream) => stream.skip(size),
170            Err(_) => Err(StreamError::trap("concurrent skips are not supported")),
171        }
172    }
173    async fn cancel(&mut self) {
174        // Cancel the inner stream if we're the last reference to it:
175        if let Some(mutex) = Arc::get_mut(&mut self.0) {
176            match mutex.try_lock() {
177                Ok(mut stream) => stream.cancel().await,
178                Err(_) => {}
179            }
180        }
181    }
182}
183
184#[async_trait::async_trait]
185impl Pollable for AsyncStdinStream {
186    async fn ready(&mut self) {
187        self.0.lock().await.ready().await
188    }
189}
190
191mod worker_thread_stdin;
192pub use self::worker_thread_stdin::{Stdin, stdin};
193
/// Similar to [`StdinStream`], except for output.
///
/// Implementations in this module: [`pipe::MemoryOutputPipe`],
/// [`pipe::SinkOutputStream`], [`pipe::ClosedOutputStream`], [`OutputFile`],
/// [`Stdout`], [`Stderr`], and [`AsyncStdoutStream`].
pub trait StdoutStream: Send {
    /// Returns a fresh new stream which can write to this output stream.
    ///
    /// Note that all output streams should output to the same logical source.
    /// This means that it's possible for each independent stream to acquire a
    /// separate "permit" to write and then act on that permit. Note that
    /// additionally at this time once a permit is "acquired" there's no way to
    /// release it, for example you can wait for readiness and then never
    /// actually write in WASI. This means that acquisition of a permit for one
    /// stream cannot discount the size of a permit another stream could
    /// obtain.
    ///
    /// Implementations must be able to handle this
    fn stream(&self) -> Box<dyn OutputStream>;

    /// Returns whether this stream is backed by a TTY.
    ///
    /// The `get_terminal_stdout` / `get_terminal_stderr` host functions in
    /// this module only hand out a terminal resource when this returns `true`.
    fn isatty(&self) -> bool;
}
213
214impl StdoutStream for pipe::MemoryOutputPipe {
215    fn stream(&self) -> Box<dyn OutputStream> {
216        Box::new(self.clone())
217    }
218
219    fn isatty(&self) -> bool {
220        false
221    }
222}
223
224impl StdoutStream for pipe::SinkOutputStream {
225    fn stream(&self) -> Box<dyn OutputStream> {
226        Box::new(*self)
227    }
228
229    fn isatty(&self) -> bool {
230        false
231    }
232}
233
234impl StdoutStream for pipe::ClosedOutputStream {
235    fn stream(&self) -> Box<dyn OutputStream> {
236        Box::new(*self)
237    }
238
239    fn isatty(&self) -> bool {
240        false
241    }
242}
243
/// A [`StdoutStream`] that sends output to a host file.
///
/// The streams it hands out write to the file synchronously, so writes
/// block. If truly async output is required, [`AsyncStdoutStream`] should be
/// used instead.
pub struct OutputFile {
    file: Arc<std::fs::File>,
}

impl OutputFile {
    /// Takes ownership of `file` and places it behind an [`Arc`] so that each
    /// stream created later can hold its own reference to the same handle.
    pub fn new(file: std::fs::File) -> Self {
        let file = Arc::new(file);
        Self { file }
    }
}
258
259impl StdoutStream for OutputFile {
260    fn stream(&self) -> Box<dyn OutputStream> {
261        Box::new(OutputFileStream {
262            file: Arc::clone(&self.file),
263        })
264    }
265
266    fn isatty(&self) -> bool {
267        false
268    }
269}
270
271struct OutputFileStream {
272    file: Arc<std::fs::File>,
273}
274
275#[async_trait::async_trait]
276impl Pollable for OutputFileStream {
277    async fn ready(&mut self) {}
278}
279
280impl OutputStream for OutputFileStream {
281    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
282        use std::io::Write;
283        self.file
284            .write_all(&bytes)
285            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
286    }
287
288    fn flush(&mut self) -> StreamResult<()> {
289        use std::io::Write;
290        self.file
291            .flush()
292            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
293    }
294
295    fn check_write(&mut self) -> StreamResult<usize> {
296        Ok(1024 * 1024)
297    }
298}
299
300/// This implementation will yield output streams that block on writes, as they
301/// inherit the implementation directly from the rust std library. A different
302/// implementation of [`StdoutStream`] will be necessary if truly async output
303/// streams are required.
304pub struct Stdout;
305
306/// Returns a stream that represents the host's standard out.
307///
308/// Suitable for passing to
309/// [`WasiCtxBuilder::stdout`](crate::p2::WasiCtxBuilder::stdout).
310pub fn stdout() -> Stdout {
311    Stdout
312}
313
314impl StdoutStream for Stdout {
315    fn stream(&self) -> Box<dyn OutputStream> {
316        Box::new(StdioOutputStream::Stdout)
317    }
318
319    fn isatty(&self) -> bool {
320        std::io::stdout().is_terminal()
321    }
322}
323
324/// This implementation will yield output streams that block on writes, as they
325/// inherit the implementation directly from the rust std library. A different
326/// implementation of [`StdoutStream`] will be necessary if truly async output
327/// streams are required.
328pub struct Stderr;
329
330/// Returns a stream that represents the host's standard err.
331///
332/// Suitable for passing to
333/// [`WasiCtxBuilder::stderr`](crate::p2::WasiCtxBuilder::stderr).
334pub fn stderr() -> Stderr {
335    Stderr
336}
337
338impl StdoutStream for Stderr {
339    fn stream(&self) -> Box<dyn OutputStream> {
340        Box::new(StdioOutputStream::Stderr)
341    }
342
343    fn isatty(&self) -> bool {
344        std::io::stderr().is_terminal()
345    }
346}
347
348enum StdioOutputStream {
349    Stdout,
350    Stderr,
351}
352
353impl OutputStream for StdioOutputStream {
354    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
355        use std::io::Write;
356        match self {
357            StdioOutputStream::Stdout => std::io::stdout().write_all(&bytes),
358            StdioOutputStream::Stderr => std::io::stderr().write_all(&bytes),
359        }
360        .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
361    }
362
363    fn flush(&mut self) -> StreamResult<()> {
364        use std::io::Write;
365        match self {
366            StdioOutputStream::Stdout => std::io::stdout().flush(),
367            StdioOutputStream::Stderr => std::io::stderr().flush(),
368        }
369        .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
370    }
371
372    fn check_write(&mut self) -> StreamResult<usize> {
373        Ok(1024 * 1024)
374    }
375}
376
377#[async_trait::async_trait]
378impl Pollable for StdioOutputStream {
379    async fn ready(&mut self) {}
380}
381
382/// A wrapper of [`crate::p2::pipe::AsyncWriteStream`] that implements
383/// [`StdoutStream`]. Note that the [`OutputStream`] impl for this is not
384/// correct when used for interleaved async IO.
385//
386// Note that the use of `tokio::sync::Mutex` here is intentional, in addition to
387// the `try_lock()` calls below in the implementation of `OutputStream`. For
388// more information see the documentation on `AsyncStdinStream`.
389pub struct AsyncStdoutStream(Arc<Mutex<crate::p2::pipe::AsyncWriteStream>>);
390
391impl AsyncStdoutStream {
392    pub fn new(s: crate::p2::pipe::AsyncWriteStream) -> Self {
393        Self(Arc::new(Mutex::new(s)))
394    }
395}
396
397impl StdoutStream for AsyncStdoutStream {
398    fn stream(&self) -> Box<dyn OutputStream> {
399        Box::new(Self(self.0.clone()))
400    }
401    fn isatty(&self) -> bool {
402        false
403    }
404}
405
406// This implementation is known to be bogus. All check-writes and writes are
407// directed at the same underlying stream. The check-write/write protocol does
408// require the size returned by a check-write to be accepted by write, even if
409// other side-effects happen between those calls, and this implementation
410// permits another view (created by StdoutStream::stream()) of the same
411// underlying stream to accept a write which will invalidate a prior
412// check-write of another view.
413// Ultimately, the Std{in,out}Stream::stream() methods exist because many
414// different places in a linked component (which may itself contain many
415// modules) may need to access stdio without any coordination to keep those
// accesses all pointing to the same resource. So, we allow many
417// resources to be created. We have the reasonable expectation that programs
418// won't attempt to interleave async IO from these disparate uses of stdio.
419// If that expectation doesn't turn out to be true, and you find yourself at
420// this comment to correct it: sorry about that.
421#[async_trait::async_trait]
422impl OutputStream for AsyncStdoutStream {
423    fn check_write(&mut self) -> Result<usize, StreamError> {
424        match self.0.try_lock() {
425            Ok(mut stream) => stream.check_write(),
426            Err(_) => Err(StreamError::trap("concurrent writes are not supported")),
427        }
428    }
429    fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
430        match self.0.try_lock() {
431            Ok(mut stream) => stream.write(bytes),
432            Err(_) => Err(StreamError::trap("concurrent writes not supported yet")),
433        }
434    }
435    fn flush(&mut self) -> Result<(), StreamError> {
436        match self.0.try_lock() {
437            Ok(mut stream) => stream.flush(),
438            Err(_) => Err(StreamError::trap("concurrent flushes not supported yet")),
439        }
440    }
441    async fn cancel(&mut self) {
442        // Cancel the inner stream if we're the last reference to it:
443        if let Some(mutex) = Arc::get_mut(&mut self.0) {
444            match mutex.try_lock() {
445                Ok(mut stream) => stream.cancel().await,
446                Err(_) => {}
447            }
448        }
449    }
450}
451
452#[async_trait::async_trait]
453impl Pollable for AsyncStdoutStream {
454    async fn ready(&mut self) {
455        self.0.lock().await.ready().await
456    }
457}
458
/// Two-state flag describing whether something is a TTY.
///
/// NOTE(review): nothing in this file references this enum; the stream traits
/// here report TTY status as a plain `bool` via `isatty`. Presumably it is
/// kept for callers elsewhere — confirm before removing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsATTY {
    /// The stream is a TTY.
    Yes,
    /// The stream is not a TTY.
    No,
}
464
465impl<T> stdin::Host for WasiImpl<T>
466where
467    T: WasiView,
468{
469    fn get_stdin(&mut self) -> Result<Resource<streams::DynInputStream>, anyhow::Error> {
470        let stream = self.ctx().stdin.stream();
471        Ok(self.table().push(stream)?)
472    }
473}
474
475impl<T> stdout::Host for WasiImpl<T>
476where
477    T: WasiView,
478{
479    fn get_stdout(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
480        let stream = self.ctx().stdout.stream();
481        Ok(self.table().push(stream)?)
482    }
483}
484
485impl<T> stderr::Host for WasiImpl<T>
486where
487    T: WasiView,
488{
489    fn get_stderr(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
490        let stream = self.ctx().stderr.stream();
491        Ok(self.table().push(stream)?)
492    }
493}
494
/// Marker resource pushed into the table by `get_terminal_stdin` when stdin
/// reports being a TTY. Carries no data.
pub struct TerminalInput;
/// Marker resource pushed into the table by `get_terminal_stdout` and
/// `get_terminal_stderr` when the respective stream reports being a TTY.
/// Carries no data.
pub struct TerminalOutput;
497
498impl<T> terminal_input::Host for WasiImpl<T> where T: WasiView {}
499impl<T> terminal_input::HostTerminalInput for WasiImpl<T>
500where
501    T: WasiView,
502{
503    fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
504        self.table().delete(r)?;
505        Ok(())
506    }
507}
508impl<T> terminal_output::Host for WasiImpl<T> where T: WasiView {}
509impl<T> terminal_output::HostTerminalOutput for WasiImpl<T>
510where
511    T: WasiView,
512{
513    fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
514        self.table().delete(r)?;
515        Ok(())
516    }
517}
518impl<T> terminal_stdin::Host for WasiImpl<T>
519where
520    T: WasiView,
521{
522    fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
523        if self.ctx().stdin.isatty() {
524            let fd = self.table().push(TerminalInput)?;
525            Ok(Some(fd))
526        } else {
527            Ok(None)
528        }
529    }
530}
531impl<T> terminal_stdout::Host for WasiImpl<T>
532where
533    T: WasiView,
534{
535    fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
536        if self.ctx().stdout.isatty() {
537            let fd = self.table().push(TerminalOutput)?;
538            Ok(Some(fd))
539        } else {
540            Ok(None)
541        }
542    }
543}
544impl<T> terminal_stderr::Host for WasiImpl<T>
545where
546    T: WasiView,
547{
548    fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
549        if self.ctx().stderr.isatty() {
550            let fd = self.table().push(TerminalOutput)?;
551            Ok(Some(fd))
552        } else {
553            Ok(None)
554        }
555    }
556}
557
// Unit tests for the shared-state behavior of stdin views and for the async
// stdout wrapper.
#[cfg(test)]
mod test {
    use crate::p2::stdio::StdoutStream;
    use crate::p2::write_stream::AsyncWriteStream;
    use crate::p2::{AsyncStdoutStream, OutputStream};
    use anyhow::Result;
    use bytes::Bytes;
    use tokio::io::AsyncReadExt;

    #[test]
    fn memory_stdin_stream() {
        // A StdinStream has the property that there are multiple
        // InputStreams created, using the stream() method which are each
        // views on the same shared state underneath. Consuming input on one
        // stream results in consuming that input on all streams.
        //
        // The simplest way to measure this is to check if the MemoryInputPipe
        // impl of StdinStream follows this property.

        let pipe = super::pipe::MemoryInputPipe::new(
            "the quick brown fox jumped over the three lazy dogs",
        );

        use super::StdinStream;

        let mut view1 = pipe.stream();
        let mut view2 = pipe.stream();

        // Alternate reads between the two views; each read must continue
        // where the previous one (on either view) left off.
        let read1 = view1.read(10).expect("read first 10 bytes");
        assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
        let read2 = view2.read(10).expect("read second 10 bytes");
        assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
        let read3 = view1.read(10).expect("read third 10 bytes");
        assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
        let read4 = view2.read(10).expect("read fourth 10 bytes");
        assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
    }

    #[tokio::test]
    async fn async_stdin_stream() {
        // A StdinStream has the property that there are multiple
        // InputStreams created, using the stream() method which are each
        // views on the same shared state underneath. Consuming input on one
        // stream results in consuming that input on all streams.
        //
        // AsyncStdinStream is a slightly more complex impl of StdinStream
        // than the MemoryInputPipe above. We can create an AsyncReadStream
        // from a file on the disk, and an AsyncStdinStream from that common
        // stream, then check that the same property holds as above.

        let dir = tempfile::tempdir().unwrap();
        let mut path = std::path::PathBuf::from(dir.path());
        path.push("file");
        std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();

        let file = tokio::fs::File::open(&path)
            .await
            .expect("open created file");
        let stdin_stream =
            super::AsyncStdinStream::new(crate::p2::pipe::AsyncReadStream::new(file));

        use super::StdinStream;

        let mut view1 = stdin_stream.stream();
        let mut view2 = stdin_stream.stream();

        // Wait for readiness once, on one view only, before any read is
        // attempted on either view.
        view1.ready().await;

        let read1 = view1.read(10).expect("read first 10 bytes");
        assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
        let read2 = view2.read(10).expect("read second 10 bytes");
        assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
        let read3 = view1.read(10).expect("read third 10 bytes");
        assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
        let read4 = view2.read(10).expect("read fourth 10 bytes");
        assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
    }

    // Checks that a write-and-flush issued from a spawned task shows up on
    // the read half of the duplex pipe, and that the task then completes.
    #[tokio::test]
    async fn async_stdout_stream_unblocks() {
        let (mut read, write) = tokio::io::duplex(32);
        let stdout = AsyncStdoutStream::new(AsyncWriteStream::new(32, write));

        let task = tokio::task::spawn(async move {
            let mut stream = stdout.stream();
            blocking_write_and_flush(&mut *stream, "x".into())
                .await
                .unwrap();
        });

        let mut buf = [0; 100];
        let n = read.read(&mut buf).await.unwrap();
        assert_eq!(&buf[..n], b"x");

        task.await.unwrap();
    }

    // Test helper: writes `bytes` in chunks no larger than the permit
    // returned by `write_ready`, then flushes and waits for readiness once
    // more before returning.
    async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
        while !bytes.is_empty() {
            let permit = s.write_ready().await?;
            let len = bytes.len().min(permit);
            let chunk = bytes.split_to(len);
            s.write(chunk)?;
        }

        s.flush()?;
        s.write_ready().await?;
        Ok(())
    }
}
667}