wasmtime_wasi_io/
impls.rs1use crate::bindings::wasi::io::{error, poll, streams};
2use crate::poll::{DynFuture, DynPollable, MakeFuture, subscribe};
3use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4use alloc::collections::BTreeMap;
5use alloc::string::String;
6use alloc::vec::Vec;
7use anyhow::{Result, anyhow};
8use core::future::Future;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11use wasmtime::component::{Resource, ResourceTable};
12
13impl poll::Host for ResourceTable {
14 async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
15 type ReadylistIndex = u32;
16
17 if pollables.is_empty() {
18 return Err(anyhow!("empty poll list"));
19 }
20
21 let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
22
23 for (ix, p) in pollables.iter().enumerate() {
24 let ix: u32 = ix.try_into()?;
25
26 let pollable = self.get(p)?;
27 let (_, list) = table_futures
28 .entry(pollable.index)
29 .or_insert((pollable.make_future, Vec::new()));
30 list.push(ix);
31 }
32
33 let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
34 for (entry, (make_future, readylist_indices)) in self.iter_entries(table_futures) {
35 let entry = entry?;
36 futures.push((make_future(entry), readylist_indices));
37 }
38
39 struct PollList<'a> {
40 futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
41 }
42 impl<'a> Future for PollList<'a> {
43 type Output = Vec<u32>;
44
45 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46 let mut any_ready = false;
47 let mut results = Vec::new();
48 for (fut, readylist_indicies) in self.futures.iter_mut() {
49 match fut.as_mut().poll(cx) {
50 Poll::Ready(()) => {
51 results.extend_from_slice(readylist_indicies);
52 any_ready = true;
53 }
54 Poll::Pending => {}
55 }
56 }
57 if any_ready {
58 Poll::Ready(results)
59 } else {
60 Poll::Pending
61 }
62 }
63 }
64
65 Ok(PollList { futures }.await)
66 }
67}
68
69impl crate::bindings::wasi::io::poll::HostPollable for ResourceTable {
70 async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
71 let pollable = self.get(&pollable)?;
72 let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
73 ready.await;
74 Ok(())
75 }
76 async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
77 let pollable = self.get(&pollable)?;
78 let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
79 futures::pin_mut!(ready);
80 Ok(matches!(
81 futures::future::poll_immediate(ready).await,
82 Some(())
83 ))
84 }
85 fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
86 let pollable = self.delete(pollable)?;
87 if let Some(delete) = pollable.remove_index_on_delete {
88 delete(self, pollable.index)?;
89 }
90 Ok(())
91 }
92}
93
94impl error::Host for ResourceTable {}
95
96impl streams::Host for ResourceTable {
97 fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
98 match err {
99 StreamError::Closed => Ok(streams::StreamError::Closed),
100 StreamError::LastOperationFailed(e) => {
101 Ok(streams::StreamError::LastOperationFailed(self.push(e)?))
102 }
103 StreamError::Trap(e) => Err(e),
104 }
105 }
106}
107
108impl error::HostError for ResourceTable {
109 fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
110 self.delete(err)?;
111 Ok(())
112 }
113
114 fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
115 Ok(alloc::format!("{:?}", self.get(&err)?))
116 }
117}
118
119impl streams::HostOutputStream for ResourceTable {
120 async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
121 self.delete(stream)?.cancel().await;
122 Ok(())
123 }
124
125 fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
126 let bytes = self.get_mut(&stream)?.check_write()?;
127 Ok(bytes as u64)
128 }
129
130 fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
131 self.get_mut(&stream)?.write(bytes.into())?;
132 Ok(())
133 }
134
135 fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
136 subscribe(self, stream)
137 }
138
139 async fn blocking_write_and_flush(
140 &mut self,
141 stream: Resource<DynOutputStream>,
142 bytes: Vec<u8>,
143 ) -> StreamResult<()> {
144 if bytes.len() > 4096 {
145 return Err(StreamError::trap(
146 "Buffer too large for blocking-write-and-flush (expected at most 4096)",
147 ));
148 }
149
150 self.get_mut(&stream)?
151 .blocking_write_and_flush(bytes.into())
152 .await
153 }
154
155 async fn blocking_write_zeroes_and_flush(
156 &mut self,
157 stream: Resource<DynOutputStream>,
158 len: u64,
159 ) -> StreamResult<()> {
160 if len > 4096 {
161 return Err(StreamError::trap(
162 "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
163 ));
164 }
165
166 self.get_mut(&stream)?
167 .blocking_write_zeroes_and_flush(len as usize)
168 .await
169 }
170
171 fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
172 self.get_mut(&stream)?.write_zeroes(len as usize)?;
173 Ok(())
174 }
175
176 fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
177 self.get_mut(&stream)?.flush()?;
178 Ok(())
179 }
180
181 async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
182 let s = self.get_mut(&stream)?;
183 s.flush()?;
184 s.write_ready().await?;
185 Ok(())
186 }
187
188 fn splice(
189 &mut self,
190 dest: Resource<DynOutputStream>,
191 src: Resource<DynInputStream>,
192 len: u64,
193 ) -> StreamResult<u64> {
194 let len = len.try_into().unwrap_or(usize::MAX);
195
196 let permit = {
197 let output = self.get_mut(&dest)?;
198 output.check_write()?
199 };
200 let len = len.min(permit);
201 if len == 0 {
202 return Ok(0);
203 }
204
205 let contents = self.get_mut(&src)?.read(len)?;
206
207 let len = contents.len();
208 if len == 0 {
209 return Ok(0);
210 }
211
212 let output = self.get_mut(&dest)?;
213 output.write(contents)?;
214 Ok(len.try_into().expect("usize can fit in u64"))
215 }
216
217 async fn blocking_splice(
218 &mut self,
219 dest: Resource<DynOutputStream>,
220 src: Resource<DynInputStream>,
221 len: u64,
222 ) -> StreamResult<u64> {
223 let len = len.try_into().unwrap_or(usize::MAX);
224
225 let permit = {
226 let output = self.get_mut(&dest)?;
227 output.write_ready().await?
228 };
229 let len = len.min(permit);
230 if len == 0 {
231 return Ok(0);
232 }
233
234 let contents = self.get_mut(&src)?.blocking_read(len).await?;
235
236 let len = contents.len();
237 if len == 0 {
238 return Ok(0);
239 }
240
241 let output = self.get_mut(&dest)?;
242 output.blocking_write_and_flush(contents).await?;
243 Ok(len.try_into().expect("usize can fit in u64"))
244 }
245}
246
247impl streams::HostInputStream for ResourceTable {
248 async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
249 self.delete(stream)?.cancel().await;
250 Ok(())
251 }
252
253 fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
254 let len = len.try_into().unwrap_or(usize::MAX);
255 let bytes = self.get_mut(&stream)?.read(len)?;
256 debug_assert!(bytes.len() <= len);
257 Ok(bytes.into())
258 }
259
260 async fn blocking_read(
261 &mut self,
262 stream: Resource<DynInputStream>,
263 len: u64,
264 ) -> StreamResult<Vec<u8>> {
265 let len = len.try_into().unwrap_or(usize::MAX);
266 let bytes = self.get_mut(&stream)?.blocking_read(len).await?;
267 debug_assert!(bytes.len() <= len);
268 Ok(bytes.into())
269 }
270
271 fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
272 let len = len.try_into().unwrap_or(usize::MAX);
273 let written = self.get_mut(&stream)?.skip(len)?;
274 Ok(written.try_into().expect("usize always fits in u64"))
275 }
276
277 async fn blocking_skip(
278 &mut self,
279 stream: Resource<DynInputStream>,
280 len: u64,
281 ) -> StreamResult<u64> {
282 let len = len.try_into().unwrap_or(usize::MAX);
283 let written = self.get_mut(&stream)?.blocking_skip(len).await?;
284 Ok(written.try_into().expect("usize always fits in u64"))
285 }
286
287 fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
288 crate::poll::subscribe(self, stream)
289 }
290}