wasmtime_wasi/cli/
file.rs

1use crate::cli::{IsTerminal, StdinStream, StdoutStream};
2use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
3use bytes::Bytes;
4use std::io::{Read, Write};
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio::io::{self, AsyncRead, AsyncWrite};
9
10/// This implementation will yield output streams that block on writes, and
11/// output directly to a file. If truly async output is required, [`AsyncStdoutStream`]
12/// should be used instead.
13#[derive(Clone)]
14pub struct OutputFile {
15    file: Arc<std::fs::File>,
16}
17
18impl OutputFile {
19    pub fn new(file: std::fs::File) -> Self {
20        Self {
21            file: Arc::new(file),
22        }
23    }
24}
25
26impl IsTerminal for OutputFile {
27    fn is_terminal(&self) -> bool {
28        false
29    }
30}
31
32impl StdoutStream for OutputFile {
33    fn p2_stream(&self) -> Box<dyn OutputStream> {
34        Box::new(self.clone())
35    }
36
37    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
38        Box::new(self.clone())
39    }
40}
41
42#[async_trait::async_trait]
43impl Pollable for OutputFile {
44    async fn ready(&mut self) {}
45}
46
47impl OutputStream for OutputFile {
48    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
49        (&*self.file)
50            .write_all(&bytes)
51            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
52    }
53
54    fn flush(&mut self) -> StreamResult<()> {
55        use std::io::Write;
56        self.file
57            .flush()
58            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
59    }
60
61    fn check_write(&mut self) -> StreamResult<usize> {
62        Ok(1024 * 1024)
63    }
64}
65
66impl AsyncWrite for OutputFile {
67    fn poll_write(
68        self: Pin<&mut Self>,
69        _cx: &mut Context<'_>,
70        buf: &[u8],
71    ) -> Poll<io::Result<usize>> {
72        match (&*self.file).write_all(buf) {
73            Ok(()) => Poll::Ready(Ok(buf.len())),
74            Err(e) => Poll::Ready(Err(e)),
75        }
76    }
77    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
78        Poll::Ready((&*self.file).flush())
79    }
80    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
81        Poll::Ready(Ok(()))
82    }
83}
84
85/// This implementation will yield input streams that block on reads, and
86/// reads directly from a file. If truly async input is required,
87/// [`AsyncStdinStream`] should be used instead.
88#[derive(Clone)]
89pub struct InputFile {
90    file: Arc<std::fs::File>,
91}
92
93impl InputFile {
94    pub fn new(file: std::fs::File) -> Self {
95        Self {
96            file: Arc::new(file),
97        }
98    }
99}
100
101impl StdinStream for InputFile {
102    fn p2_stream(&self) -> Box<dyn InputStream> {
103        Box::new(self.clone())
104    }
105    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
106        Box::new(self.clone())
107    }
108}
109
110impl IsTerminal for InputFile {
111    fn is_terminal(&self) -> bool {
112        false
113    }
114}
115
116#[async_trait::async_trait]
117impl Pollable for InputFile {
118    async fn ready(&mut self) {}
119}
120
121impl InputStream for InputFile {
122    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
123        let mut buf = bytes::BytesMut::zeroed(size);
124        let bytes_read = self
125            .file
126            .read(&mut buf)
127            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
128        if bytes_read == 0 {
129            return Err(StreamError::Closed);
130        }
131        buf.truncate(bytes_read);
132        StreamResult::Ok(buf.into())
133    }
134}
135
136impl AsyncRead for InputFile {
137    fn poll_read(
138        self: Pin<&mut Self>,
139        _cx: &mut Context<'_>,
140        buf: &mut io::ReadBuf<'_>,
141    ) -> Poll<io::Result<()>> {
142        match (&*self.file).read(buf.initialize_unfilled()) {
143            Ok(n) => {
144                buf.advance(n);
145                Poll::Ready(Ok(()))
146            }
147            Err(e) => Poll::Ready(Err(e)),
148        }
149    }
150}