1use crate::I32Exit;
2use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3use crate::p3::DEFAULT_BUFFER_CAPACITY;
4use crate::p3::bindings::cli::types::ErrorCode;
5use crate::p3::bindings::cli::{
6 environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7 terminal_stdin, terminal_stdout,
8};
9use crate::p3::cli::{TerminalInput, TerminalOutput};
10use anyhow::{Context as _, anyhow};
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll};
14use std::io::{self, Cursor};
15use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
16use tokio::sync::oneshot;
17use wasmtime::component::{
18 Accessor, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
19 StreamReader, StreamResult,
20};
21use wasmtime::{AsContextMut as _, StoreContextMut};
22
23struct InputStreamProducer {
24 rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
25 result_tx: Option<oneshot::Sender<ErrorCode>>,
26}
27
28fn io_error_to_error_code(err: io::Error) -> ErrorCode {
29 match err.kind() {
30 io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
31 other => {
32 tracing::warn!("stdio error: {other}");
33 ErrorCode::Io
34 }
35 }
36}
37
38impl<D> StreamProducer<D> for InputStreamProducer {
39 type Item = u8;
40 type Buffer = Cursor<BytesMut>;
41
42 fn poll_produce<'a>(
43 mut self: Pin<&mut Self>,
44 cx: &mut Context<'_>,
45 mut store: StoreContextMut<'a, D>,
46 dst: Destination<'a, Self::Item, Self::Buffer>,
47 finish: bool,
48 ) -> Poll<wasmtime::Result<StreamResult>> {
49 if dst.remaining(store.as_context_mut()) == Some(0) {
57 return Poll::Ready(Ok(StreamResult::Completed));
58 }
59
60 let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
61 let mut buf = ReadBuf::new(dst.remaining());
62 match self.rx.as_mut().poll_read(cx, &mut buf) {
63 Poll::Ready(Ok(())) if buf.filled().is_empty() => {
64 Poll::Ready(Ok(StreamResult::Dropped))
65 }
66 Poll::Ready(Ok(())) => {
67 let n = buf.filled().len();
68 dst.mark_written(n);
69 Poll::Ready(Ok(StreamResult::Completed))
70 }
71 Poll::Ready(Err(e)) => {
72 let _ = self
73 .result_tx
74 .take()
75 .unwrap()
76 .send(io_error_to_error_code(e));
77 Poll::Ready(Ok(StreamResult::Dropped))
78 }
79 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
80 Poll::Pending => Poll::Pending,
81 }
82 }
83}
84
85struct OutputStreamConsumer {
86 tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
87 result_tx: Option<oneshot::Sender<ErrorCode>>,
88}
89
90impl<D> StreamConsumer<D> for OutputStreamConsumer {
91 type Item = u8;
92
93 fn poll_consume(
94 mut self: Pin<&mut Self>,
95 cx: &mut Context<'_>,
96 store: StoreContextMut<D>,
97 src: Source<Self::Item>,
98 finish: bool,
99 ) -> Poll<wasmtime::Result<StreamResult>> {
100 let mut src = src.as_direct(store);
101 let buf = src.remaining();
102
103 if buf.len() == 0 {
111 return Poll::Ready(Ok(StreamResult::Completed));
112 }
113 match self.tx.as_mut().poll_write(cx, buf) {
114 Poll::Ready(Ok(n)) => {
115 src.mark_read(n);
116 Poll::Ready(Ok(StreamResult::Completed))
117 }
118 Poll::Ready(Err(e)) => {
119 let _ = self
120 .result_tx
121 .take()
122 .unwrap()
123 .send(io_error_to_error_code(e));
124 Poll::Ready(Ok(StreamResult::Dropped))
125 }
126 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
127 Poll::Pending => Poll::Pending,
128 }
129 }
130}
131
132impl terminal_input::Host for WasiCliCtxView<'_> {}
133impl terminal_output::Host for WasiCliCtxView<'_> {}
134
135impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
136 fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
137 self.table
138 .delete(rep)
139 .context("failed to delete terminal input resource from table")?;
140 Ok(())
141 }
142}
143
144impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
145 fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
146 self.table
147 .delete(rep)
148 .context("failed to delete terminal output resource from table")?;
149 Ok(())
150 }
151}
152
153impl terminal_stdin::Host for WasiCliCtxView<'_> {
154 fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
155 if self.ctx.stdin.is_terminal() {
156 let fd = self
157 .table
158 .push(TerminalInput)
159 .context("failed to push terminal stdin resource to table")?;
160 Ok(Some(fd))
161 } else {
162 Ok(None)
163 }
164 }
165}
166
167impl terminal_stdout::Host for WasiCliCtxView<'_> {
168 fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
169 if self.ctx.stdout.is_terminal() {
170 let fd = self
171 .table
172 .push(TerminalOutput)
173 .context("failed to push terminal stdout resource to table")?;
174 Ok(Some(fd))
175 } else {
176 Ok(None)
177 }
178 }
179}
180
181impl terminal_stderr::Host for WasiCliCtxView<'_> {
182 fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
183 if self.ctx.stderr.is_terminal() {
184 let fd = self
185 .table
186 .push(TerminalOutput)
187 .context("failed to push terminal stderr resource to table")?;
188 Ok(Some(fd))
189 } else {
190 Ok(None)
191 }
192 }
193}
194
195impl stdin::HostWithStore for WasiCli {
196 async fn read_via_stream<U>(
197 store: &Accessor<U, Self>,
198 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
199 store.with(|mut store| {
200 let rx = store.get().ctx.stdin.async_stream();
201 let (result_tx, result_rx) = oneshot::channel();
202 let stream = StreamReader::new(
203 &mut store,
204 InputStreamProducer {
205 rx: Box::into_pin(rx),
206 result_tx: Some(result_tx),
207 },
208 );
209 let future = FutureReader::new(&mut store, async {
210 anyhow::Ok(match result_rx.await {
211 Ok(err) => Err(err),
212 Err(_) => Ok(()),
213 })
214 });
215 Ok((stream, future))
216 })
217 }
218}
219
220impl stdin::Host for WasiCliCtxView<'_> {}
221
222impl stdout::HostWithStore for WasiCli {
223 async fn write_via_stream<U>(
224 store: &Accessor<U, Self>,
225 data: StreamReader<u8>,
226 ) -> wasmtime::Result<Result<(), ErrorCode>> {
227 let (result_tx, result_rx) = oneshot::channel();
228 store.with(|mut store| {
229 let tx = store.get().ctx.stdout.async_stream();
230 data.pipe(
231 store,
232 OutputStreamConsumer {
233 tx: Box::into_pin(tx),
234 result_tx: Some(result_tx),
235 },
236 );
237 });
238 Ok(match result_rx.await {
239 Ok(err) => Err(err),
240 Err(_) => Ok(()),
241 })
242 }
243}
244
245impl stdout::Host for WasiCliCtxView<'_> {}
246
247impl stderr::HostWithStore for WasiCli {
248 async fn write_via_stream<U>(
249 store: &Accessor<U, Self>,
250 data: StreamReader<u8>,
251 ) -> wasmtime::Result<Result<(), ErrorCode>> {
252 let (result_tx, result_rx) = oneshot::channel();
253 store.with(|mut store| {
254 let tx = store.get().ctx.stderr.async_stream();
255 data.pipe(
256 store,
257 OutputStreamConsumer {
258 tx: Box::into_pin(tx),
259 result_tx: Some(result_tx),
260 },
261 );
262 });
263 Ok(match result_rx.await {
264 Ok(err) => Err(err),
265 Err(_) => Ok(()),
266 })
267 }
268}
269
270impl stderr::Host for WasiCliCtxView<'_> {}
271
272impl environment::Host for WasiCliCtxView<'_> {
273 fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
274 Ok(self.ctx.environment.clone())
275 }
276
277 fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
278 Ok(self.ctx.arguments.clone())
279 }
280
281 fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
282 Ok(self.ctx.initial_cwd.clone())
283 }
284}
285
286impl exit::Host for WasiCliCtxView<'_> {
287 fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
288 let status = match status {
289 Ok(()) => 0,
290 Err(()) => 1,
291 };
292 Err(anyhow!(I32Exit(status)))
293 }
294
295 fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
296 Err(anyhow!(I32Exit(status_code.into())))
297 }
298}