wasmtime_wasi/p2/
stdio.rs

1use crate::cli::IsTerminal;
2use crate::p2::bindings::cli::{
3    stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin,
4    terminal_stdout,
5};
6use crate::p2::pipe;
7use crate::p2::{
8    InputStream, IoView, OutputStream, Pollable, StreamError, StreamResult, WasiImpl, WasiView,
9};
10use bytes::Bytes;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::streams;
15
16/// A trait used to represent the standard input to a guest program.
17///
18/// This is used to implement various WASI APIs via the method implementations
19/// below.
20///
21/// Built-in implementations are provided for [`Stdin`],
22/// [`pipe::MemoryInputPipe`], and [`pipe::ClosedInputStream`].
23pub trait StdinStream: IsTerminal + Send {
24    /// Creates a fresh stream which is reading stdin.
25    ///
26    /// Note that the returned stream must share state with all other streams
27    /// previously created. Guests may create multiple handles to the same stdin
28    /// and they should all be synchronized in their progress through the
29    /// program's input.
30    ///
31    /// Note that this means that if one handle becomes ready for reading they
32    /// all become ready for reading. Subsequently if one is read from it may
33    /// mean that all the others are no longer ready for reading. This is
34    /// basically a consequence of the way the WIT APIs are designed today.
35    fn stream(&self) -> Box<dyn InputStream>;
36}
37
38impl StdinStream for pipe::MemoryInputPipe {
39    fn stream(&self) -> Box<dyn InputStream> {
40        Box::new(self.clone())
41    }
42}
43
44impl IsTerminal for pipe::MemoryInputPipe {
45    fn is_terminal(&self) -> bool {
46        false
47    }
48}
49
50impl StdinStream for pipe::ClosedInputStream {
51    fn stream(&self) -> Box<dyn InputStream> {
52        Box::new(*self)
53    }
54}
55
56impl IsTerminal for pipe::ClosedInputStream {
57    fn is_terminal(&self) -> bool {
58        false
59    }
60}
61
/// A [`StdinStream`] backed by a host file.
///
/// Streams produced by this type read straight from the file and block the
/// calling thread while doing so. Use [`AsyncStdinStream`] instead when truly
/// asynchronous input is required.
pub struct InputFile {
    // Shared so that every stream handed out refers to the same open file.
    file: Arc<std::fs::File>,
}
68
69impl InputFile {
70    pub fn new(file: std::fs::File) -> Self {
71        Self {
72            file: Arc::new(file),
73        }
74    }
75}
76
77impl StdinStream for InputFile {
78    fn stream(&self) -> Box<dyn InputStream> {
79        Box::new(InputFileStream {
80            file: Arc::clone(&self.file),
81        })
82    }
83}
84
85impl IsTerminal for InputFile {
86    fn is_terminal(&self) -> bool {
87        false
88    }
89}
90
/// A single stdin view created by [`InputFile`]; reads go directly to the
/// shared file handle.
struct InputFileStream {
    file: Arc<std::fs::File>,
}
94
95#[async_trait::async_trait]
96impl Pollable for InputFileStream {
97    async fn ready(&mut self) {}
98}
99
100impl InputStream for InputFileStream {
101    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
102        use std::io::Read;
103
104        let mut buf = bytes::BytesMut::zeroed(size);
105        let bytes_read = self
106            .file
107            .read(&mut buf)
108            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
109        if bytes_read == 0 {
110            return Err(StreamError::Closed);
111        }
112        buf.truncate(bytes_read);
113        StreamResult::Ok(buf.into())
114    }
115}
116
117/// An impl of [`StdinStream`] built on top of [`crate::p2::pipe::AsyncReadStream`].
118//
119// Note the usage of `tokio::sync::Mutex` here as opposed to a
120// `std::sync::Mutex`. This is intentionally done to implement the `Pollable`
121// variant of this trait. Note that in doing so we're left with the quandry of
122// how to implement methods of `InputStream` since those methods are not
123// `async`. They're currently implemented with `try_lock`, which then raises the
124// question of what to do on contention. Currently traps are returned.
125//
126// Why should it be ok to return a trap? In general concurrency/contention
127// shouldn't return a trap since it should be able to happen normally. The
128// current assumption, though, is that WASI stdin/stdout streams are special
129// enough that the contention case should never come up in practice. Currently
130// in WASI there is no actually concurrency, there's just the items in a single
131// `Store` and that store owns all of its I/O in a single Tokio task. There's no
132// means to actually spawn multiple Tokio tasks that use the same store. This
133// means at the very least that there's zero parallelism. Due to the lack of
134// multiple tasks that also means that there's no concurrency either.
135//
136// This `AsyncStdinStream` wrapper is only intended to be used by the WASI
137// bindings themselves. It's possible for the host to take this and work with it
138// on its own task, but that's niche enough it's not designed for.
139//
140// Overall that means that the guest is either calling `Pollable` or
141// `InputStream` methods. This means that there should never be contention
142// between the two at this time. This may all change in the future with WASI
143// 0.3, but perhaps we'll have a better story for stdio at that time (see the
144// doc block on the `OutputStream` impl below)
145pub struct AsyncStdinStream(Arc<Mutex<crate::p2::pipe::AsyncReadStream>>);
146
147impl AsyncStdinStream {
148    pub fn new(s: crate::p2::pipe::AsyncReadStream) -> Self {
149        Self(Arc::new(Mutex::new(s)))
150    }
151}
152
153impl StdinStream for AsyncStdinStream {
154    fn stream(&self) -> Box<dyn InputStream> {
155        Box::new(Self(self.0.clone()))
156    }
157}
158
159impl IsTerminal for AsyncStdinStream {
160    fn is_terminal(&self) -> bool {
161        false
162    }
163}
164
165#[async_trait::async_trait]
166impl InputStream for AsyncStdinStream {
167    fn read(&mut self, size: usize) -> Result<bytes::Bytes, StreamError> {
168        match self.0.try_lock() {
169            Ok(mut stream) => stream.read(size),
170            Err(_) => Err(StreamError::trap("concurrent reads are not supported")),
171        }
172    }
173    fn skip(&mut self, size: usize) -> Result<usize, StreamError> {
174        match self.0.try_lock() {
175            Ok(mut stream) => stream.skip(size),
176            Err(_) => Err(StreamError::trap("concurrent skips are not supported")),
177        }
178    }
179    async fn cancel(&mut self) {
180        // Cancel the inner stream if we're the last reference to it:
181        if let Some(mutex) = Arc::get_mut(&mut self.0) {
182            match mutex.try_lock() {
183                Ok(mut stream) => stream.cancel().await,
184                Err(_) => {}
185            }
186        }
187    }
188}
189
190#[async_trait::async_trait]
191impl Pollable for AsyncStdinStream {
192    async fn ready(&mut self) {
193        self.0.lock().await.ready().await
194    }
195}
196
197mod worker_thread_stdin;
198pub use self::worker_thread_stdin::{Stdin, stdin};
199
200/// Similar to [`StdinStream`], except for output.
201pub trait StdoutStream: IsTerminal + Send {
202    /// Returns a fresh new stream which can write to this output stream.
203    ///
204    /// Note that all output streams should output to the same logical source.
205    /// This means that it's possible for each independent stream to acquire a
206    /// separate "permit" to write and then act on that permit. Note that
207    /// additionally at this time once a permit is "acquired" there's no way to
208    /// release it, for example you can wait for readiness and then never
209    /// actually write in WASI. This means that acquisition of a permit for one
210    /// stream cannot discount the size of a permit another stream could
211    /// obtain.
212    ///
213    /// Implementations must be able to handle this
214    fn stream(&self) -> Box<dyn OutputStream>;
215}
216
217impl StdoutStream for pipe::MemoryOutputPipe {
218    fn stream(&self) -> Box<dyn OutputStream> {
219        Box::new(self.clone())
220    }
221}
222
223impl IsTerminal for pipe::MemoryOutputPipe {
224    fn is_terminal(&self) -> bool {
225        false
226    }
227}
228
229impl StdoutStream for pipe::SinkOutputStream {
230    fn stream(&self) -> Box<dyn OutputStream> {
231        Box::new(*self)
232    }
233}
234
235impl IsTerminal for pipe::SinkOutputStream {
236    fn is_terminal(&self) -> bool {
237        false
238    }
239}
240
241impl StdoutStream for pipe::ClosedOutputStream {
242    fn stream(&self) -> Box<dyn OutputStream> {
243        Box::new(*self)
244    }
245}
246
247impl IsTerminal for pipe::ClosedOutputStream {
248    fn is_terminal(&self) -> bool {
249        false
250    }
251}
252
/// A [`StdoutStream`] backed by a host file.
///
/// Streams produced by this type write straight to the file and block the
/// calling thread while doing so. Use [`AsyncStdoutStream`] instead when
/// truly asynchronous output is required.
pub struct OutputFile {
    // Shared so that every stream handed out refers to the same open file.
    file: Arc<std::fs::File>,
}
259
260impl OutputFile {
261    pub fn new(file: std::fs::File) -> Self {
262        Self {
263            file: Arc::new(file),
264        }
265    }
266}
267
268impl StdoutStream for OutputFile {
269    fn stream(&self) -> Box<dyn OutputStream> {
270        Box::new(OutputFileStream {
271            file: Arc::clone(&self.file),
272        })
273    }
274}
275
276impl IsTerminal for OutputFile {
277    fn is_terminal(&self) -> bool {
278        false
279    }
280}
281
/// A single output view created by [`OutputFile`]; writes go directly to the
/// shared file handle.
struct OutputFileStream {
    file: Arc<std::fs::File>,
}
285
286#[async_trait::async_trait]
287impl Pollable for OutputFileStream {
288    async fn ready(&mut self) {}
289}
290
291impl OutputStream for OutputFileStream {
292    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
293        use std::io::Write;
294        self.file
295            .write_all(&bytes)
296            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
297    }
298
299    fn flush(&mut self) -> StreamResult<()> {
300        use std::io::Write;
301        self.file
302            .flush()
303            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
304    }
305
306    fn check_write(&mut self) -> StreamResult<usize> {
307        Ok(1024 * 1024)
308    }
309}
310
/// The host process's standard output, usable as a guest's stdout/stderr.
///
/// Output streams created from this type block on writes because they go
/// through the Rust standard library directly. If truly asynchronous output
/// is needed, a different [`StdoutStream`] implementation is required.
pub struct Stdout;
316
317/// Returns a stream that represents the host's standard out.
318///
319/// Suitable for passing to
320/// [`WasiCtxBuilder::stdout`](crate::p2::WasiCtxBuilder::stdout).
321pub fn stdout() -> Stdout {
322    Stdout
323}
324
325impl StdoutStream for Stdout {
326    fn stream(&self) -> Box<dyn OutputStream> {
327        Box::new(StdioOutputStream::Stdout)
328    }
329}
330
331impl IsTerminal for Stdout {
332    fn is_terminal(&self) -> bool {
333        std::io::stdout().is_terminal()
334    }
335}
336
/// The host process's standard error, usable as a guest's stdout/stderr.
///
/// Output streams created from this type block on writes because they go
/// through the Rust standard library directly. If truly asynchronous output
/// is needed, a different [`StdoutStream`] implementation is required.
pub struct Stderr;
342
343/// Returns a stream that represents the host's standard err.
344///
345/// Suitable for passing to
346/// [`WasiCtxBuilder::stderr`](crate::p2::WasiCtxBuilder::stderr).
347pub fn stderr() -> Stderr {
348    Stderr
349}
350
351impl StdoutStream for Stderr {
352    fn stream(&self) -> Box<dyn OutputStream> {
353        Box::new(StdioOutputStream::Stderr)
354    }
355}
356
357impl IsTerminal for Stderr {
358    fn is_terminal(&self) -> bool {
359        std::io::stderr().is_terminal()
360    }
361}
362
/// Selects which of the host's standard output handles a stream writes to.
enum StdioOutputStream {
    /// Writes go to `std::io::stdout()`.
    Stdout,
    /// Writes go to `std::io::stderr()`.
    Stderr,
}
367
368impl OutputStream for StdioOutputStream {
369    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
370        use std::io::Write;
371        match self {
372            StdioOutputStream::Stdout => std::io::stdout().write_all(&bytes),
373            StdioOutputStream::Stderr => std::io::stderr().write_all(&bytes),
374        }
375        .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
376    }
377
378    fn flush(&mut self) -> StreamResult<()> {
379        use std::io::Write;
380        match self {
381            StdioOutputStream::Stdout => std::io::stdout().flush(),
382            StdioOutputStream::Stderr => std::io::stderr().flush(),
383        }
384        .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
385    }
386
387    fn check_write(&mut self) -> StreamResult<usize> {
388        Ok(1024 * 1024)
389    }
390}
391
392#[async_trait::async_trait]
393impl Pollable for StdioOutputStream {
394    async fn ready(&mut self) {}
395}
396
397/// A wrapper of [`crate::p2::pipe::AsyncWriteStream`] that implements
398/// [`StdoutStream`]. Note that the [`OutputStream`] impl for this is not
399/// correct when used for interleaved async IO.
400//
401// Note that the use of `tokio::sync::Mutex` here is intentional, in addition to
402// the `try_lock()` calls below in the implementation of `OutputStream`. For
403// more information see the documentation on `AsyncStdinStream`.
404pub struct AsyncStdoutStream(Arc<Mutex<crate::p2::pipe::AsyncWriteStream>>);
405
406impl AsyncStdoutStream {
407    pub fn new(s: crate::p2::pipe::AsyncWriteStream) -> Self {
408        Self(Arc::new(Mutex::new(s)))
409    }
410}
411
412impl StdoutStream for AsyncStdoutStream {
413    fn stream(&self) -> Box<dyn OutputStream> {
414        Box::new(Self(self.0.clone()))
415    }
416}
417
418impl IsTerminal for AsyncStdoutStream {
419    fn is_terminal(&self) -> bool {
420        false
421    }
422}
423
424// This implementation is known to be bogus. All check-writes and writes are
425// directed at the same underlying stream. The check-write/write protocol does
426// require the size returned by a check-write to be accepted by write, even if
427// other side-effects happen between those calls, and this implementation
428// permits another view (created by StdoutStream::stream()) of the same
429// underlying stream to accept a write which will invalidate a prior
430// check-write of another view.
431// Ultimately, the Std{in,out}Stream::stream() methods exist because many
432// different places in a linked component (which may itself contain many
433// modules) may need to access stdio without any coordination to keep those
434// accesses all using pointing to the same resource. So, we allow many
435// resources to be created. We have the reasonable expectation that programs
436// won't attempt to interleave async IO from these disparate uses of stdio.
437// If that expectation doesn't turn out to be true, and you find yourself at
438// this comment to correct it: sorry about that.
439#[async_trait::async_trait]
440impl OutputStream for AsyncStdoutStream {
441    fn check_write(&mut self) -> Result<usize, StreamError> {
442        match self.0.try_lock() {
443            Ok(mut stream) => stream.check_write(),
444            Err(_) => Err(StreamError::trap("concurrent writes are not supported")),
445        }
446    }
447    fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
448        match self.0.try_lock() {
449            Ok(mut stream) => stream.write(bytes),
450            Err(_) => Err(StreamError::trap("concurrent writes not supported yet")),
451        }
452    }
453    fn flush(&mut self) -> Result<(), StreamError> {
454        match self.0.try_lock() {
455            Ok(mut stream) => stream.flush(),
456            Err(_) => Err(StreamError::trap("concurrent flushes not supported yet")),
457        }
458    }
459    async fn cancel(&mut self) {
460        // Cancel the inner stream if we're the last reference to it:
461        if let Some(mutex) = Arc::get_mut(&mut self.0) {
462            match mutex.try_lock() {
463                Ok(mut stream) => stream.cancel().await,
464                Err(_) => {}
465            }
466        }
467    }
468}
469
470#[async_trait::async_trait]
471impl Pollable for AsyncStdoutStream {
472    async fn ready(&mut self) {
473        self.0.lock().await.ready().await
474    }
475}
476
/// A two-state yes/no flag; judging by the name, it records whether
/// something is a TTY.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsATTY {
    /// The affirmative state.
    Yes,
    /// The negative state.
    No,
}
482
483impl<T> stdin::Host for WasiImpl<T>
484where
485    T: WasiView,
486{
487    fn get_stdin(&mut self) -> Result<Resource<streams::DynInputStream>, anyhow::Error> {
488        let stream = self.ctx().stdin.stream();
489        Ok(self.table().push(stream)?)
490    }
491}
492
493impl<T> stdout::Host for WasiImpl<T>
494where
495    T: WasiView,
496{
497    fn get_stdout(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
498        let stream = self.ctx().stdout.stream();
499        Ok(self.table().push(stream)?)
500    }
501}
502
503impl<T> stderr::Host for WasiImpl<T>
504where
505    T: WasiView,
506{
507    fn get_stderr(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
508        let stream = self.ctx().stderr.stream();
509        Ok(self.table().push(stream)?)
510    }
511}
512
/// Marker resource stored in the table to represent a terminal input handle.
pub struct TerminalInput;
/// Marker resource stored in the table to represent a terminal output handle.
pub struct TerminalOutput;
515
516impl<T> terminal_input::Host for WasiImpl<T> where T: WasiView {}
517impl<T> terminal_input::HostTerminalInput for WasiImpl<T>
518where
519    T: WasiView,
520{
521    fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
522        self.table().delete(r)?;
523        Ok(())
524    }
525}
526impl<T> terminal_output::Host for WasiImpl<T> where T: WasiView {}
527impl<T> terminal_output::HostTerminalOutput for WasiImpl<T>
528where
529    T: WasiView,
530{
531    fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
532        self.table().delete(r)?;
533        Ok(())
534    }
535}
536impl<T> terminal_stdin::Host for WasiImpl<T>
537where
538    T: WasiView,
539{
540    fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
541        if self.ctx().stdin.is_terminal() {
542            let fd = self.table().push(TerminalInput)?;
543            Ok(Some(fd))
544        } else {
545            Ok(None)
546        }
547    }
548}
549impl<T> terminal_stdout::Host for WasiImpl<T>
550where
551    T: WasiView,
552{
553    fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
554        if self.ctx().stdout.is_terminal() {
555            let fd = self.table().push(TerminalOutput)?;
556            Ok(Some(fd))
557        } else {
558            Ok(None)
559        }
560    }
561}
562impl<T> terminal_stderr::Host for WasiImpl<T>
563where
564    T: WasiView,
565{
566    fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
567        if self.ctx().stderr.is_terminal() {
568            let fd = self.table().push(TerminalOutput)?;
569            Ok(Some(fd))
570        } else {
571            Ok(None)
572        }
573    }
574}
575
#[cfg(test)]
mod test {
    use crate::p2::stdio::StdoutStream;
    use crate::p2::write_stream::AsyncWriteStream;
    use crate::p2::{AsyncStdoutStream, OutputStream};
    use anyhow::Result;
    use bytes::Bytes;
    use tokio::io::AsyncReadExt;

