Skip to main content

wasmtime_wasi/p3/cli/
host.rs

1use crate::I32Exit;
2use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3use crate::p3::DEFAULT_BUFFER_CAPACITY;
4use crate::p3::bindings::cli::types::ErrorCode;
5use crate::p3::bindings::cli::{
6    environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7    terminal_stdin, terminal_stdout,
8};
9use crate::p3::cli::{TerminalInput, TerminalOutput};
10use bytes::BytesMut;
11use core::pin::Pin;
12use core::task::{Context, Poll};
13use std::io;
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15use tokio::sync::oneshot;
16use wasmtime::component::{
17    Access, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
18    StreamReader, StreamResult,
19};
20use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};
21
22struct InputStreamProducer {
23    rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
24    result_tx: Option<oneshot::Sender<ErrorCode>>,
25}
26
27fn io_error_to_error_code(err: io::Error) -> ErrorCode {
28    match err.kind() {
29        io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
30        other => {
31            tracing::warn!("stdio error: {other}");
32            ErrorCode::Io
33        }
34    }
35}
36
37impl<D> StreamProducer<D> for InputStreamProducer {
38    type Item = u8;
39    type Buffer = BytesMut;
40
41    fn poll_produce<'a>(
42        mut self: Pin<&mut Self>,
43        cx: &mut Context<'_>,
44        mut store: StoreContextMut<'a, D>,
45        dst: Destination<'a, Self::Item, Self::Buffer>,
46        finish: bool,
47    ) -> Poll<wasmtime::Result<StreamResult>> {
48        // If the destination buffer is empty then this is a request on
49        // behalf of the guest to wait for this input stream to be readable.
50        // The `AsyncRead` trait abstraction does not provide the ability to
51        // await this event so we're forced to basically just lie here and
52        // say we're ready read data later.
53        //
54        // See WebAssembly/component-model#561 for some more information.
55        if dst.remaining(store.as_context_mut()) == Some(0) {
56            return Poll::Ready(Ok(StreamResult::Completed));
57        }
58
59        let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
60        let mut buf = ReadBuf::new(dst.remaining());
61        match self.rx.as_mut().poll_read(cx, &mut buf) {
62            Poll::Ready(Ok(())) if buf.filled().is_empty() => {
63                Poll::Ready(Ok(StreamResult::Dropped))
64            }
65            Poll::Ready(Ok(())) => {
66                let n = buf.filled().len();
67                dst.mark_written(n);
68                Poll::Ready(Ok(StreamResult::Completed))
69            }
70            Poll::Ready(Err(e)) => {
71                let _ = self
72                    .result_tx
73                    .take()
74                    .unwrap()
75                    .send(io_error_to_error_code(e));
76                Poll::Ready(Ok(StreamResult::Dropped))
77            }
78            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
79            Poll::Pending => Poll::Pending,
80        }
81    }
82}
83
84struct OutputStreamConsumer {
85    tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
86    result_tx: Option<oneshot::Sender<ErrorCode>>,
87    flush_pending: bool,
88}
89
90impl OutputStreamConsumer {
91    fn poll_flush(
92        &mut self,
93        cx: &mut Context<'_>,
94        finish: bool,
95    ) -> Poll<wasmtime::Result<StreamResult>> {
96        match self.tx.as_mut().poll_flush(cx) {
97            Poll::Ready(Ok(())) => {
98                self.flush_pending = false;
99                Poll::Ready(Ok(StreamResult::Completed))
100            }
101            Poll::Ready(Err(e)) => self.dropped(e),
102            Poll::Pending => {
103                if finish {
104                    self.flush_pending = false;
105                    Poll::Ready(Ok(StreamResult::Cancelled))
106                } else {
107                    self.flush_pending = true;
108                    Poll::Pending
109                }
110            }
111        }
112    }
113
114    fn dropped(&mut self, err: io::Error) -> Poll<wasmtime::Result<StreamResult>> {
115        if let Some(tx) = self.result_tx.take() {
116            let _ = tx.send(io_error_to_error_code(err));
117        }
118        Poll::Ready(Ok(StreamResult::Dropped))
119    }
120}
121
122impl<D> StreamConsumer<D> for OutputStreamConsumer {
123    type Item = u8;
124
125    fn poll_consume(
126        mut self: Pin<&mut Self>,
127        cx: &mut Context<'_>,
128        store: StoreContextMut<D>,
129        src: Source<Self::Item>,
130        finish: bool,
131    ) -> Poll<wasmtime::Result<StreamResult>> {
132        if self.flush_pending {
133            return self.poll_flush(cx, finish);
134        }
135
136        let mut src = src.as_direct(store);
137        let buf = src.remaining();
138
139        // If the source buffer is empty then this is a request on behalf of
140        // the guest to wait for this output stream to be writable. The
141        // `AsyncWrite` trait abstraction does not provide the ability to await
142        // this event so we're forced to basically just lie here and say we're
143        // ready write data later.
144        //
145        // See WebAssembly/component-model#561 for some more information.
146        if buf.len() == 0 {
147            return Poll::Ready(Ok(StreamResult::Completed));
148        }
149        match self.tx.as_mut().poll_write(cx, buf) {
150            Poll::Ready(Ok(0)) => self.dropped(io::ErrorKind::WriteZero.into()),
151            Poll::Ready(Ok(n)) => {
152                src.mark_read(n);
153                self.poll_flush(cx, finish)
154            }
155            Poll::Ready(Err(e)) => self.dropped(e),
156            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
157            Poll::Pending => Poll::Pending,
158        }
159    }
160}
161
162impl terminal_input::Host for WasiCliCtxView<'_> {}
163impl terminal_output::Host for WasiCliCtxView<'_> {}
164
165impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
166    fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
167        self.table
168            .delete(rep)
169            .context("failed to delete terminal input resource from table")?;
170        Ok(())
171    }
172}
173
174impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
175    fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
176        self.table
177            .delete(rep)
178            .context("failed to delete terminal output resource from table")?;
179        Ok(())
180    }
181}
182
183impl terminal_stdin::Host for WasiCliCtxView<'_> {
184    fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
185        if self.ctx.stdin.is_terminal() {
186            let fd = self
187                .table
188                .push(TerminalInput)
189                .context("failed to push terminal stdin resource to table")?;
190            Ok(Some(fd))
191        } else {
192            Ok(None)
193        }
194    }
195}
196
197impl terminal_stdout::Host for WasiCliCtxView<'_> {
198    fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
199        if self.ctx.stdout.is_terminal() {
200            let fd = self
201                .table
202                .push(TerminalOutput)
203                .context("failed to push terminal stdout resource to table")?;
204            Ok(Some(fd))
205        } else {
206            Ok(None)
207        }
208    }
209}
210
211impl terminal_stderr::Host for WasiCliCtxView<'_> {
212    fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
213        if self.ctx.stderr.is_terminal() {
214            let fd = self
215                .table
216                .push(TerminalOutput)
217                .context("failed to push terminal stderr resource to table")?;
218            Ok(Some(fd))
219        } else {
220            Ok(None)
221        }
222    }
223}
224
225impl<U> stdin::HostWithStore<U> for WasiCli {
226    fn read_via_stream(
227        mut store: Access<U, Self>,
228    ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
229        let rx = store.get().ctx.stdin.async_stream();
230        let (result_tx, result_rx) = oneshot::channel();
231        let stream = StreamReader::new(
232            &mut store,
233            InputStreamProducer {
234                rx: Box::into_pin(rx),
235                result_tx: Some(result_tx),
236            },
237        )?;
238        let future = FutureReader::new(&mut store, async {
239            wasmtime::error::Ok(match result_rx.await {
240                Ok(err) => Err(err),
241                Err(_) => Ok(()),
242            })
243        })?;
244        Ok((stream, future))
245    }
246}
247
248impl stdin::Host for WasiCliCtxView<'_> {}
249
250impl<U> stdout::HostWithStore<U> for WasiCli {
251    fn write_via_stream(
252        mut store: Access<'_, U, Self>,
253        data: StreamReader<u8>,
254    ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
255        let (result_tx, result_rx) = oneshot::channel();
256        let tx = store.get().ctx.stdout.async_stream();
257        data.pipe(
258            &mut store,
259            OutputStreamConsumer {
260                tx: Box::into_pin(tx),
261                result_tx: Some(result_tx),
262                flush_pending: false,
263            },
264        )?;
265        FutureReader::new(&mut store, async {
266            wasmtime::error::Ok(match result_rx.await {
267                Ok(err) => Err(err),
268                Err(_) => Ok(()),
269            })
270        })
271    }
272}
273
274impl stdout::Host for WasiCliCtxView<'_> {}
275
276impl<U> stderr::HostWithStore<U> for WasiCli {
277    fn write_via_stream(
278        mut store: Access<'_, U, Self>,
279        data: StreamReader<u8>,
280    ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
281        let (result_tx, result_rx) = oneshot::channel();
282        let tx = store.get().ctx.stderr.async_stream();
283        data.pipe(
284            &mut store,
285            OutputStreamConsumer {
286                tx: Box::into_pin(tx),
287                result_tx: Some(result_tx),
288                flush_pending: false,
289            },
290        )?;
291        FutureReader::new(&mut store, async {
292            wasmtime::error::Ok(match result_rx.await {
293                Ok(err) => Err(err),
294                Err(_) => Ok(()),
295            })
296        })
297    }
298}
299
300impl stderr::Host for WasiCliCtxView<'_> {}
301
302impl environment::Host for WasiCliCtxView<'_> {
303    fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
304        Ok(self.ctx.environment.clone())
305    }
306
307    fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
308        Ok(self.ctx.arguments.clone())
309    }
310
311    fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
312        Ok(self.ctx.initial_cwd.clone())
313    }
314}
315
316impl exit::Host for WasiCliCtxView<'_> {
317    fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
318        let status = match status {
319            Ok(()) => 0,
320            Err(()) => 1,
321        };
322        Err(format_err!(I32Exit(status)))
323    }
324
325    fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
326        Err(format_err!(I32Exit(status_code.into())))
327    }
328}