Skip to main content

wasmtime_wasi/p3/cli/
host.rs

1use crate::I32Exit;
2use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3use crate::p3::DEFAULT_BUFFER_CAPACITY;
4use crate::p3::bindings::cli::types::ErrorCode;
5use crate::p3::bindings::cli::{
6    environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7    terminal_stdin, terminal_stdout,
8};
9use crate::p3::cli::{TerminalInput, TerminalOutput};
10use bytes::BytesMut;
11use core::pin::Pin;
12use core::task::{Context, Poll};
13use std::io::{self, Cursor};
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15use tokio::sync::oneshot;
16use wasmtime::component::{
17    Access, Accessor, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
18    StreamReader, StreamResult,
19};
20use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};
21
/// Stream producer that feeds bytes read from an `AsyncRead` source into a
/// component-model stream.
struct InputStreamProducer {
    /// Pinned, boxed reader that `poll_produce` pulls bytes from.
    rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
    /// One-shot channel used to report an I/O error code. It is `take`n when
    /// a read error is forwarded, so at most one error is ever sent.
    result_tx: Option<oneshot::Sender<ErrorCode>>,
}
26
27fn io_error_to_error_code(err: io::Error) -> ErrorCode {
28    match err.kind() {
29        io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
30        other => {
31            tracing::warn!("stdio error: {other}");
32            ErrorCode::Io
33        }
34    }
35}
36
37impl<D> StreamProducer<D> for InputStreamProducer {
38    type Item = u8;
39    type Buffer = Cursor<BytesMut>;
40
41    fn poll_produce<'a>(
42        mut self: Pin<&mut Self>,
43        cx: &mut Context<'_>,
44        mut store: StoreContextMut<'a, D>,
45        dst: Destination<'a, Self::Item, Self::Buffer>,
46        finish: bool,
47    ) -> Poll<wasmtime::Result<StreamResult>> {
48        // If the destination buffer is empty then this is a request on
49        // behalf of the guest to wait for this input stream to be readable.
50        // The `AsyncRead` trait abstraction does not provide the ability to
51        // await this event so we're forced to basically just lie here and
52        // say we're ready read data later.
53        //
54        // See WebAssembly/component-model#561 for some more information.
55        if dst.remaining(store.as_context_mut()) == Some(0) {
56            return Poll::Ready(Ok(StreamResult::Completed));
57        }
58
59        let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
60        let mut buf = ReadBuf::new(dst.remaining());
61        match self.rx.as_mut().poll_read(cx, &mut buf) {
62            Poll::Ready(Ok(())) if buf.filled().is_empty() => {
63                Poll::Ready(Ok(StreamResult::Dropped))
64            }
65            Poll::Ready(Ok(())) => {
66                let n = buf.filled().len();
67                dst.mark_written(n);
68                Poll::Ready(Ok(StreamResult::Completed))
69            }
70            Poll::Ready(Err(e)) => {
71                let _ = self
72                    .result_tx
73                    .take()
74                    .unwrap()
75                    .send(io_error_to_error_code(e));
76                Poll::Ready(Ok(StreamResult::Dropped))
77            }
78            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
79            Poll::Pending => Poll::Pending,
80        }
81    }
82}
83
/// Stream consumer that forwards bytes from a component-model stream into an
/// `AsyncWrite` sink.
struct OutputStreamConsumer {
    /// Pinned, boxed writer that `poll_consume` pushes bytes into.
    tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
    /// One-shot channel used to report an I/O error code. It is `take`n when
    /// a write error is forwarded, so at most one error is ever sent.
    result_tx: Option<oneshot::Sender<ErrorCode>>,
}
88
89impl<D> StreamConsumer<D> for OutputStreamConsumer {
90    type Item = u8;
91
92    fn poll_consume(
93        mut self: Pin<&mut Self>,
94        cx: &mut Context<'_>,
95        store: StoreContextMut<D>,
96        src: Source<Self::Item>,
97        finish: bool,
98    ) -> Poll<wasmtime::Result<StreamResult>> {
99        let mut src = src.as_direct(store);
100        let buf = src.remaining();
101
102        // If the source buffer is empty then this is a request on behalf of
103        // the guest to wait for this output stream to be writable. The
104        // `AsyncWrite` trait abstraction does not provide the ability to await
105        // this event so we're forced to basically just lie here and say we're
106        // ready write data later.
107        //
108        // See WebAssembly/component-model#561 for some more information.
109        if buf.len() == 0 {
110            return Poll::Ready(Ok(StreamResult::Completed));
111        }
112        match self.tx.as_mut().poll_write(cx, buf) {
113            Poll::Ready(Ok(n)) => {
114                src.mark_read(n);
115                Poll::Ready(Ok(StreamResult::Completed))
116            }
117            Poll::Ready(Err(e)) => {
118                let _ = self
119                    .result_tx
120                    .take()
121                    .unwrap()
122                    .send(io_error_to_error_code(e));
123                Poll::Ready(Ok(StreamResult::Dropped))
124            }
125            Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
126            Poll::Pending => Poll::Pending,
127        }
128    }
129}
130
131impl terminal_input::Host for WasiCliCtxView<'_> {}
132impl terminal_output::Host for WasiCliCtxView<'_> {}
133
134impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
135    fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
136        self.table
137            .delete(rep)
138            .context("failed to delete terminal input resource from table")?;
139        Ok(())
140    }
141}
142
143impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
144    fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
145        self.table
146            .delete(rep)
147            .context("failed to delete terminal output resource from table")?;
148        Ok(())
149    }
150}
151
152impl terminal_stdin::Host for WasiCliCtxView<'_> {
153    fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
154        if self.ctx.stdin.is_terminal() {
155            let fd = self
156                .table
157                .push(TerminalInput)
158                .context("failed to push terminal stdin resource to table")?;
159            Ok(Some(fd))
160        } else {
161            Ok(None)
162        }
163    }
164}
165
166impl terminal_stdout::Host for WasiCliCtxView<'_> {
167    fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
168        if self.ctx.stdout.is_terminal() {
169            let fd = self
170                .table
171                .push(TerminalOutput)
172                .context("failed to push terminal stdout resource to table")?;
173            Ok(Some(fd))
174        } else {
175            Ok(None)
176        }
177    }
178}
179
180impl terminal_stderr::Host for WasiCliCtxView<'_> {
181    fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
182        if self.ctx.stderr.is_terminal() {
183            let fd = self
184                .table
185                .push(TerminalOutput)
186                .context("failed to push terminal stderr resource to table")?;
187            Ok(Some(fd))
188        } else {
189            Ok(None)
190        }
191    }
192}
193
194impl stdin::HostWithStore for WasiCli {
195    fn read_via_stream<U>(
196        mut store: Access<U, Self>,
197    ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
198        let rx = store.get().ctx.stdin.async_stream();
199        let (result_tx, result_rx) = oneshot::channel();
200        let stream = StreamReader::new(
201            &mut store,
202            InputStreamProducer {
203                rx: Box::into_pin(rx),
204                result_tx: Some(result_tx),
205            },
206        );
207        let future = FutureReader::new(&mut store, async {
208            wasmtime::error::Ok(match result_rx.await {
209                Ok(err) => Err(err),
210                Err(_) => Ok(()),
211            })
212        });
213        Ok((stream, future))
214    }
215}
216
217impl stdin::Host for WasiCliCtxView<'_> {}
218
219impl stdout::HostWithStore for WasiCli {
220    async fn write_via_stream<U>(
221        store: &Accessor<U, Self>,
222        data: StreamReader<u8>,
223    ) -> wasmtime::Result<Result<(), ErrorCode>> {
224        let (result_tx, result_rx) = oneshot::channel();
225        store.with(|mut store| {
226            let tx = store.get().ctx.stdout.async_stream();
227            data.pipe(
228                store,
229                OutputStreamConsumer {
230                    tx: Box::into_pin(tx),
231                    result_tx: Some(result_tx),
232                },
233            );
234        });
235        Ok(match result_rx.await {
236            Ok(err) => Err(err),
237            Err(_) => Ok(()),
238        })
239    }
240}
241
242impl stdout::Host for WasiCliCtxView<'_> {}
243
244impl stderr::HostWithStore for WasiCli {
245    async fn write_via_stream<U>(
246        store: &Accessor<U, Self>,
247        data: StreamReader<u8>,
248    ) -> wasmtime::Result<Result<(), ErrorCode>> {
249        let (result_tx, result_rx) = oneshot::channel();
250        store.with(|mut store| {
251            let tx = store.get().ctx.stderr.async_stream();
252            data.pipe(
253                store,
254                OutputStreamConsumer {
255                    tx: Box::into_pin(tx),
256                    result_tx: Some(result_tx),
257                },
258            );
259        });
260        Ok(match result_rx.await {
261            Ok(err) => Err(err),
262            Err(_) => Ok(()),
263        })
264    }
265}
266
267impl stderr::Host for WasiCliCtxView<'_> {}
268
269impl environment::Host for WasiCliCtxView<'_> {
270    fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
271        Ok(self.ctx.environment.clone())
272    }
273
274    fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
275        Ok(self.ctx.arguments.clone())
276    }
277
278    fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
279        Ok(self.ctx.initial_cwd.clone())
280    }
281}
282
283impl exit::Host for WasiCliCtxView<'_> {
284    fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
285        let status = match status {
286            Ok(()) => 0,
287            Err(()) => 1,
288        };
289        Err(format_err!(I32Exit(status)))
290    }
291
292    fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
293        Err(format_err!(I32Exit(status_code.into())))
294    }
295}