1use crate::I32Exit;
2use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3use crate::p3::DEFAULT_BUFFER_CAPACITY;
4use crate::p3::bindings::cli::types::ErrorCode;
5use crate::p3::bindings::cli::{
6 environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7 terminal_stdin, terminal_stdout,
8};
9use crate::p3::cli::{TerminalInput, TerminalOutput};
10use anyhow::{Context as _, anyhow};
11use bytes::BytesMut;
12use core::pin::Pin;
13use core::task::{Context, Poll};
14use std::io::{self, Cursor};
15use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
16use tokio::sync::oneshot;
17use wasmtime::component::{
18 Access, Accessor, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
19 StreamReader, StreamResult,
20};
21use wasmtime::{AsContextMut as _, StoreContextMut};
22
23struct InputStreamProducer {
24 rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
25 result_tx: Option<oneshot::Sender<ErrorCode>>,
26}
27
28fn io_error_to_error_code(err: io::Error) -> ErrorCode {
29 match err.kind() {
30 io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
31 other => {
32 tracing::warn!("stdio error: {other}");
33 ErrorCode::Io
34 }
35 }
36}
37
38impl<D> StreamProducer<D> for InputStreamProducer {
39 type Item = u8;
40 type Buffer = Cursor<BytesMut>;
41
42 fn poll_produce<'a>(
43 mut self: Pin<&mut Self>,
44 cx: &mut Context<'_>,
45 mut store: StoreContextMut<'a, D>,
46 dst: Destination<'a, Self::Item, Self::Buffer>,
47 finish: bool,
48 ) -> Poll<wasmtime::Result<StreamResult>> {
49 if dst.remaining(store.as_context_mut()) == Some(0) {
57 return Poll::Ready(Ok(StreamResult::Completed));
58 }
59
60 let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
61 let mut buf = ReadBuf::new(dst.remaining());
62 match self.rx.as_mut().poll_read(cx, &mut buf) {
63 Poll::Ready(Ok(())) if buf.filled().is_empty() => {
64 Poll::Ready(Ok(StreamResult::Dropped))
65 }
66 Poll::Ready(Ok(())) => {
67 let n = buf.filled().len();
68 dst.mark_written(n);
69 Poll::Ready(Ok(StreamResult::Completed))
70 }
71 Poll::Ready(Err(e)) => {
72 let _ = self
73 .result_tx
74 .take()
75 .unwrap()
76 .send(io_error_to_error_code(e));
77 Poll::Ready(Ok(StreamResult::Dropped))
78 }
79 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
80 Poll::Pending => Poll::Pending,
81 }
82 }
83}
84
85struct OutputStreamConsumer {
86 tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
87 result_tx: Option<oneshot::Sender<ErrorCode>>,
88}
89
90impl<D> StreamConsumer<D> for OutputStreamConsumer {
91 type Item = u8;
92
93 fn poll_consume(
94 mut self: Pin<&mut Self>,
95 cx: &mut Context<'_>,
96 store: StoreContextMut<D>,
97 src: Source<Self::Item>,
98 finish: bool,
99 ) -> Poll<wasmtime::Result<StreamResult>> {
100 let mut src = src.as_direct(store);
101 let buf = src.remaining();
102
103 if buf.len() == 0 {
111 return Poll::Ready(Ok(StreamResult::Completed));
112 }
113 match self.tx.as_mut().poll_write(cx, buf) {
114 Poll::Ready(Ok(n)) => {
115 src.mark_read(n);
116 Poll::Ready(Ok(StreamResult::Completed))
117 }
118 Poll::Ready(Err(e)) => {
119 let _ = self
120 .result_tx
121 .take()
122 .unwrap()
123 .send(io_error_to_error_code(e));
124 Poll::Ready(Ok(StreamResult::Dropped))
125 }
126 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
127 Poll::Pending => Poll::Pending,
128 }
129 }
130}
131
132impl terminal_input::Host for WasiCliCtxView<'_> {}
133impl terminal_output::Host for WasiCliCtxView<'_> {}
134
135impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
136 fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
137 self.table
138 .delete(rep)
139 .context("failed to delete terminal input resource from table")?;
140 Ok(())
141 }
142}
143
144impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
145 fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
146 self.table
147 .delete(rep)
148 .context("failed to delete terminal output resource from table")?;
149 Ok(())
150 }
151}
152
153impl terminal_stdin::Host for WasiCliCtxView<'_> {
154 fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
155 if self.ctx.stdin.is_terminal() {
156 let fd = self
157 .table
158 .push(TerminalInput)
159 .context("failed to push terminal stdin resource to table")?;
160 Ok(Some(fd))
161 } else {
162 Ok(None)
163 }
164 }
165}
166
167impl terminal_stdout::Host for WasiCliCtxView<'_> {
168 fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
169 if self.ctx.stdout.is_terminal() {
170 let fd = self
171 .table
172 .push(TerminalOutput)
173 .context("failed to push terminal stdout resource to table")?;
174 Ok(Some(fd))
175 } else {
176 Ok(None)
177 }
178 }
179}
180
181impl terminal_stderr::Host for WasiCliCtxView<'_> {
182 fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
183 if self.ctx.stderr.is_terminal() {
184 let fd = self
185 .table
186 .push(TerminalOutput)
187 .context("failed to push terminal stderr resource to table")?;
188 Ok(Some(fd))
189 } else {
190 Ok(None)
191 }
192 }
193}
194
195impl stdin::HostWithStore for WasiCli {
196 fn read_via_stream<U>(
197 mut store: Access<U, Self>,
198 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
199 let rx = store.get().ctx.stdin.async_stream();
200 let (result_tx, result_rx) = oneshot::channel();
201 let stream = StreamReader::new(
202 &mut store,
203 InputStreamProducer {
204 rx: Box::into_pin(rx),
205 result_tx: Some(result_tx),
206 },
207 );
208 let future = FutureReader::new(&mut store, async {
209 anyhow::Ok(match result_rx.await {
210 Ok(err) => Err(err),
211 Err(_) => Ok(()),
212 })
213 });
214 Ok((stream, future))
215 }
216}
217
218impl stdin::Host for WasiCliCtxView<'_> {}
219
220impl stdout::HostWithStore for WasiCli {
221 async fn write_via_stream<U>(
222 store: &Accessor<U, Self>,
223 data: StreamReader<u8>,
224 ) -> wasmtime::Result<Result<(), ErrorCode>> {
225 let (result_tx, result_rx) = oneshot::channel();
226 store.with(|mut store| {
227 let tx = store.get().ctx.stdout.async_stream();
228 data.pipe(
229 store,
230 OutputStreamConsumer {
231 tx: Box::into_pin(tx),
232 result_tx: Some(result_tx),
233 },
234 );
235 });
236 Ok(match result_rx.await {
237 Ok(err) => Err(err),
238 Err(_) => Ok(()),
239 })
240 }
241}
242
243impl stdout::Host for WasiCliCtxView<'_> {}
244
245impl stderr::HostWithStore for WasiCli {
246 async fn write_via_stream<U>(
247 store: &Accessor<U, Self>,
248 data: StreamReader<u8>,
249 ) -> wasmtime::Result<Result<(), ErrorCode>> {
250 let (result_tx, result_rx) = oneshot::channel();
251 store.with(|mut store| {
252 let tx = store.get().ctx.stderr.async_stream();
253 data.pipe(
254 store,
255 OutputStreamConsumer {
256 tx: Box::into_pin(tx),
257 result_tx: Some(result_tx),
258 },
259 );
260 });
261 Ok(match result_rx.await {
262 Ok(err) => Err(err),
263 Err(_) => Ok(()),
264 })
265 }
266}
267
268impl stderr::Host for WasiCliCtxView<'_> {}
269
270impl environment::Host for WasiCliCtxView<'_> {
271 fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
272 Ok(self.ctx.environment.clone())
273 }
274
275 fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
276 Ok(self.ctx.arguments.clone())
277 }
278
279 fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
280 Ok(self.ctx.initial_cwd.clone())
281 }
282}
283
284impl exit::Host for WasiCliCtxView<'_> {
285 fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
286 let status = match status {
287 Ok(()) => 0,
288 Err(()) => 1,
289 };
290 Err(anyhow!(I32Exit(status)))
291 }
292
293 fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
294 Err(anyhow!(I32Exit(status_code.into())))
295 }
296}