1use crate::I32Exit;
2use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3use crate::p3::DEFAULT_BUFFER_CAPACITY;
4use crate::p3::bindings::cli::types::ErrorCode;
5use crate::p3::bindings::cli::{
6 environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7 terminal_stdin, terminal_stdout,
8};
9use crate::p3::cli::{TerminalInput, TerminalOutput};
10use bytes::BytesMut;
11use core::pin::Pin;
12use core::task::{Context, Poll};
13use std::io;
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15use tokio::sync::oneshot;
16use wasmtime::component::{
17 Access, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
18 StreamReader, StreamResult,
19};
20use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};
21
22struct InputStreamProducer {
23 rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
24 result_tx: Option<oneshot::Sender<ErrorCode>>,
25}
26
27fn io_error_to_error_code(err: io::Error) -> ErrorCode {
28 match err.kind() {
29 io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
30 other => {
31 tracing::warn!("stdio error: {other}");
32 ErrorCode::Io
33 }
34 }
35}
36
37impl<D> StreamProducer<D> for InputStreamProducer {
38 type Item = u8;
39 type Buffer = BytesMut;
40
41 fn poll_produce<'a>(
42 mut self: Pin<&mut Self>,
43 cx: &mut Context<'_>,
44 mut store: StoreContextMut<'a, D>,
45 dst: Destination<'a, Self::Item, Self::Buffer>,
46 finish: bool,
47 ) -> Poll<wasmtime::Result<StreamResult>> {
48 if dst.remaining(store.as_context_mut()) == Some(0) {
56 return Poll::Ready(Ok(StreamResult::Completed));
57 }
58
59 let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
60 let mut buf = ReadBuf::new(dst.remaining());
61 match self.rx.as_mut().poll_read(cx, &mut buf) {
62 Poll::Ready(Ok(())) if buf.filled().is_empty() => {
63 Poll::Ready(Ok(StreamResult::Dropped))
64 }
65 Poll::Ready(Ok(())) => {
66 let n = buf.filled().len();
67 dst.mark_written(n);
68 Poll::Ready(Ok(StreamResult::Completed))
69 }
70 Poll::Ready(Err(e)) => {
71 let _ = self
72 .result_tx
73 .take()
74 .unwrap()
75 .send(io_error_to_error_code(e));
76 Poll::Ready(Ok(StreamResult::Dropped))
77 }
78 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
79 Poll::Pending => Poll::Pending,
80 }
81 }
82}
83
84struct OutputStreamConsumer {
85 tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
86 result_tx: Option<oneshot::Sender<ErrorCode>>,
87 flush_pending: bool,
88}
89
90impl OutputStreamConsumer {
91 fn poll_flush(
92 &mut self,
93 cx: &mut Context<'_>,
94 finish: bool,
95 ) -> Poll<wasmtime::Result<StreamResult>> {
96 match self.tx.as_mut().poll_flush(cx) {
97 Poll::Ready(Ok(())) => {
98 self.flush_pending = false;
99 Poll::Ready(Ok(StreamResult::Completed))
100 }
101 Poll::Ready(Err(e)) => self.dropped(e),
102 Poll::Pending => {
103 if finish {
104 self.flush_pending = false;
105 Poll::Ready(Ok(StreamResult::Cancelled))
106 } else {
107 self.flush_pending = true;
108 Poll::Pending
109 }
110 }
111 }
112 }
113
114 fn dropped(&mut self, err: io::Error) -> Poll<wasmtime::Result<StreamResult>> {
115 if let Some(tx) = self.result_tx.take() {
116 let _ = tx.send(io_error_to_error_code(err));
117 }
118 Poll::Ready(Ok(StreamResult::Dropped))
119 }
120}
121
122impl<D> StreamConsumer<D> for OutputStreamConsumer {
123 type Item = u8;
124
125 fn poll_consume(
126 mut self: Pin<&mut Self>,
127 cx: &mut Context<'_>,
128 store: StoreContextMut<D>,
129 src: Source<Self::Item>,
130 finish: bool,
131 ) -> Poll<wasmtime::Result<StreamResult>> {
132 if self.flush_pending {
133 return self.poll_flush(cx, finish);
134 }
135
136 let mut src = src.as_direct(store);
137 let buf = src.remaining();
138
139 if buf.len() == 0 {
147 return Poll::Ready(Ok(StreamResult::Completed));
148 }
149 match self.tx.as_mut().poll_write(cx, buf) {
150 Poll::Ready(Ok(0)) => self.dropped(io::ErrorKind::WriteZero.into()),
151 Poll::Ready(Ok(n)) => {
152 src.mark_read(n);
153 self.poll_flush(cx, finish)
154 }
155 Poll::Ready(Err(e)) => self.dropped(e),
156 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
157 Poll::Pending => Poll::Pending,
158 }
159 }
160}
161
162impl terminal_input::Host for WasiCliCtxView<'_> {}
163impl terminal_output::Host for WasiCliCtxView<'_> {}
164
165impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
166 fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
167 self.table
168 .delete(rep)
169 .context("failed to delete terminal input resource from table")?;
170 Ok(())
171 }
172}
173
174impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
175 fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
176 self.table
177 .delete(rep)
178 .context("failed to delete terminal output resource from table")?;
179 Ok(())
180 }
181}
182
183impl terminal_stdin::Host for WasiCliCtxView<'_> {
184 fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
185 if self.ctx.stdin.is_terminal() {
186 let fd = self
187 .table
188 .push(TerminalInput)
189 .context("failed to push terminal stdin resource to table")?;
190 Ok(Some(fd))
191 } else {
192 Ok(None)
193 }
194 }
195}
196
197impl terminal_stdout::Host for WasiCliCtxView<'_> {
198 fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
199 if self.ctx.stdout.is_terminal() {
200 let fd = self
201 .table
202 .push(TerminalOutput)
203 .context("failed to push terminal stdout resource to table")?;
204 Ok(Some(fd))
205 } else {
206 Ok(None)
207 }
208 }
209}
210
211impl terminal_stderr::Host for WasiCliCtxView<'_> {
212 fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
213 if self.ctx.stderr.is_terminal() {
214 let fd = self
215 .table
216 .push(TerminalOutput)
217 .context("failed to push terminal stderr resource to table")?;
218 Ok(Some(fd))
219 } else {
220 Ok(None)
221 }
222 }
223}
224
225impl<U> stdin::HostWithStore<U> for WasiCli {
226 fn read_via_stream(
227 mut store: Access<U, Self>,
228 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
229 let rx = store.get().ctx.stdin.async_stream();
230 let (result_tx, result_rx) = oneshot::channel();
231 let stream = StreamReader::new(
232 &mut store,
233 InputStreamProducer {
234 rx: Box::into_pin(rx),
235 result_tx: Some(result_tx),
236 },
237 )?;
238 let future = FutureReader::new(&mut store, async {
239 wasmtime::error::Ok(match result_rx.await {
240 Ok(err) => Err(err),
241 Err(_) => Ok(()),
242 })
243 })?;
244 Ok((stream, future))
245 }
246}
247
248impl stdin::Host for WasiCliCtxView<'_> {}
249
250impl<U> stdout::HostWithStore<U> for WasiCli {
251 fn write_via_stream(
252 mut store: Access<'_, U, Self>,
253 data: StreamReader<u8>,
254 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
255 let (result_tx, result_rx) = oneshot::channel();
256 let tx = store.get().ctx.stdout.async_stream();
257 data.pipe(
258 &mut store,
259 OutputStreamConsumer {
260 tx: Box::into_pin(tx),
261 result_tx: Some(result_tx),
262 flush_pending: false,
263 },
264 )?;
265 FutureReader::new(&mut store, async {
266 wasmtime::error::Ok(match result_rx.await {
267 Ok(err) => Err(err),
268 Err(_) => Ok(()),
269 })
270 })
271 }
272}
273
274impl stdout::Host for WasiCliCtxView<'_> {}
275
276impl<U> stderr::HostWithStore<U> for WasiCli {
277 fn write_via_stream(
278 mut store: Access<'_, U, Self>,
279 data: StreamReader<u8>,
280 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
281 let (result_tx, result_rx) = oneshot::channel();
282 let tx = store.get().ctx.stderr.async_stream();
283 data.pipe(
284 &mut store,
285 OutputStreamConsumer {
286 tx: Box::into_pin(tx),
287 result_tx: Some(result_tx),
288 flush_pending: false,
289 },
290 )?;
291 FutureReader::new(&mut store, async {
292 wasmtime::error::Ok(match result_rx.await {
293 Ok(err) => Err(err),
294 Err(_) => Ok(()),
295 })
296 })
297 }
298}
299
300impl stderr::Host for WasiCliCtxView<'_> {}
301
302impl environment::Host for WasiCliCtxView<'_> {
303 fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
304 Ok(self.ctx.environment.clone())
305 }
306
307 fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
308 Ok(self.ctx.arguments.clone())
309 }
310
311 fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
312 Ok(self.ctx.initial_cwd.clone())
313 }
314}
315
316impl exit::Host for WasiCliCtxView<'_> {
317 fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
318 let status = match status {
319 Ok(()) => 0,
320 Err(()) => 1,
321 };
322 Err(format_err!(I32Exit(status)))
323 }
324
325 fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
326 Err(format_err!(I32Exit(status_code.into())))
327 }
328}