wasmtime_wasi/p3/cli/
host.rs

1use crate::I32Exit;
2use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3use crate::p3::DEFAULT_BUFFER_CAPACITY;
4use crate::p3::bindings::cli::types::ErrorCode;
5use crate::p3::bindings::cli::{
6    environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7    terminal_stdin, terminal_stdout,
8};
9use crate::p3::cli::{TerminalInput, TerminalOutput};
10use anyhow::{Context as _, anyhow};
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll};
14use std::io::{self, Cursor};
15use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
16use tokio::sync::oneshot;
17use wasmtime::component::{
18    Accessor, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
19    StreamReader, StreamResult,
20};
21use wasmtime::{AsContextMut as _, StoreContextMut};
22
23struct InputStreamProducer {
24    rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
25    result_tx: Option<oneshot::Sender<ErrorCode>>,
26}
27
28fn io_error_to_error_code(err: io::Error) -> ErrorCode {
29    match err.kind() {
30        io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
31        other => {
32            tracing::warn!("stdio error: {other}");
33            ErrorCode::Io
34        }
35    }
36}
37
38impl<D> StreamProducer<D> for InputStreamProducer {
39    type Item = u8;
40    type Buffer = Cursor<BytesMut>;
41
42    fn poll_produce<'a>(
43        mut self: Pin<&mut Self>,
44        cx: &mut Context<'_>,
45        mut store: StoreContextMut<'a, D>,
46        dst: Destination<'a, Self::Item, Self::Buffer>,
47        finish: bool,
48    ) -> Poll<wasmtime::Result<StreamResult>> {
49        // If the destination buffer is empty then this is a request on
50        // behalf of the guest to wait for this input stream to be readable.
51        // The `AsyncRead` trait abstraction does not provide the ability to
52        // await this event so we're forced to basically just lie here and
53        // say we're ready read data later.
54        //
55        // See WebAssembly/component-model#561 for some more information.
56        if dst.remaining(store.as_context_mut()) == Some(0) {
57            return Poll::Ready(Ok(StreamResult::Completed));
58        }
59
60        let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
61        let mut buf = ReadBuf::new(dst.remaining());
62        match self.rx.as_mut().poll_read(cx, &mut buf) {
63            Poll::Ready(Ok(())) if buf.filled().is_empty() => {
64                Poll::Ready(Ok(StreamResult::Dropped))
65            }
66            Poll::Ready(Ok(())) => {
67                let n = buf.filled().len();
68                dst.mark_written(n);
69                Poll::Ready(Ok(StreamResult::Completed))
70            }
71            Poll::Ready(Err(e)) => {
72                let _ = self
73                    .result_tx
74                    .take()
75                    .unwrap()
76                    .send(io_error_to_error_code(e));
77                Poll::Ready(Ok(StreamResult::Dropped))
78            }
79            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
80            Poll::Pending => Poll::Pending,
81        }
82    }
83}
84
85struct OutputStreamConsumer {
86    tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
87    result_tx: Option<oneshot::Sender<ErrorCode>>,
88}
89
90impl<D> StreamConsumer<D> for OutputStreamConsumer {
91    type Item = u8;
92
93    fn poll_consume(
94        mut self: Pin<&mut Self>,
95        cx: &mut Context<'_>,
96        store: StoreContextMut<D>,
97        src: Source<Self::Item>,
98        finish: bool,
99    ) -> Poll<wasmtime::Result<StreamResult>> {
100        let mut src = src.as_direct(store);
101        let buf = src.remaining();
102
103        // If the source buffer is empty then this is a request on behalf of
104        // the guest to wait for this output stream to be writable. The
105        // `AsyncWrite` trait abstraction does not provide the ability to await
106        // this event so we're forced to basically just lie here and say we're
107        // ready write data later.
108        //
109        // See WebAssembly/component-model#561 for some more information.
110        if buf.len() == 0 {
111            return Poll::Ready(Ok(StreamResult::Completed));
112        }
113        match self.tx.as_mut().poll_write(cx, buf) {
114            Poll::Ready(Ok(n)) => {
115                src.mark_read(n);
116                Poll::Ready(Ok(StreamResult::Completed))
117            }
118            Poll::Ready(Err(e)) => {
119                let _ = self
120                    .result_tx
121                    .take()
122                    .unwrap()
123                    .send(io_error_to_error_code(e));
124                Poll::Ready(Ok(StreamResult::Dropped))
125            }
126            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
127            Poll::Pending => Poll::Pending,
128        }
129    }
130}
131
132impl terminal_input::Host for WasiCliCtxView<'_> {}
133impl terminal_output::Host for WasiCliCtxView<'_> {}
134
135impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
136    fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
137        self.table
138            .delete(rep)
139            .context("failed to delete terminal input resource from table")?;
140        Ok(())
141    }
142}
143
144impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
145    fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
146        self.table
147            .delete(rep)
148            .context("failed to delete terminal output resource from table")?;
149        Ok(())
150    }
151}
152
153impl terminal_stdin::Host for WasiCliCtxView<'_> {
154    fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
155        if self.ctx.stdin.is_terminal() {
156            let fd = self
157                .table
158                .push(TerminalInput)
159                .context("failed to push terminal stdin resource to table")?;
160            Ok(Some(fd))
161        } else {
162            Ok(None)
163        }
164    }
165}
166
167impl terminal_stdout::Host for WasiCliCtxView<'_> {
168    fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
169        if self.ctx.stdout.is_terminal() {
170            let fd = self
171                .table
172                .push(TerminalOutput)
173                .context("failed to push terminal stdout resource to table")?;
174            Ok(Some(fd))
175        } else {
176            Ok(None)
177        }
178    }
179}
180
181impl terminal_stderr::Host for WasiCliCtxView<'_> {
182    fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
183        if self.ctx.stderr.is_terminal() {
184            let fd = self
185                .table
186                .push(TerminalOutput)
187                .context("failed to push terminal stderr resource to table")?;
188            Ok(Some(fd))
189        } else {
190            Ok(None)
191        }
192    }
193}
194
195impl stdin::HostWithStore for WasiCli {
196    async fn read_via_stream<U>(
197        store: &Accessor<U, Self>,
198    ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
199        store.with(|mut store| {
200            let rx = store.get().ctx.stdin.async_stream();
201            let (result_tx, result_rx) = oneshot::channel();
202            let stream = StreamReader::new(
203                &mut store,
204                InputStreamProducer {
205                    rx: Box::into_pin(rx),
206                    result_tx: Some(result_tx),
207                },
208            );
209            let future = FutureReader::new(&mut store, async {
210                anyhow::Ok(match result_rx.await {
211                    Ok(err) => Err(err),
212                    Err(_) => Ok(()),
213                })
214            });
215            Ok((stream, future))
216        })
217    }
218}
219
220impl stdin::Host for WasiCliCtxView<'_> {}
221
222impl stdout::HostWithStore for WasiCli {
223    async fn write_via_stream<U>(
224        store: &Accessor<U, Self>,
225        data: StreamReader<u8>,
226    ) -> wasmtime::Result<Result<(), ErrorCode>> {
227        let (result_tx, result_rx) = oneshot::channel();
228        store.with(|mut store| {
229            let tx = store.get().ctx.stdout.async_stream();
230            data.pipe(
231                store,
232                OutputStreamConsumer {
233                    tx: Box::into_pin(tx),
234                    result_tx: Some(result_tx),
235                },
236            );
237        });
238        Ok(match result_rx.await {
239            Ok(err) => Err(err),
240            Err(_) => Ok(()),
241        })
242    }
243}
244
245impl stdout::Host for WasiCliCtxView<'_> {}
246
247impl stderr::HostWithStore for WasiCli {
248    async fn write_via_stream<U>(
249        store: &Accessor<U, Self>,
250        data: StreamReader<u8>,
251    ) -> wasmtime::Result<Result<(), ErrorCode>> {
252        let (result_tx, result_rx) = oneshot::channel();
253        store.with(|mut store| {
254            let tx = store.get().ctx.stderr.async_stream();
255            data.pipe(
256                store,
257                OutputStreamConsumer {
258                    tx: Box::into_pin(tx),
259                    result_tx: Some(result_tx),
260                },
261            );
262        });
263        Ok(match result_rx.await {
264            Ok(err) => Err(err),
265            Err(_) => Ok(()),
266        })
267    }
268}
269
270impl stderr::Host for WasiCliCtxView<'_> {}
271
272impl environment::Host for WasiCliCtxView<'_> {
273    fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
274        Ok(self.ctx.environment.clone())
275    }
276
277    fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
278        Ok(self.ctx.arguments.clone())
279    }
280
281    fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
282        Ok(self.ctx.initial_cwd.clone())
283    }
284}
285
286impl exit::Host for WasiCliCtxView<'_> {
287    fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
288        let status = match status {
289            Ok(()) => 0,
290            Err(()) => 1,
291        };
292        Err(anyhow!(I32Exit(status)))
293    }
294
295    fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
296        Err(anyhow!(I32Exit(status_code.into())))
297    }
298}