1use crate::p2;
2use std::pin::Pin;
3use std::sync::Arc;
4use tokio::io::{AsyncRead, AsyncWrite, empty};
5use wasmtime::component::{HasData, ResourceTable};
6use wasmtime_wasi_io::streams::{InputStream, OutputStream};
7
8mod empty;
9mod file;
10mod locked_async;
11mod mem;
12mod stdout;
13mod worker_thread_stdin;
14
15pub use self::file::{InputFile, OutputFile};
16pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream};
17
18#[doc(no_inline)]
21pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout};
22
23pub struct WasiCli;
62
63impl HasData for WasiCli {
64 type Data<'a> = WasiCliCtxView<'a>;
65}
66
67pub trait WasiCliView: Send {
70 fn cli(&mut self) -> WasiCliCtxView<'_>;
71}
72
73pub struct WasiCliCtxView<'a> {
74 pub ctx: &'a mut WasiCliCtx,
75 pub table: &'a mut ResourceTable,
76}
77
78pub struct WasiCliCtx {
79 pub(crate) environment: Vec<(String, String)>,
80 pub(crate) arguments: Vec<String>,
81 pub(crate) initial_cwd: Option<String>,
82 pub(crate) stdin: Box<dyn StdinStream>,
83 pub(crate) stdout: Box<dyn StdoutStream>,
84 pub(crate) stderr: Box<dyn StdoutStream>,
85}
86
87impl Default for WasiCliCtx {
88 fn default() -> WasiCliCtx {
89 WasiCliCtx {
90 environment: Vec::new(),
91 arguments: Vec::new(),
92 initial_cwd: None,
93 stdin: Box::new(empty()),
94 stdout: Box::new(empty()),
95 stderr: Box::new(empty()),
96 }
97 }
98}
99
100pub trait IsTerminal {
101 fn is_terminal(&self) -> bool;
103}
104
105pub trait StdinStream: IsTerminal + Send {
111 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>;
123
124 fn p2_stream(&self) -> Box<dyn InputStream> {
131 Box::new(p2::pipe::AsyncReadStream::new(Pin::from(
132 self.async_stream(),
133 )))
134 }
135}
136
137pub trait StdoutStream: IsTerminal + Send {
145 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>;
158
159 fn p2_stream(&self) -> Box<dyn OutputStream> {
166 Box::new(p2::pipe::AsyncWriteStream::new(
167 8192, Pin::from(self.async_stream()),
169 ))
170 }
171}
172
173impl<T: ?Sized + IsTerminal> IsTerminal for &T {
175 fn is_terminal(&self) -> bool {
176 T::is_terminal(self)
177 }
178}
179impl<T: ?Sized + StdinStream + Sync> StdinStream for &T {
180 fn p2_stream(&self) -> Box<dyn InputStream> {
181 T::p2_stream(self)
182 }
183 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
184 T::async_stream(self)
185 }
186}
187impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T {
188 fn p2_stream(&self) -> Box<dyn OutputStream> {
189 T::p2_stream(self)
190 }
191 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
192 T::async_stream(self)
193 }
194}
195
196impl<T: ?Sized + IsTerminal> IsTerminal for &mut T {
198 fn is_terminal(&self) -> bool {
199 T::is_terminal(self)
200 }
201}
202impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T {
203 fn p2_stream(&self) -> Box<dyn InputStream> {
204 T::p2_stream(self)
205 }
206 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
207 T::async_stream(self)
208 }
209}
210impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T {
211 fn p2_stream(&self) -> Box<dyn OutputStream> {
212 T::p2_stream(self)
213 }
214 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
215 T::async_stream(self)
216 }
217}
218
219impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> {
221 fn is_terminal(&self) -> bool {
222 T::is_terminal(self)
223 }
224}
225impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> {
226 fn p2_stream(&self) -> Box<dyn InputStream> {
227 T::p2_stream(self)
228 }
229 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
230 T::async_stream(self)
231 }
232}
233impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> {
234 fn p2_stream(&self) -> Box<dyn OutputStream> {
235 T::p2_stream(self)
236 }
237 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
238 T::async_stream(self)
239 }
240}
241
242impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> {
244 fn is_terminal(&self) -> bool {
245 T::is_terminal(self)
246 }
247}
248impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> {
249 fn p2_stream(&self) -> Box<dyn InputStream> {
250 T::p2_stream(self)
251 }
252 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
253 T::async_stream(self)
254 }
255}
256impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> {
257 fn p2_stream(&self) -> Box<dyn OutputStream> {
258 T::p2_stream(self)
259 }
260 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
261 T::async_stream(self)
262 }
263}
264
265#[cfg(test)]
266mod test {
267 use crate::cli::{AsyncStdoutStream, StdinStream, StdoutStream};
268 use crate::p2::{self, OutputStream};
269 use anyhow::Result;
270 use bytes::Bytes;
271 use tokio::io::AsyncReadExt;
272
273 #[test]
274 fn memory_stdin_stream() {
275 let pipe =
284 p2::pipe::MemoryInputPipe::new("the quick brown fox jumped over the three lazy dogs");
285
286 let mut view1 = pipe.p2_stream();
287 let mut view2 = pipe.p2_stream();
288
289 let read1 = view1.read(10).expect("read first 10 bytes");
290 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
291 let read2 = view2.read(10).expect("read second 10 bytes");
292 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
293 let read3 = view1.read(10).expect("read third 10 bytes");
294 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
295 let read4 = view2.read(10).expect("read fourth 10 bytes");
296 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
297 }
298
299 #[tokio::test]
300 async fn async_stdin_stream() {
301 let dir = tempfile::tempdir().unwrap();
312 let mut path = std::path::PathBuf::from(dir.path());
313 path.push("file");
314 std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
315
316 let file = tokio::fs::File::open(&path)
317 .await
318 .expect("open created file");
319 let stdin_stream = super::AsyncStdinStream::new(file);
320
321 use super::StdinStream;
322
323 let mut view1 = stdin_stream.p2_stream();
324 let mut view2 = stdin_stream.p2_stream();
325
326 view1.ready().await;
327
328 let read1 = view1.read(10).expect("read first 10 bytes");
329 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
330 let read2 = view2.read(10).expect("read second 10 bytes");
331 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
332 let read3 = view1.read(10).expect("read third 10 bytes");
333 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
334 let read4 = view2.read(10).expect("read fourth 10 bytes");
335 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
336 }
337
338 #[tokio::test]
339 async fn async_stdout_stream_unblocks() {
340 let (mut read, write) = tokio::io::duplex(32);
341 let stdout = AsyncStdoutStream::new(32, write);
342
343 let task = tokio::task::spawn(async move {
344 let mut stream = stdout.p2_stream();
345 blocking_write_and_flush(&mut *stream, "x".into())
346 .await
347 .unwrap();
348 });
349
350 let mut buf = [0; 100];
351 let n = read.read(&mut buf).await.unwrap();
352 assert_eq!(&buf[..n], b"x");
353
354 task.await.unwrap();
355 }
356
357 async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
358 while !bytes.is_empty() {
359 let permit = s.write_ready().await?;
360 let len = bytes.len().min(permit);
361 let chunk = bytes.split_to(len);
362 s.write(chunk)?;
363 }
364
365 s.flush()?;
366 s.write_ready().await?;
367 Ok(())
368 }
369}