1use crate::I32Exit;
2use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3use crate::p3::DEFAULT_BUFFER_CAPACITY;
4use crate::p3::bindings::cli::types::ErrorCode;
5use crate::p3::bindings::cli::{
6 environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7 terminal_stdin, terminal_stdout,
8};
9use crate::p3::cli::{TerminalInput, TerminalOutput};
10use bytes::BytesMut;
11use core::pin::Pin;
12use core::task::{Context, Poll};
13use std::io::{self, Cursor};
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15use tokio::sync::oneshot;
16use wasmtime::component::{
17 Access, Accessor, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
18 StreamReader, StreamResult,
19};
20use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};
21
22struct InputStreamProducer {
23 rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
24 result_tx: Option<oneshot::Sender<ErrorCode>>,
25}
26
27fn io_error_to_error_code(err: io::Error) -> ErrorCode {
28 match err.kind() {
29 io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
30 other => {
31 tracing::warn!("stdio error: {other}");
32 ErrorCode::Io
33 }
34 }
35}
36
37impl<D> StreamProducer<D> for InputStreamProducer {
38 type Item = u8;
39 type Buffer = Cursor<BytesMut>;
40
41 fn poll_produce<'a>(
42 mut self: Pin<&mut Self>,
43 cx: &mut Context<'_>,
44 mut store: StoreContextMut<'a, D>,
45 dst: Destination<'a, Self::Item, Self::Buffer>,
46 finish: bool,
47 ) -> Poll<wasmtime::Result<StreamResult>> {
48 if dst.remaining(store.as_context_mut()) == Some(0) {
56 return Poll::Ready(Ok(StreamResult::Completed));
57 }
58
59 let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
60 let mut buf = ReadBuf::new(dst.remaining());
61 match self.rx.as_mut().poll_read(cx, &mut buf) {
62 Poll::Ready(Ok(())) if buf.filled().is_empty() => {
63 Poll::Ready(Ok(StreamResult::Dropped))
64 }
65 Poll::Ready(Ok(())) => {
66 let n = buf.filled().len();
67 dst.mark_written(n);
68 Poll::Ready(Ok(StreamResult::Completed))
69 }
70 Poll::Ready(Err(e)) => {
71 let _ = self
72 .result_tx
73 .take()
74 .unwrap()
75 .send(io_error_to_error_code(e));
76 Poll::Ready(Ok(StreamResult::Dropped))
77 }
78 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
79 Poll::Pending => Poll::Pending,
80 }
81 }
82}
83
84struct OutputStreamConsumer {
85 tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
86 result_tx: Option<oneshot::Sender<ErrorCode>>,
87}
88
89impl<D> StreamConsumer<D> for OutputStreamConsumer {
90 type Item = u8;
91
92 fn poll_consume(
93 mut self: Pin<&mut Self>,
94 cx: &mut Context<'_>,
95 store: StoreContextMut<D>,
96 src: Source<Self::Item>,
97 finish: bool,
98 ) -> Poll<wasmtime::Result<StreamResult>> {
99 let mut src = src.as_direct(store);
100 let buf = src.remaining();
101
102 if buf.len() == 0 {
110 return Poll::Ready(Ok(StreamResult::Completed));
111 }
112 match self.tx.as_mut().poll_write(cx, buf) {
113 Poll::Ready(Ok(n)) => {
114 src.mark_read(n);
115 Poll::Ready(Ok(StreamResult::Completed))
116 }
117 Poll::Ready(Err(e)) => {
118 let _ = self
119 .result_tx
120 .take()
121 .unwrap()
122 .send(io_error_to_error_code(e));
123 Poll::Ready(Ok(StreamResult::Dropped))
124 }
125 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
126 Poll::Pending => Poll::Pending,
127 }
128 }
129}
130
131impl terminal_input::Host for WasiCliCtxView<'_> {}
132impl terminal_output::Host for WasiCliCtxView<'_> {}
133
134impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
135 fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
136 self.table
137 .delete(rep)
138 .context("failed to delete terminal input resource from table")?;
139 Ok(())
140 }
141}
142
143impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
144 fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
145 self.table
146 .delete(rep)
147 .context("failed to delete terminal output resource from table")?;
148 Ok(())
149 }
150}
151
152impl terminal_stdin::Host for WasiCliCtxView<'_> {
153 fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
154 if self.ctx.stdin.is_terminal() {
155 let fd = self
156 .table
157 .push(TerminalInput)
158 .context("failed to push terminal stdin resource to table")?;
159 Ok(Some(fd))
160 } else {
161 Ok(None)
162 }
163 }
164}
165
166impl terminal_stdout::Host for WasiCliCtxView<'_> {
167 fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
168 if self.ctx.stdout.is_terminal() {
169 let fd = self
170 .table
171 .push(TerminalOutput)
172 .context("failed to push terminal stdout resource to table")?;
173 Ok(Some(fd))
174 } else {
175 Ok(None)
176 }
177 }
178}
179
180impl terminal_stderr::Host for WasiCliCtxView<'_> {
181 fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
182 if self.ctx.stderr.is_terminal() {
183 let fd = self
184 .table
185 .push(TerminalOutput)
186 .context("failed to push terminal stderr resource to table")?;
187 Ok(Some(fd))
188 } else {
189 Ok(None)
190 }
191 }
192}
193
194impl stdin::HostWithStore for WasiCli {
195 fn read_via_stream<U>(
196 mut store: Access<U, Self>,
197 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
198 let rx = store.get().ctx.stdin.async_stream();
199 let (result_tx, result_rx) = oneshot::channel();
200 let stream = StreamReader::new(
201 &mut store,
202 InputStreamProducer {
203 rx: Box::into_pin(rx),
204 result_tx: Some(result_tx),
205 },
206 );
207 let future = FutureReader::new(&mut store, async {
208 wasmtime::error::Ok(match result_rx.await {
209 Ok(err) => Err(err),
210 Err(_) => Ok(()),
211 })
212 });
213 Ok((stream, future))
214 }
215}
216
217impl stdin::Host for WasiCliCtxView<'_> {}
218
219impl stdout::HostWithStore for WasiCli {
220 async fn write_via_stream<U>(
221 store: &Accessor<U, Self>,
222 data: StreamReader<u8>,
223 ) -> wasmtime::Result<Result<(), ErrorCode>> {
224 let (result_tx, result_rx) = oneshot::channel();
225 store.with(|mut store| {
226 let tx = store.get().ctx.stdout.async_stream();
227 data.pipe(
228 store,
229 OutputStreamConsumer {
230 tx: Box::into_pin(tx),
231 result_tx: Some(result_tx),
232 },
233 );
234 });
235 Ok(match result_rx.await {
236 Ok(err) => Err(err),
237 Err(_) => Ok(()),
238 })
239 }
240}
241
242impl stdout::Host for WasiCliCtxView<'_> {}
243
244impl stderr::HostWithStore for WasiCli {
245 async fn write_via_stream<U>(
246 store: &Accessor<U, Self>,
247 data: StreamReader<u8>,
248 ) -> wasmtime::Result<Result<(), ErrorCode>> {
249 let (result_tx, result_rx) = oneshot::channel();
250 store.with(|mut store| {
251 let tx = store.get().ctx.stderr.async_stream();
252 data.pipe(
253 store,
254 OutputStreamConsumer {
255 tx: Box::into_pin(tx),
256 result_tx: Some(result_tx),
257 },
258 );
259 });
260 Ok(match result_rx.await {
261 Ok(err) => Err(err),
262 Err(_) => Ok(()),
263 })
264 }
265}
266
267impl stderr::Host for WasiCliCtxView<'_> {}
268
269impl environment::Host for WasiCliCtxView<'_> {
270 fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
271 Ok(self.ctx.environment.clone())
272 }
273
274 fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
275 Ok(self.ctx.arguments.clone())
276 }
277
278 fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
279 Ok(self.ctx.initial_cwd.clone())
280 }
281}
282
283impl exit::Host for WasiCliCtxView<'_> {
284 fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
285 let status = match status {
286 Ok(()) => 0,
287 Err(()) => 1,
288 };
289 Err(format_err!(I32Exit(status)))
290 }
291
292 fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
293 Err(format_err!(I32Exit(status_code.into())))
294 }
295}