    /// Two views created from one `MemoryInputPipe` must consume a single
    /// shared cursor.
    #[test]
    fn memory_stdin_stream() {
        use super::StdinStream;

        // A StdinStream can hand out several InputStreams via stream(), and
        // each of them is a view onto the same shared state. Input consumed
        // through one view is consumed for all of them.
        //
        // The MemoryInputPipe impl is the simplest place to check that
        // property.
        let source = super::pipe::MemoryInputPipe::new(
            "the quick brown fox jumped over the three lazy dogs",
        );

        let mut first_view = source.stream();
        let mut second_view = source.stream();

        // Alternate between the two views; each read must continue where the
        // previous one stopped.
        let chunk1 = first_view.read(10).expect("read first 10 bytes");
        assert_eq!(chunk1, "the quick ".as_bytes(), "first 10 bytes");
        let chunk2 = second_view.read(10).expect("read second 10 bytes");
        assert_eq!(chunk2, "brown fox ".as_bytes(), "second 10 bytes");
        let chunk3 = first_view.read(10).expect("read third 10 bytes");
        assert_eq!(chunk3, "jumped ove".as_bytes(), "third 10 bytes");
        let chunk4 = second_view.read(10).expect("read fourth 10 bytes");
        assert_eq!(chunk4, "r the thre".as_bytes(), "fourth 10 bytes");
    }

