1use crate::cli::IsTerminal;
2use crate::p2::bindings::cli::{
3 stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin,
4 terminal_stdout,
5};
6use crate::p2::pipe;
7use crate::p2::{
8 InputStream, IoView, OutputStream, Pollable, StreamError, StreamResult, WasiImpl, WasiView,
9};
10use bytes::Bytes;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::streams;
15
16pub trait StdinStream: IsTerminal + Send {
24 fn stream(&self) -> Box<dyn InputStream>;
36}
37
38impl StdinStream for pipe::MemoryInputPipe {
39 fn stream(&self) -> Box<dyn InputStream> {
40 Box::new(self.clone())
41 }
42}
43
44impl IsTerminal for pipe::MemoryInputPipe {
45 fn is_terminal(&self) -> bool {
46 false
47 }
48}
49
50impl StdinStream for pipe::ClosedInputStream {
51 fn stream(&self) -> Box<dyn InputStream> {
52 Box::new(*self)
53 }
54}
55
56impl IsTerminal for pipe::ClosedInputStream {
57 fn is_terminal(&self) -> bool {
58 false
59 }
60}
61
62pub struct InputFile {
66 file: Arc<std::fs::File>,
67}
68
69impl InputFile {
70 pub fn new(file: std::fs::File) -> Self {
71 Self {
72 file: Arc::new(file),
73 }
74 }
75}
76
77impl StdinStream for InputFile {
78 fn stream(&self) -> Box<dyn InputStream> {
79 Box::new(InputFileStream {
80 file: Arc::clone(&self.file),
81 })
82 }
83}
84
85impl IsTerminal for InputFile {
86 fn is_terminal(&self) -> bool {
87 false
88 }
89}
90
91struct InputFileStream {
92 file: Arc<std::fs::File>,
93}
94
95#[async_trait::async_trait]
96impl Pollable for InputFileStream {
97 async fn ready(&mut self) {}
98}
99
100impl InputStream for InputFileStream {
101 fn read(&mut self, size: usize) -> StreamResult<Bytes> {
102 use std::io::Read;
103
104 let mut buf = bytes::BytesMut::zeroed(size);
105 let bytes_read = self
106 .file
107 .read(&mut buf)
108 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
109 if bytes_read == 0 {
110 return Err(StreamError::Closed);
111 }
112 buf.truncate(bytes_read);
113 StreamResult::Ok(buf.into())
114 }
115}
116
117pub struct AsyncStdinStream(Arc<Mutex<crate::p2::pipe::AsyncReadStream>>);
146
147impl AsyncStdinStream {
148 pub fn new(s: crate::p2::pipe::AsyncReadStream) -> Self {
149 Self(Arc::new(Mutex::new(s)))
150 }
151}
152
153impl StdinStream for AsyncStdinStream {
154 fn stream(&self) -> Box<dyn InputStream> {
155 Box::new(Self(self.0.clone()))
156 }
157}
158
159impl IsTerminal for AsyncStdinStream {
160 fn is_terminal(&self) -> bool {
161 false
162 }
163}
164
165#[async_trait::async_trait]
166impl InputStream for AsyncStdinStream {
167 fn read(&mut self, size: usize) -> Result<bytes::Bytes, StreamError> {
168 match self.0.try_lock() {
169 Ok(mut stream) => stream.read(size),
170 Err(_) => Err(StreamError::trap("concurrent reads are not supported")),
171 }
172 }
173 fn skip(&mut self, size: usize) -> Result<usize, StreamError> {
174 match self.0.try_lock() {
175 Ok(mut stream) => stream.skip(size),
176 Err(_) => Err(StreamError::trap("concurrent skips are not supported")),
177 }
178 }
179 async fn cancel(&mut self) {
180 if let Some(mutex) = Arc::get_mut(&mut self.0) {
182 match mutex.try_lock() {
183 Ok(mut stream) => stream.cancel().await,
184 Err(_) => {}
185 }
186 }
187 }
188}
189
190#[async_trait::async_trait]
191impl Pollable for AsyncStdinStream {
192 async fn ready(&mut self) {
193 self.0.lock().await.ready().await
194 }
195}
196
197mod worker_thread_stdin;
198pub use self::worker_thread_stdin::{Stdin, stdin};
199
200pub trait StdoutStream: IsTerminal + Send {
202 fn stream(&self) -> Box<dyn OutputStream>;
215}
216
217impl StdoutStream for pipe::MemoryOutputPipe {
218 fn stream(&self) -> Box<dyn OutputStream> {
219 Box::new(self.clone())
220 }
221}
222
223impl IsTerminal for pipe::MemoryOutputPipe {
224 fn is_terminal(&self) -> bool {
225 false
226 }
227}
228
229impl StdoutStream for pipe::SinkOutputStream {
230 fn stream(&self) -> Box<dyn OutputStream> {
231 Box::new(*self)
232 }
233}
234
235impl IsTerminal for pipe::SinkOutputStream {
236 fn is_terminal(&self) -> bool {
237 false
238 }
239}
240
241impl StdoutStream for pipe::ClosedOutputStream {
242 fn stream(&self) -> Box<dyn OutputStream> {
243 Box::new(*self)
244 }
245}
246
247impl IsTerminal for pipe::ClosedOutputStream {
248 fn is_terminal(&self) -> bool {
249 false
250 }
251}
252
253pub struct OutputFile {
257 file: Arc<std::fs::File>,
258}
259
260impl OutputFile {
261 pub fn new(file: std::fs::File) -> Self {
262 Self {
263 file: Arc::new(file),
264 }
265 }
266}
267
268impl StdoutStream for OutputFile {
269 fn stream(&self) -> Box<dyn OutputStream> {
270 Box::new(OutputFileStream {
271 file: Arc::clone(&self.file),
272 })
273 }
274}
275
276impl IsTerminal for OutputFile {
277 fn is_terminal(&self) -> bool {
278 false
279 }
280}
281
282struct OutputFileStream {
283 file: Arc<std::fs::File>,
284}
285
286#[async_trait::async_trait]
287impl Pollable for OutputFileStream {
288 async fn ready(&mut self) {}
289}
290
291impl OutputStream for OutputFileStream {
292 fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
293 use std::io::Write;
294 self.file
295 .write_all(&bytes)
296 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
297 }
298
299 fn flush(&mut self) -> StreamResult<()> {
300 use std::io::Write;
301 self.file
302 .flush()
303 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
304 }
305
306 fn check_write(&mut self) -> StreamResult<usize> {
307 Ok(1024 * 1024)
308 }
309}
310
311pub struct Stdout;
316
317pub fn stdout() -> Stdout {
322 Stdout
323}
324
325impl StdoutStream for Stdout {
326 fn stream(&self) -> Box<dyn OutputStream> {
327 Box::new(StdioOutputStream::Stdout)
328 }
329}
330
331impl IsTerminal for Stdout {
332 fn is_terminal(&self) -> bool {
333 std::io::stdout().is_terminal()
334 }
335}
336
337pub struct Stderr;
342
343pub fn stderr() -> Stderr {
348 Stderr
349}
350
351impl StdoutStream for Stderr {
352 fn stream(&self) -> Box<dyn OutputStream> {
353 Box::new(StdioOutputStream::Stderr)
354 }
355}
356
357impl IsTerminal for Stderr {
358 fn is_terminal(&self) -> bool {
359 std::io::stderr().is_terminal()
360 }
361}
362
363enum StdioOutputStream {
364 Stdout,
365 Stderr,
366}
367
368impl OutputStream for StdioOutputStream {
369 fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
370 use std::io::Write;
371 match self {
372 StdioOutputStream::Stdout => std::io::stdout().write_all(&bytes),
373 StdioOutputStream::Stderr => std::io::stderr().write_all(&bytes),
374 }
375 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
376 }
377
378 fn flush(&mut self) -> StreamResult<()> {
379 use std::io::Write;
380 match self {
381 StdioOutputStream::Stdout => std::io::stdout().flush(),
382 StdioOutputStream::Stderr => std::io::stderr().flush(),
383 }
384 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
385 }
386
387 fn check_write(&mut self) -> StreamResult<usize> {
388 Ok(1024 * 1024)
389 }
390}
391
392#[async_trait::async_trait]
393impl Pollable for StdioOutputStream {
394 async fn ready(&mut self) {}
395}
396
397pub struct AsyncStdoutStream(Arc<Mutex<crate::p2::pipe::AsyncWriteStream>>);
405
406impl AsyncStdoutStream {
407 pub fn new(s: crate::p2::pipe::AsyncWriteStream) -> Self {
408 Self(Arc::new(Mutex::new(s)))
409 }
410}
411
412impl StdoutStream for AsyncStdoutStream {
413 fn stream(&self) -> Box<dyn OutputStream> {
414 Box::new(Self(self.0.clone()))
415 }
416}
417
418impl IsTerminal for AsyncStdoutStream {
419 fn is_terminal(&self) -> bool {
420 false
421 }
422}
423
424#[async_trait::async_trait]
440impl OutputStream for AsyncStdoutStream {
441 fn check_write(&mut self) -> Result<usize, StreamError> {
442 match self.0.try_lock() {
443 Ok(mut stream) => stream.check_write(),
444 Err(_) => Err(StreamError::trap("concurrent writes are not supported")),
445 }
446 }
447 fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
448 match self.0.try_lock() {
449 Ok(mut stream) => stream.write(bytes),
450 Err(_) => Err(StreamError::trap("concurrent writes not supported yet")),
451 }
452 }
453 fn flush(&mut self) -> Result<(), StreamError> {
454 match self.0.try_lock() {
455 Ok(mut stream) => stream.flush(),
456 Err(_) => Err(StreamError::trap("concurrent flushes not supported yet")),
457 }
458 }
459 async fn cancel(&mut self) {
460 if let Some(mutex) = Arc::get_mut(&mut self.0) {
462 match mutex.try_lock() {
463 Ok(mut stream) => stream.cancel().await,
464 Err(_) => {}
465 }
466 }
467 }
468}
469
470#[async_trait::async_trait]
471impl Pollable for AsyncStdoutStream {
472 async fn ready(&mut self) {
473 self.0.lock().await.ready().await
474 }
475}
476
477#[derive(Debug, Clone, Copy, PartialEq, Eq)]
478pub enum IsATTY {
479 Yes,
480 No,
481}
482
483impl<T> stdin::Host for WasiImpl<T>
484where
485 T: WasiView,
486{
487 fn get_stdin(&mut self) -> Result<Resource<streams::DynInputStream>, anyhow::Error> {
488 let stream = self.ctx().stdin.stream();
489 Ok(self.table().push(stream)?)
490 }
491}
492
493impl<T> stdout::Host for WasiImpl<T>
494where
495 T: WasiView,
496{
497 fn get_stdout(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
498 let stream = self.ctx().stdout.stream();
499 Ok(self.table().push(stream)?)
500 }
501}
502
503impl<T> stderr::Host for WasiImpl<T>
504where
505 T: WasiView,
506{
507 fn get_stderr(&mut self) -> Result<Resource<streams::DynOutputStream>, anyhow::Error> {
508 let stream = self.ctx().stderr.stream();
509 Ok(self.table().push(stream)?)
510 }
511}
512
513pub struct TerminalInput;
514pub struct TerminalOutput;
515
516impl<T> terminal_input::Host for WasiImpl<T> where T: WasiView {}
517impl<T> terminal_input::HostTerminalInput for WasiImpl<T>
518where
519 T: WasiView,
520{
521 fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
522 self.table().delete(r)?;
523 Ok(())
524 }
525}
526impl<T> terminal_output::Host for WasiImpl<T> where T: WasiView {}
527impl<T> terminal_output::HostTerminalOutput for WasiImpl<T>
528where
529 T: WasiView,
530{
531 fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
532 self.table().delete(r)?;
533 Ok(())
534 }
535}
536impl<T> terminal_stdin::Host for WasiImpl<T>
537where
538 T: WasiView,
539{
540 fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
541 if self.ctx().stdin.is_terminal() {
542 let fd = self.table().push(TerminalInput)?;
543 Ok(Some(fd))
544 } else {
545 Ok(None)
546 }
547 }
548}
549impl<T> terminal_stdout::Host for WasiImpl<T>
550where
551 T: WasiView,
552{
553 fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
554 if self.ctx().stdout.is_terminal() {
555 let fd = self.table().push(TerminalOutput)?;
556 Ok(Some(fd))
557 } else {
558 Ok(None)
559 }
560 }
561}
562impl<T> terminal_stderr::Host for WasiImpl<T>
563where
564 T: WasiView,
565{
566 fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
567 if self.ctx().stderr.is_terminal() {
568 let fd = self.table().push(TerminalOutput)?;
569 Ok(Some(fd))
570 } else {
571 Ok(None)
572 }
573 }
574}
575
576#[cfg(test)]
577mod test {
578 use crate::p2::stdio::StdoutStream;
579 use crate::p2::write_stream::AsyncWriteStream;
580 use crate::p2::{AsyncStdoutStream, OutputStream};
581 use anyhow::Result;
582 use bytes::Bytes;
583 use tokio::io::AsyncReadExt;
584
585 #[test]
586 fn memory_stdin_stream() {
587 let pipe = super::pipe::MemoryInputPipe::new(
596 "the quick brown fox jumped over the three lazy dogs",
597 );
598
599 use super::StdinStream;
600
601 let mut view1 = pipe.stream();
602 let mut view2 = pipe.stream();
603
604 let read1 = view1.read(10).expect("read first 10 bytes");
605 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
606 let read2 = view2.read(10).expect("read second 10 bytes");
607 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
608 let read3 = view1.read(10).expect("read third 10 bytes");
609 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
610 let read4 = view2.read(10).expect("read fourth 10 bytes");
611 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
612 }
613
614 #[tokio::test]
615 async fn async_stdin_stream() {
616 let dir = tempfile::tempdir().unwrap();
627 let mut path = std::path::PathBuf::from(dir.path());
628 path.push("file");
629 std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
630
631 let file = tokio::fs::File::open(&path)
632 .await
633 .expect("open created file");
634 let stdin_stream =
635 super::AsyncStdinStream::new(crate::p2::pipe::AsyncReadStream::new(file));
636
637 use super::StdinStream;
638
639 let mut view1 = stdin_stream.stream();
640 let mut view2 = stdin_stream.stream();
641
642 view1.ready().await;
643
644 let read1 = view1.read(10).expect("read first 10 bytes");
645 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
646 let read2 = view2.read(10).expect("read second 10 bytes");
647 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
648 let read3 = view1.read(10).expect("read third 10 bytes");
649 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
650 let read4 = view2.read(10).expect("read fourth 10 bytes");
651 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
652 }
653
654 #[tokio::test]
655 async fn async_stdout_stream_unblocks() {
656 let (mut read, write) = tokio::io::duplex(32);
657 let stdout = AsyncStdoutStream::new(AsyncWriteStream::new(32, write));
658
659 let task = tokio::task::spawn(async move {
660 let mut stream = stdout.stream();
661 blocking_write_and_flush(&mut *stream, "x".into())
662 .await
663 .unwrap();
664 });
665
666 let mut buf = [0; 100];
667 let n = read.read(&mut buf).await.unwrap();
668 assert_eq!(&buf[..n], b"x");
669
670 task.await.unwrap();
671 }
672
673 async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
674 while !bytes.is_empty() {
675 let permit = s.write_ready().await?;
676 let len = bytes.len().min(permit);
677 let chunk = bytes.split_to(len);
678 s.write(chunk)?;
679 }
680
681 s.flush()?;
682 s.write_ready().await?;
683 Ok(())
684 }
685}