1use crate::p2;
2use std::pin::Pin;
3use std::sync::Arc;
4use tokio::io::{AsyncRead, AsyncWrite, empty};
5use wasmtime::component::{HasData, ResourceTable};
6use wasmtime_wasi_io::streams::{InputStream, OutputStream};
7
8mod empty;
9mod file;
10mod locked_async;
11mod mem;
12mod stdout;
13mod worker_thread_stdin;
14
15pub use self::file::{InputFile, OutputFile};
16pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream};
17
18#[doc(no_inline)]
21pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout};
22
23pub(crate) struct WasiCli;
24
25impl HasData for WasiCli {
26 type Data<'a> = WasiCliCtxView<'a>;
27}
28
29pub trait WasiCliView: Send {
32 fn cli(&mut self) -> WasiCliCtxView<'_>;
33}
34
35pub struct WasiCliCtxView<'a> {
36 pub ctx: &'a mut WasiCliCtx,
37 pub table: &'a mut ResourceTable,
38}
39
40pub struct WasiCliCtx {
41 pub(crate) environment: Vec<(String, String)>,
42 pub(crate) arguments: Vec<String>,
43 pub(crate) initial_cwd: Option<String>,
44 pub(crate) stdin: Box<dyn StdinStream>,
45 pub(crate) stdout: Box<dyn StdoutStream>,
46 pub(crate) stderr: Box<dyn StdoutStream>,
47}
48
49impl Default for WasiCliCtx {
50 fn default() -> WasiCliCtx {
51 WasiCliCtx {
52 environment: Vec::new(),
53 arguments: Vec::new(),
54 initial_cwd: None,
55 stdin: Box::new(empty()),
56 stdout: Box::new(empty()),
57 stderr: Box::new(empty()),
58 }
59 }
60}
61
62pub trait IsTerminal {
63 fn is_terminal(&self) -> bool;
65}
66
67pub trait StdinStream: IsTerminal + Send {
73 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>;
85
86 fn p2_stream(&self) -> Box<dyn InputStream> {
93 Box::new(p2::pipe::AsyncReadStream::new(Pin::from(
94 self.async_stream(),
95 )))
96 }
97}
98
99pub trait StdoutStream: IsTerminal + Send {
107 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>;
120
121 fn p2_stream(&self) -> Box<dyn OutputStream> {
128 Box::new(p2::pipe::AsyncWriteStream::new(
129 8192, Pin::from(self.async_stream()),
131 ))
132 }
133}
134
135impl<T: ?Sized + IsTerminal> IsTerminal for &T {
137 fn is_terminal(&self) -> bool {
138 T::is_terminal(self)
139 }
140}
141impl<T: ?Sized + StdinStream + Sync> StdinStream for &T {
142 fn p2_stream(&self) -> Box<dyn InputStream> {
143 T::p2_stream(self)
144 }
145 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
146 T::async_stream(self)
147 }
148}
149impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T {
150 fn p2_stream(&self) -> Box<dyn OutputStream> {
151 T::p2_stream(self)
152 }
153 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
154 T::async_stream(self)
155 }
156}
157
158impl<T: ?Sized + IsTerminal> IsTerminal for &mut T {
160 fn is_terminal(&self) -> bool {
161 T::is_terminal(self)
162 }
163}
164impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T {
165 fn p2_stream(&self) -> Box<dyn InputStream> {
166 T::p2_stream(self)
167 }
168 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
169 T::async_stream(self)
170 }
171}
172impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T {
173 fn p2_stream(&self) -> Box<dyn OutputStream> {
174 T::p2_stream(self)
175 }
176 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
177 T::async_stream(self)
178 }
179}
180
181impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> {
183 fn is_terminal(&self) -> bool {
184 T::is_terminal(self)
185 }
186}
187impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> {
188 fn p2_stream(&self) -> Box<dyn InputStream> {
189 T::p2_stream(self)
190 }
191 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
192 T::async_stream(self)
193 }
194}
195impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> {
196 fn p2_stream(&self) -> Box<dyn OutputStream> {
197 T::p2_stream(self)
198 }
199 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
200 T::async_stream(self)
201 }
202}
203
204impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> {
206 fn is_terminal(&self) -> bool {
207 T::is_terminal(self)
208 }
209}
210impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> {
211 fn p2_stream(&self) -> Box<dyn InputStream> {
212 T::p2_stream(self)
213 }
214 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
215 T::async_stream(self)
216 }
217}
218impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> {
219 fn p2_stream(&self) -> Box<dyn OutputStream> {
220 T::p2_stream(self)
221 }
222 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
223 T::async_stream(self)
224 }
225}
226
227#[cfg(test)]
228mod test {
229 use crate::cli::{AsyncStdoutStream, StdinStream, StdoutStream};
230 use crate::p2::{self, OutputStream};
231 use anyhow::Result;
232 use bytes::Bytes;
233 use tokio::io::AsyncReadExt;
234
235 #[test]
236 fn memory_stdin_stream() {
237 let pipe =
246 p2::pipe::MemoryInputPipe::new("the quick brown fox jumped over the three lazy dogs");
247
248 let mut view1 = pipe.p2_stream();
249 let mut view2 = pipe.p2_stream();
250
251 let read1 = view1.read(10).expect("read first 10 bytes");
252 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
253 let read2 = view2.read(10).expect("read second 10 bytes");
254 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
255 let read3 = view1.read(10).expect("read third 10 bytes");
256 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
257 let read4 = view2.read(10).expect("read fourth 10 bytes");
258 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
259 }
260
261 #[tokio::test]
262 async fn async_stdin_stream() {
263 let dir = tempfile::tempdir().unwrap();
274 let mut path = std::path::PathBuf::from(dir.path());
275 path.push("file");
276 std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
277
278 let file = tokio::fs::File::open(&path)
279 .await
280 .expect("open created file");
281 let stdin_stream = super::AsyncStdinStream::new(file);
282
283 use super::StdinStream;
284
285 let mut view1 = stdin_stream.p2_stream();
286 let mut view2 = stdin_stream.p2_stream();
287
288 view1.ready().await;
289
290 let read1 = view1.read(10).expect("read first 10 bytes");
291 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
292 let read2 = view2.read(10).expect("read second 10 bytes");
293 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
294 let read3 = view1.read(10).expect("read third 10 bytes");
295 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
296 let read4 = view2.read(10).expect("read fourth 10 bytes");
297 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
298 }
299
300 #[tokio::test]
301 async fn async_stdout_stream_unblocks() {
302 let (mut read, write) = tokio::io::duplex(32);
303 let stdout = AsyncStdoutStream::new(32, write);
304
305 let task = tokio::task::spawn(async move {
306 let mut stream = stdout.p2_stream();
307 blocking_write_and_flush(&mut *stream, "x".into())
308 .await
309 .unwrap();
310 });
311
312 let mut buf = [0; 100];
313 let n = read.read(&mut buf).await.unwrap();
314 assert_eq!(&buf[..n], b"x");
315
316 task.await.unwrap();
317 }
318
319 async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
320 while !bytes.is_empty() {
321 let permit = s.write_ready().await?;
322 let len = bytes.len().min(permit);
323 let chunk = bytes.split_to(len);
324 s.write(chunk)?;
325 }
326
327 s.flush()?;
328 s.write_ready().await?;
329 Ok(())
330 }
331}