    /// Same shared-cursor property as above, checked for `AsyncStdinStream`
    /// over a file on disk.
    #[tokio::test]
    async fn async_stdin_stream() {
        use super::StdinStream;

        // AsyncStdinStream is a somewhat more involved StdinStream impl than
        // MemoryInputPipe. Build an AsyncReadStream from a file on disk, wrap
        // it in an AsyncStdinStream, and check that views created from it
        // share their progress through the input.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("file");
        std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();

        let file = tokio::fs::File::open(&path)
            .await
            .expect("open created file");
        let reader = crate::p2::pipe::AsyncReadStream::new(file);
        let stdin_stream = super::AsyncStdinStream::new(reader);

        let mut first_view = stdin_stream.stream();
        let mut second_view = stdin_stream.stream();

        // Wait until data is available before issuing non-blocking reads.
        first_view.ready().await;

        let chunk1 = first_view.read(10).expect("read first 10 bytes");
        assert_eq!(chunk1, "the quick ".as_bytes(), "first 10 bytes");
        let chunk2 = second_view.read(10).expect("read second 10 bytes");
        assert_eq!(chunk2, "brown fox ".as_bytes(), "second 10 bytes");
        let chunk3 = first_view.read(10).expect("read third 10 bytes");
        assert_eq!(chunk3, "jumped ove".as_bytes(), "third 10 bytes");
        let chunk4 = second_view.read(10).expect("read fourth 10 bytes");
        assert_eq!(chunk4, "r the thre".as_bytes(), "fourth 10 bytes");
    }

    /// A write-and-flush on a spawned task must complete once the other end
    /// of the duplex pipe has read the data.
    #[tokio::test]
    async fn async_stdout_stream_unblocks() {
        let (mut reader, writer) = tokio::io::duplex(32);
        let stdout = AsyncStdoutStream::new(AsyncWriteStream::new(32, writer));

        let writer_task = tokio::task::spawn(async move {
            let mut stream = stdout.stream();
            blocking_write_and_flush(&mut *stream, "x".into())
                .await
                .unwrap();
        });

        let mut buf = [0; 100];
        let n = reader.read(&mut buf).await.unwrap();
        assert_eq!(&buf[..n], b"x");

        writer_task.await.unwrap();
    }

    /// Writes all of `bytes` to `s`, never exceeding the permit returned by
    /// `write_ready`, then flushes and waits for the stream to be ready again.
    async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
        while !bytes.is_empty() {
            let budget = s.write_ready().await?;
            let take = budget.min(bytes.len());
            s.write(bytes.split_to(take))?;
        }

        s.flush()?;
        s.write_ready().await?;
        Ok(())
    }
}