wasmtime_wasi_io/
impls.rs1use crate::bindings::wasi::io::{error, poll, streams};
2use crate::poll::{DynFuture, DynPollable, MakeFuture, subscribe};
3use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4use alloc::collections::BTreeMap;
5use alloc::string::String;
6use alloc::vec::Vec;
7use bytes::Bytes;
8use core::future::Future;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11use wasmtime::component::{Resource, ResourceTable};
12use wasmtime::{Result, format_err};
13
14impl poll::Host for ResourceTable {
15 async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
16 type ReadylistIndex = u32;
17
18 if pollables.is_empty() {
19 return Err(format_err!("empty poll list"));
20 }
21
22 let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
23
24 for (ix, p) in pollables.iter().enumerate() {
25 let ix: u32 = ix.try_into()?;
26
27 let pollable = self.get(p)?;
28 let (_, list) = table_futures
29 .entry(pollable.index)
30 .or_insert((pollable.make_future, Vec::new()));
31 list.push(ix);
32 }
33
34 let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
35 for (entry, (make_future, readylist_indices)) in self.iter_entries(table_futures) {
36 let entry = entry?;
37 futures.push((make_future(entry), readylist_indices));
38 }
39
40 struct PollList<'a> {
41 futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
42 }
43 impl<'a> Future for PollList<'a> {
44 type Output = Vec<u32>;
45
46 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47 let mut any_ready = false;
48 let mut results = Vec::new();
49 for (fut, readylist_indicies) in self.futures.iter_mut() {
50 match fut.as_mut().poll(cx) {
51 Poll::Ready(()) => {
52 results.extend_from_slice(readylist_indicies);
53 any_ready = true;
54 }
55 Poll::Pending => {}
56 }
57 }
58 if any_ready {
59 Poll::Ready(results)
60 } else {
61 Poll::Pending
62 }
63 }
64 }
65
66 Ok(PollList { futures }.await)
67 }
68}
69
70impl crate::bindings::wasi::io::poll::HostPollable for ResourceTable {
71 async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
72 let pollable = self.get(&pollable)?;
73 let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
74 ready.await;
75 Ok(())
76 }
77 async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
78 let pollable = self.get(&pollable)?;
79 let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
80 futures::pin_mut!(ready);
81 Ok(matches!(
82 futures::future::poll_immediate(ready).await,
83 Some(())
84 ))
85 }
86 fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
87 let pollable = self.delete(pollable)?;
88 if let Some(delete) = pollable.remove_index_on_delete {
89 delete(self, pollable.index)?;
90 }
91 Ok(())
92 }
93}
94
95impl error::Host for ResourceTable {}
96
97impl streams::Host for ResourceTable {
98 fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
99 match err {
100 StreamError::Closed => Ok(streams::StreamError::Closed),
101 StreamError::LastOperationFailed(e) => {
102 Ok(streams::StreamError::LastOperationFailed(self.push(e)?))
103 }
104 StreamError::Trap(e) => Err(e),
105 }
106 }
107}
108
109impl error::HostError for ResourceTable {
110 fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
111 self.delete(err)?;
112 Ok(())
113 }
114
115 fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
116 Ok(alloc::format!("{:?}", self.get(&err)?))
117 }
118}
119
120impl streams::HostOutputStream for ResourceTable {
121 async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
122 self.delete(stream)?.cancel().await;
123 Ok(())
124 }
125
126 fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
127 let bytes = self.get_mut(&stream)?.check_write()?;
128 Ok(bytes as u64)
129 }
130
131 fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
132 self.get_mut(&stream)?.write(bytes.into())?;
133 Ok(())
134 }
135
136 fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
137 subscribe(self, stream)
138 }
139
140 async fn blocking_write_and_flush(
141 &mut self,
142 stream: Resource<DynOutputStream>,
143 bytes: Vec<u8>,
144 ) -> StreamResult<()> {
145 if bytes.len() > 4096 {
146 return Err(StreamError::trap(
147 "Buffer too large for blocking-write-and-flush (expected at most 4096)",
148 ));
149 }
150
151 self.get_mut(&stream)?
152 .blocking_write_and_flush(bytes.into())
153 .await
154 }
155
156 async fn blocking_write_zeroes_and_flush(
157 &mut self,
158 stream: Resource<DynOutputStream>,
159 len: u64,
160 ) -> StreamResult<()> {
161 if len > 4096 {
162 return Err(StreamError::trap(
163 "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
164 ));
165 }
166
167 let bs = Bytes::from_iter(core::iter::repeat(0).take(len as usize));
170 self.get_mut(&stream)?.blocking_write_and_flush(bs).await
171 }
172
173 fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
174 self.get_mut(&stream)?.write_zeroes(len as usize)?;
175 Ok(())
176 }
177
178 fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
179 self.get_mut(&stream)?.flush()?;
180 Ok(())
181 }
182
183 async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
184 let s = self.get_mut(&stream)?;
185 s.flush()?;
186 s.write_ready().await?;
187 Ok(())
188 }
189
190 fn splice(
191 &mut self,
192 dest: Resource<DynOutputStream>,
193 src: Resource<DynInputStream>,
194 len: u64,
195 ) -> StreamResult<u64> {
196 let len = len.try_into().unwrap_or(usize::MAX);
197
198 let permit = {
199 let output = self.get_mut(&dest)?;
200 output.check_write()?
201 };
202 let len = len.min(permit);
203 if len == 0 {
204 return Ok(0);
205 }
206
207 let contents = self.get_mut(&src)?.read(len)?;
208
209 let len = contents.len();
210 if len == 0 {
211 return Ok(0);
212 }
213
214 let output = self.get_mut(&dest)?;
215 output.write(contents)?;
216 Ok(len.try_into().expect("usize can fit in u64"))
217 }
218
219 async fn blocking_splice(
220 &mut self,
221 dest: Resource<DynOutputStream>,
222 src: Resource<DynInputStream>,
223 len: u64,
224 ) -> StreamResult<u64> {
225 let len = len.try_into().unwrap_or(usize::MAX);
226
227 let permit = {
228 let output = self.get_mut(&dest)?;
229 output.write_ready().await?
230 };
231 let len = len.min(permit);
232 if len == 0 {
233 return Ok(0);
234 }
235
236 let contents = self.get_mut(&src)?.blocking_read(len).await?;
237
238 let len = contents.len();
239 if len == 0 {
240 return Ok(0);
241 }
242
243 let output = self.get_mut(&dest)?;
244 output.blocking_write_and_flush(contents).await?;
245 Ok(len.try_into().expect("usize can fit in u64"))
246 }
247}
248
249impl streams::HostInputStream for ResourceTable {
250 async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
251 self.delete(stream)?.cancel().await;
252 Ok(())
253 }
254
255 fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
256 let len = len.try_into().unwrap_or(usize::MAX);
257 let bytes = self.get_mut(&stream)?.read(len)?;
258 debug_assert!(bytes.len() <= len);
259 Ok(bytes.into())
260 }
261
262 async fn blocking_read(
263 &mut self,
264 stream: Resource<DynInputStream>,
265 len: u64,
266 ) -> StreamResult<Vec<u8>> {
267 let len = len.try_into().unwrap_or(usize::MAX);
268 let bytes = self.get_mut(&stream)?.blocking_read(len).await?;
269 debug_assert!(bytes.len() <= len);
270 Ok(bytes.into())
271 }
272
273 fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
274 let len = len.try_into().unwrap_or(usize::MAX);
275 let written = self.get_mut(&stream)?.skip(len)?;
276 Ok(written.try_into().expect("usize always fits in u64"))
277 }
278
279 async fn blocking_skip(
280 &mut self,
281 stream: Resource<DynInputStream>,
282 len: u64,
283 ) -> StreamResult<u64> {
284 let len = len.try_into().unwrap_or(usize::MAX);
285 let written = self.get_mut(&stream)?.blocking_skip(len).await?;
286 Ok(written.try_into().expect("usize always fits in u64"))
287 }
288
289 fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
290 crate::poll::subscribe(self, stream)
291 }
292}