wasmtime_wasi_io/
impls.rs1use crate::bindings::wasi::io::{error, poll, streams};
2use crate::poll::{subscribe, DynFuture, DynPollable, MakeFuture};
3use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4use crate::{IoImpl, IoView};
5use alloc::collections::BTreeMap;
6use alloc::string::String;
7use alloc::vec::Vec;
8use anyhow::{anyhow, Result};
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use wasmtime::component::Resource;
13
14impl<T: IoView> poll::Host for IoImpl<T> {
15 async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
16 type ReadylistIndex = u32;
17
18 if pollables.is_empty() {
19 return Err(anyhow!("empty poll list"));
20 }
21
22 let table = self.table();
23
24 let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
25
26 for (ix, p) in pollables.iter().enumerate() {
27 let ix: u32 = ix.try_into()?;
28
29 let pollable = table.get(p)?;
30 let (_, list) = table_futures
31 .entry(pollable.index)
32 .or_insert((pollable.make_future, Vec::new()));
33 list.push(ix);
34 }
35
36 let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
37 for (entry, (make_future, readylist_indices)) in table.iter_entries(table_futures) {
38 let entry = entry?;
39 futures.push((make_future(entry), readylist_indices));
40 }
41
42 struct PollList<'a> {
43 futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
44 }
45 impl<'a> Future for PollList<'a> {
46 type Output = Vec<u32>;
47
48 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49 let mut any_ready = false;
50 let mut results = Vec::new();
51 for (fut, readylist_indicies) in self.futures.iter_mut() {
52 match fut.as_mut().poll(cx) {
53 Poll::Ready(()) => {
54 results.extend_from_slice(readylist_indicies);
55 any_ready = true;
56 }
57 Poll::Pending => {}
58 }
59 }
60 if any_ready {
61 Poll::Ready(results)
62 } else {
63 Poll::Pending
64 }
65 }
66 }
67
68 Ok(PollList { futures }.await)
69 }
70}
71
72impl<T: IoView> crate::bindings::wasi::io::poll::HostPollable for IoImpl<T> {
73 async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
74 let table = self.table();
75 let pollable = table.get(&pollable)?;
76 let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
77 ready.await;
78 Ok(())
79 }
80 async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
81 let table = self.table();
82 let pollable = table.get(&pollable)?;
83 let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
84 futures::pin_mut!(ready);
85 Ok(matches!(
86 futures::future::poll_immediate(ready).await,
87 Some(())
88 ))
89 }
90 fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
91 let pollable = self.table().delete(pollable)?;
92 if let Some(delete) = pollable.remove_index_on_delete {
93 delete(self.table(), pollable.index)?;
94 }
95 Ok(())
96 }
97}
98
99impl<T: IoView> error::Host for IoImpl<T> {}
100
101impl<T: IoView> streams::Host for IoImpl<T> {
102 fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
103 match err {
104 StreamError::Closed => Ok(streams::StreamError::Closed),
105 StreamError::LastOperationFailed(e) => Ok(streams::StreamError::LastOperationFailed(
106 self.table().push(e)?,
107 )),
108 StreamError::Trap(e) => Err(e),
109 }
110 }
111}
112
113impl<T: IoView> error::HostError for IoImpl<T> {
114 fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
115 self.table().delete(err)?;
116 Ok(())
117 }
118
119 fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
120 Ok(alloc::format!("{:?}", self.table().get(&err)?))
121 }
122}
123
124impl<T: IoView> streams::HostOutputStream for IoImpl<T> {
125 async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
126 self.table().delete(stream)?.cancel().await;
127 Ok(())
128 }
129
130 fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
131 let bytes = self.table().get_mut(&stream)?.check_write()?;
132 Ok(bytes as u64)
133 }
134
135 fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
136 self.table().get_mut(&stream)?.write(bytes.into())?;
137 Ok(())
138 }
139
140 fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
141 subscribe(self.table(), stream)
142 }
143
144 async fn blocking_write_and_flush(
145 &mut self,
146 stream: Resource<DynOutputStream>,
147 bytes: Vec<u8>,
148 ) -> StreamResult<()> {
149 if bytes.len() > 4096 {
150 return Err(StreamError::trap(
151 "Buffer too large for blocking-write-and-flush (expected at most 4096)",
152 ));
153 }
154
155 self.table()
156 .get_mut(&stream)?
157 .blocking_write_and_flush(bytes.into())
158 .await
159 }
160
161 async fn blocking_write_zeroes_and_flush(
162 &mut self,
163 stream: Resource<DynOutputStream>,
164 len: u64,
165 ) -> StreamResult<()> {
166 if len > 4096 {
167 return Err(StreamError::trap(
168 "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
169 ));
170 }
171
172 self.table()
173 .get_mut(&stream)?
174 .blocking_write_zeroes_and_flush(len as usize)
175 .await
176 }
177
178 fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
179 self.table().get_mut(&stream)?.write_zeroes(len as usize)?;
180 Ok(())
181 }
182
183 fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
184 self.table().get_mut(&stream)?.flush()?;
185 Ok(())
186 }
187
188 async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
189 let s = self.table().get_mut(&stream)?;
190 s.flush()?;
191 s.write_ready().await?;
192 Ok(())
193 }
194
195 fn splice(
196 &mut self,
197 dest: Resource<DynOutputStream>,
198 src: Resource<DynInputStream>,
199 len: u64,
200 ) -> StreamResult<u64> {
201 let len = len.try_into().unwrap_or(usize::MAX);
202
203 let permit = {
204 let output = self.table().get_mut(&dest)?;
205 output.check_write()?
206 };
207 let len = len.min(permit);
208 if len == 0 {
209 return Ok(0);
210 }
211
212 let contents = self.table().get_mut(&src)?.read(len)?;
213
214 let len = contents.len();
215 if len == 0 {
216 return Ok(0);
217 }
218
219 let output = self.table().get_mut(&dest)?;
220 output.write(contents)?;
221 Ok(len.try_into().expect("usize can fit in u64"))
222 }
223
224 async fn blocking_splice(
225 &mut self,
226 dest: Resource<DynOutputStream>,
227 src: Resource<DynInputStream>,
228 len: u64,
229 ) -> StreamResult<u64> {
230 let len = len.try_into().unwrap_or(usize::MAX);
231
232 let permit = {
233 let output = self.table().get_mut(&dest)?;
234 output.write_ready().await?
235 };
236 let len = len.min(permit);
237 if len == 0 {
238 return Ok(0);
239 }
240
241 let contents = self.table().get_mut(&src)?.blocking_read(len).await?;
242
243 let len = contents.len();
244 if len == 0 {
245 return Ok(0);
246 }
247
248 let output = self.table().get_mut(&dest)?;
249 output.blocking_write_and_flush(contents).await?;
250 Ok(len.try_into().expect("usize can fit in u64"))
251 }
252}
253
254impl<T: IoView> streams::HostInputStream for IoImpl<T> {
255 async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
256 self.table().delete(stream)?.cancel().await;
257 Ok(())
258 }
259
260 fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
261 let len = len.try_into().unwrap_or(usize::MAX);
262 let bytes = self.table().get_mut(&stream)?.read(len)?;
263 debug_assert!(bytes.len() <= len);
264 Ok(bytes.into())
265 }
266
267 async fn blocking_read(
268 &mut self,
269 stream: Resource<DynInputStream>,
270 len: u64,
271 ) -> StreamResult<Vec<u8>> {
272 let len = len.try_into().unwrap_or(usize::MAX);
273 let bytes = self.table().get_mut(&stream)?.blocking_read(len).await?;
274 debug_assert!(bytes.len() <= len);
275 Ok(bytes.into())
276 }
277
278 fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
279 let len = len.try_into().unwrap_or(usize::MAX);
280 let written = self.table().get_mut(&stream)?.skip(len)?;
281 Ok(written.try_into().expect("usize always fits in u64"))
282 }
283
284 async fn blocking_skip(
285 &mut self,
286 stream: Resource<DynInputStream>,
287 len: u64,
288 ) -> StreamResult<u64> {
289 let len = len.try_into().unwrap_or(usize::MAX);
290 let written = self.table().get_mut(&stream)?.blocking_skip(len).await?;
291 Ok(written.try_into().expect("usize always fits in u64"))
292 }
293
294 fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
295 crate::poll::subscribe(self.table(), stream)
296 }
297}