1use crate::p2::bindings::cli::{
2 stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin,
3 terminal_stdout,
4};
5use crate::p2::pipe;
6use crate::p2::{
7 InputStream, IoView, OutputStream, Pollable, StreamError, StreamResult, WasiImpl, WasiView,
8};
9use bytes::Bytes;
10use std::io::IsTerminal;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::streams;
15
16pub trait StdinStream: Send {
24 fn stream(&self) -> Box<dyn InputStream>;
36
37 fn isatty(&self) -> bool;
39}
40
41impl StdinStream for pipe::MemoryInputPipe {
42 fn stream(&self) -> Box<dyn InputStream> {
43 Box::new(self.clone())
44 }
45
46 fn isatty(&self) -> bool {
47 false
48 }
49}
50
51impl StdinStream for pipe::ClosedInputStream {
52 fn stream(&self) -> Box<dyn InputStream> {
53 Box::new(*self)
54 }
55
56 fn isatty(&self) -> bool {
57 false
58 }
59}
60
61pub struct InputFile {
65 file: Arc<std::fs::File>,
66}
67
68impl InputFile {
69 pub fn new(file: std::fs::File) -> Self {
70 Self {
71 file: Arc::new(file),
72 }
73 }
74}
75
76impl StdinStream for InputFile {
77 fn stream(&self) -> Box<dyn InputStream> {
78 Box::new(InputFileStream {
79 file: Arc::clone(&self.file),
80 })
81 }
82
83 fn isatty(&self) -> bool {
84 false
85 }
86}
87
88struct InputFileStream {
89 file: Arc<std::fs::File>,
90}
91
92#[async_trait::async_trait]
93impl Pollable for InputFileStream {
94 async fn ready(&mut self) {}
95}
96
97impl InputStream for InputFileStream {
98 fn read(&mut self, size: usize) -> StreamResult<Bytes> {
99 use std::io::Read;
100
101 let mut buf = bytes::BytesMut::zeroed(size);
102 let bytes_read = self
103 .file
104 .read(&mut buf)
105 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
106 if bytes_read == 0 {
107 return Err(StreamError::Closed);
108 }
109 buf.truncate(bytes_read);
110 StreamResult::Ok(buf.into())
111 }
112}
113
114pub struct AsyncStdinStream(Arc<Mutex<crate::p2::pipe::AsyncReadStream>>);
143
144impl AsyncStdinStream {
145 pub fn new(s: crate::p2::pipe::AsyncReadStream) -> Self {
146 Self(Arc::new(Mutex::new(s)))
147 }
148}
149
150impl StdinStream for AsyncStdinStream {
151 fn stream(&self) -> Box<dyn InputStream> {
152 Box::new(Self(self.0.clone()))
153 }
154 fn isatty(&self) -> bool {
155 false
156 }
157}
158
159#[async_trait::async_trait]
160impl InputStream for AsyncStdinStream {
161 fn read(&mut self, size: usize) -> Result<bytes::Bytes, StreamError> {
162 match self.0.try_lock() {
163 Ok(mut stream) => stream.read(size),
164 Err(_) => Err(StreamError::trap("concurrent reads are not supported")),
165 }
166 }
167 fn skip(&mut self, size: usize) -> Result<usize, StreamError> {
168 match self.0.try_lock() {
169 Ok(mut stream) => stream.skip(size),
170 Err(_) => Err(StreamError::trap("concurrent skips are not supported")),
171 }
172 }
173 async fn cancel(&mut self) {
174 if let Some(mutex) = Arc::get_mut(&mut self.0) {
176 match mutex.try_lock() {
177 Ok(mut stream) => stream.cancel().await,
178 Err(_) => {}
179 }
180 }
181 }
182}
183
184#[async_trait::async_trait]
185impl Pollable for AsyncStdinStream {
186 async fn ready(&mut self) {
187 self.0.lock().await.ready().await
188 }
189}
190
191mod worker_thread_stdin;
192pub use self::worker_thread_stdin::{Stdin, stdin};
193
194pub trait StdoutStream: Send {
196 fn stream(&self) -> Box<dyn OutputStream>;
209
210 fn isatty(&self) -> bool;
212}
213
214impl StdoutStream for pipe::MemoryOutputPipe {
215 fn stream(&self) -> Box<dyn OutputStream> {
216 Box::new(self.clone())
217 }
218
219 fn isatty(&self) -> bool {
220 false
221 }
222}
223
224impl StdoutStream for pipe::SinkOutputStream {
225 fn stream(&self) -> Box<dyn OutputStream> {
226 Box::new(*self)
227 }
228
229 fn isatty(&self) -> bool {
230 false
231 }
232}
233
234impl StdoutStream for pipe::ClosedOutputStream {
235 fn stream(&self) -> Box<dyn OutputStream> {
236 Box::new(*self)
237 }
238
239 fn isatty(&self) -> bool {
240 false
241 }
242}
243
244pub struct OutputFile {
248 file: Arc<std::fs::File>,
249}
250
251impl OutputFile {
252 pub fn new(file: std::fs::File) -> Self {
253 Self {
254 file: Arc::new(file),
255 }
256 }
257}
258
259impl StdoutStream for OutputFile {
260 fn stream(&self) -> Box<dyn OutputStream> {
261 Box::new(OutputFileStream {
262 file: Arc::clone(&self.file),
263 })
264 }
265
266 fn isatty(&self) -> bool {
267 false
268 }
269}
270
271struct OutputFileStream {
272 file: Arc<std::fs::File>,
273}
274
275#[async_trait::async_trait]
276impl Pollable for OutputFileStream {
277 async fn ready(&mut self) {}
278}
279
280impl OutputStream for OutputFileStream {
281 fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
282 use std::io::Write;
283 self.file
284 .write_all(&bytes)
285 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
286 }
287
288 fn flush(&mut self) -> StreamResult<()> {
289 use std::io::Write;
290 self.file
291 .flush()
292 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
293 }
294
295 fn check_write(&mut self) -> StreamResult<usize> {
296 Ok(1024 * 1024)
297 }
298}
299
300pub struct Stdout;
305
306pub fn stdout() -> Stdout {
311 Stdout
312}
313
314impl StdoutStream for Stdout {
315 fn stream(&self) -> Box<dyn OutputStream> {
316 Box::new(StdioOutputStream::Stdout)
317 }
318
319 fn isatty(&self) -> bool {
320 std::io::stdout().is_terminal()
321 }
322}
323
324pub struct Stderr;
329
330pub fn stderr() -> Stderr {
335 Stderr
336}
337
338impl StdoutStream for Stderr {
339 fn stream(&self) -> Box<dyn OutputStream> {
340 Box::new(StdioOutputStream::Stderr)
341 }
342
343 fn isatty(&self) -> bool {
344 std::io::stderr().is_terminal()
345 }
346}
347
348enum StdioOutputStream {
349 Stdout,
350 Stderr,
351}
352
353impl OutputStream for StdioOutputStream {
354 fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
355 use std::io::Write;
356 match self {
357 StdioOutputStream::Stdout => std::io::stdout().write_all(&bytes),
358 StdioOutputStream::Stderr => std::io::stderr().write_all(&bytes),
359 }
360 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
361 }
362
363 fn flush(&mut self) -> StreamResult<()> {
364 use std::io::Write;
365 match self {
366 StdioOutputStream::Stdout => std::io::stdout().flush(),
367 StdioOutputStream::Stderr => std::io::stderr().flush(),
368 }
369 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
370 }
371
372 fn check_write(&mut self) -> StreamResult<usize> {
373 Ok(1024 * 1024)
374 }
375}
376
377#[async_trait::async_trait]
378impl Pollable for StdioOutputStream {
379 async fn ready(&mut self) {}
380}
381
382pub struct AsyncStdoutStream(Arc<Mutex<crate::p2::pipe::AsyncWriteStream>>);
390
391impl AsyncStdoutStream {
392 pub fn new(s: crate::p2::pipe::AsyncWriteStream) -> Self {
393 Self(Arc::new(Mutex::new(s)))
394 }
395}
396
397impl StdoutStream for AsyncStdoutStream {
398 fn stream(&self) -> Box<dyn OutputStream> {
399 Box::new(Self(self.0.clone()))
400 }
401 fn isatty(&self) -> bool {
402 false
403 }
404}
405
406#[async_trait::async_trait]
422impl OutputStream for AsyncStdoutStream {
423 fn check_write(&mut self) -> Result<usize, StreamError> {
424 match self.0.try_lock() {
425 Ok(mut stream) => stream.check_write(),
426 Err(_) => Err(StreamError::trap("concurrent writes are not supported")),
427 }
428 }
429 fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
430 match self.0.try_lock() {
431 Ok(mut stream) => stream.write(bytes),
432 Err(_) => Err(StreamError::trap("concurrent writes not supported yet")),
433 }
434 }
435 fn flush(&mut self) -> Result<(), StreamError> {
436 match self.0.try_lock() {
437 Ok(mut stream) => stream.flush(),
438 Err(_) => Err(StreamError::trap("concurrent flushes not supported yet")),
439 }
440 }
441 async fn cancel(&mut self) {
442 if let Some(mutex) = Arc::get_mut(&mut self.0) {
444 match mutex.try_lock() {
445 Ok(mut stream) => stream.cancel().await,
446 Err(_) => {}
447 }
448 }
449 }
450}
451
452#[async_trait::async_trait]
453impl Pollable for AsyncStdoutStream {
454 async fn ready(&mut self) {
455 self.0.lock().await.ready().await
456 }
457}
458
459#[derive(Debug, Clone, Copy, PartialEq, Eq)]
460pub enum IsATTY {
461 Yes,
462 No,
463}
464
465impl<T> stdin::Host for WasiImpl<T>
466where
467 T: WasiView,
468{
469 fn get_stdin(&mut self) -> Result<Resource<streams::DynInputStream>, anyhow::Error> {
470 let stream = self.ctx().stdin.stream();
471 Ok(self.table().push(stream)?)
472 }
473}
474
475impl<T> stdout::Host for WasiImpl<T>
476where
477 T: WasiView,
478{
479 fn get_stdout(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
480 let stream = self.ctx().stdout.stream();
481 Ok(self.table().push(stream)?)
482 }
483}
484
485impl<T> stderr::Host for WasiImpl<T>
486where
487 T: WasiView,
488{
489 fn get_stderr(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
490 let stream = self.ctx().stderr.stream();
491 Ok(self.table().push(stream)?)
492 }
493}
494
495pub struct TerminalInput;
496pub struct TerminalOutput;
497
498impl<T> terminal_input::Host for WasiImpl<T> where T: WasiView {}
499impl<T> terminal_input::HostTerminalInput for WasiImpl<T>
500where
501 T: WasiView,
502{
503 fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
504 self.table().delete(r)?;
505 Ok(())
506 }
507}
508impl<T> terminal_output::Host for WasiImpl<T> where T: WasiView {}
509impl<T> terminal_output::HostTerminalOutput for WasiImpl<T>
510where
511 T: WasiView,
512{
513 fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
514 self.table().delete(r)?;
515 Ok(())
516 }
517}
518impl<T> terminal_stdin::Host for WasiImpl<T>
519where
520 T: WasiView,
521{
522 fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
523 if self.ctx().stdin.isatty() {
524 let fd = self.table().push(TerminalInput)?;
525 Ok(Some(fd))
526 } else {
527 Ok(None)
528 }
529 }
530}
531impl<T> terminal_stdout::Host for WasiImpl<T>
532where
533 T: WasiView,
534{
535 fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
536 if self.ctx().stdout.isatty() {
537 let fd = self.table().push(TerminalOutput)?;
538 Ok(Some(fd))
539 } else {
540 Ok(None)
541 }
542 }
543}
544impl<T> terminal_stderr::Host for WasiImpl<T>
545where
546 T: WasiView,
547{
548 fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
549 if self.ctx().stderr.isatty() {
550 let fd = self.table().push(TerminalOutput)?;
551 Ok(Some(fd))
552 } else {
553 Ok(None)
554 }
555 }
556}
557
558#[cfg(test)]
559mod test {
560 use crate::p2::stdio::StdoutStream;
561 use crate::p2::write_stream::AsyncWriteStream;
562 use crate::p2::{AsyncStdoutStream, OutputStream};
563 use anyhow::Result;
564 use bytes::Bytes;
565 use tokio::io::AsyncReadExt;
566
567 #[test]
568 fn memory_stdin_stream() {
569 let pipe = super::pipe::MemoryInputPipe::new(
578 "the quick brown fox jumped over the three lazy dogs",
579 );
580
581 use super::StdinStream;
582
583 let mut view1 = pipe.stream();
584 let mut view2 = pipe.stream();
585
586 let read1 = view1.read(10).expect("read first 10 bytes");
587 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
588 let read2 = view2.read(10).expect("read second 10 bytes");
589 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
590 let read3 = view1.read(10).expect("read third 10 bytes");
591 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
592 let read4 = view2.read(10).expect("read fourth 10 bytes");
593 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
594 }
595
596 #[tokio::test]
597 async fn async_stdin_stream() {
598 let dir = tempfile::tempdir().unwrap();
609 let mut path = std::path::PathBuf::from(dir.path());
610 path.push("file");
611 std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
612
613 let file = tokio::fs::File::open(&path)
614 .await
615 .expect("open created file");
616 let stdin_stream =
617 super::AsyncStdinStream::new(crate::p2::pipe::AsyncReadStream::new(file));
618
619 use super::StdinStream;
620
621 let mut view1 = stdin_stream.stream();
622 let mut view2 = stdin_stream.stream();
623
624 view1.ready().await;
625
626 let read1 = view1.read(10).expect("read first 10 bytes");
627 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
628 let read2 = view2.read(10).expect("read second 10 bytes");
629 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
630 let read3 = view1.read(10).expect("read third 10 bytes");
631 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
632 let read4 = view2.read(10).expect("read fourth 10 bytes");
633 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
634 }
635
636 #[tokio::test]
637 async fn async_stdout_stream_unblocks() {
638 let (mut read, write) = tokio::io::duplex(32);
639 let stdout = AsyncStdoutStream::new(AsyncWriteStream::new(32, write));
640
641 let task = tokio::task::spawn(async move {
642 let mut stream = stdout.stream();
643 blocking_write_and_flush(&mut *stream, "x".into())
644 .await
645 .unwrap();
646 });
647
648 let mut buf = [0; 100];
649 let n = read.read(&mut buf).await.unwrap();
650 assert_eq!(&buf[..n], b"x");
651
652 task.await.unwrap();
653 }
654
655 async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
656 while !bytes.is_empty() {
657 let permit = s.write_ready().await?;
658 let len = bytes.len().min(permit);
659 let chunk = bytes.split_to(len);
660 s.write(chunk)?;
661 }
662
663 s.flush()?;
664 s.write_ready().await?;
665 Ok(())
666 }
667}