wasmtime_wasi_io/
impls.rs

1use crate::bindings::wasi::io::{error, poll, streams};
2use crate::poll::{DynFuture, DynPollable, MakeFuture, subscribe};
3use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4use alloc::collections::BTreeMap;
5use alloc::string::String;
6use alloc::vec::Vec;
7use anyhow::{Result, anyhow};
8use core::future::Future;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11use wasmtime::component::{Resource, ResourceTable};
12
13impl poll::Host for ResourceTable {
14    async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
15        type ReadylistIndex = u32;
16
17        if pollables.is_empty() {
18            return Err(anyhow!("empty poll list"));
19        }
20
21        let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
22
23        for (ix, p) in pollables.iter().enumerate() {
24            let ix: u32 = ix.try_into()?;
25
26            let pollable = self.get(p)?;
27            let (_, list) = table_futures
28                .entry(pollable.index)
29                .or_insert((pollable.make_future, Vec::new()));
30            list.push(ix);
31        }
32
33        let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
34        for (entry, (make_future, readylist_indices)) in self.iter_entries(table_futures) {
35            let entry = entry?;
36            futures.push((make_future(entry), readylist_indices));
37        }
38
39        struct PollList<'a> {
40            futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
41        }
42        impl<'a> Future for PollList<'a> {
43            type Output = Vec<u32>;
44
45            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46                let mut any_ready = false;
47                let mut results = Vec::new();
48                for (fut, readylist_indicies) in self.futures.iter_mut() {
49                    match fut.as_mut().poll(cx) {
50                        Poll::Ready(()) => {
51                            results.extend_from_slice(readylist_indicies);
52                            any_ready = true;
53                        }
54                        Poll::Pending => {}
55                    }
56                }
57                if any_ready {
58                    Poll::Ready(results)
59                } else {
60                    Poll::Pending
61                }
62            }
63        }
64
65        Ok(PollList { futures }.await)
66    }
67}
68
69impl crate::bindings::wasi::io::poll::HostPollable for ResourceTable {
70    async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
71        let pollable = self.get(&pollable)?;
72        let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
73        ready.await;
74        Ok(())
75    }
76    async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
77        let pollable = self.get(&pollable)?;
78        let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
79        futures::pin_mut!(ready);
80        Ok(matches!(
81            futures::future::poll_immediate(ready).await,
82            Some(())
83        ))
84    }
85    fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
86        let pollable = self.delete(pollable)?;
87        if let Some(delete) = pollable.remove_index_on_delete {
88            delete(self, pollable.index)?;
89        }
90        Ok(())
91    }
92}
93
// `error::Host` needs no method bodies here, so the impl is empty; its only
// purpose is to make `ResourceTable` satisfy the trait.
impl error::Host for ResourceTable {}
95
96impl streams::Host for ResourceTable {
97    fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
98        match err {
99            StreamError::Closed => Ok(streams::StreamError::Closed),
100            StreamError::LastOperationFailed(e) => {
101                Ok(streams::StreamError::LastOperationFailed(self.push(e)?))
102            }
103            StreamError::Trap(e) => Err(e),
104        }
105    }
106}
107
108impl error::HostError for ResourceTable {
109    fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
110        self.delete(err)?;
111        Ok(())
112    }
113
114    fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
115        Ok(alloc::format!("{:?}", self.get(&err)?))
116    }
117}
118
119impl streams::HostOutputStream for ResourceTable {
120    async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
121        self.delete(stream)?.cancel().await;
122        Ok(())
123    }
124
125    fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
126        let bytes = self.get_mut(&stream)?.check_write()?;
127        Ok(bytes as u64)
128    }
129
130    fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
131        self.get_mut(&stream)?.write(bytes.into())?;
132        Ok(())
133    }
134
135    fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
136        subscribe(self, stream)
137    }
138
139    async fn blocking_write_and_flush(
140        &mut self,
141        stream: Resource<DynOutputStream>,
142        bytes: Vec<u8>,
143    ) -> StreamResult<()> {
144        if bytes.len() > 4096 {
145            return Err(StreamError::trap(
146                "Buffer too large for blocking-write-and-flush (expected at most 4096)",
147            ));
148        }
149
150        self.get_mut(&stream)?
151            .blocking_write_and_flush(bytes.into())
152            .await
153    }
154
155    async fn blocking_write_zeroes_and_flush(
156        &mut self,
157        stream: Resource<DynOutputStream>,
158        len: u64,
159    ) -> StreamResult<()> {
160        if len > 4096 {
161            return Err(StreamError::trap(
162                "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
163            ));
164        }
165
166        self.get_mut(&stream)?
167            .blocking_write_zeroes_and_flush(len as usize)
168            .await
169    }
170
171    fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
172        self.get_mut(&stream)?.write_zeroes(len as usize)?;
173        Ok(())
174    }
175
176    fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
177        self.get_mut(&stream)?.flush()?;
178        Ok(())
179    }
180
181    async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
182        let s = self.get_mut(&stream)?;
183        s.flush()?;
184        s.write_ready().await?;
185        Ok(())
186    }
187
188    fn splice(
189        &mut self,
190        dest: Resource<DynOutputStream>,
191        src: Resource<DynInputStream>,
192        len: u64,
193    ) -> StreamResult<u64> {
194        let len = len.try_into().unwrap_or(usize::MAX);
195
196        let permit = {
197            let output = self.get_mut(&dest)?;
198            output.check_write()?
199        };
200        let len = len.min(permit);
201        if len == 0 {
202            return Ok(0);
203        }
204
205        let contents = self.get_mut(&src)?.read(len)?;
206
207        let len = contents.len();
208        if len == 0 {
209            return Ok(0);
210        }
211
212        let output = self.get_mut(&dest)?;
213        output.write(contents)?;
214        Ok(len.try_into().expect("usize can fit in u64"))
215    }
216
217    async fn blocking_splice(
218        &mut self,
219        dest: Resource<DynOutputStream>,
220        src: Resource<DynInputStream>,
221        len: u64,
222    ) -> StreamResult<u64> {
223        let len = len.try_into().unwrap_or(usize::MAX);
224
225        let permit = {
226            let output = self.get_mut(&dest)?;
227            output.write_ready().await?
228        };
229        let len = len.min(permit);
230        if len == 0 {
231            return Ok(0);
232        }
233
234        let contents = self.get_mut(&src)?.blocking_read(len).await?;
235
236        let len = contents.len();
237        if len == 0 {
238            return Ok(0);
239        }
240
241        let output = self.get_mut(&dest)?;
242        output.blocking_write_and_flush(contents).await?;
243        Ok(len.try_into().expect("usize can fit in u64"))
244    }
245}
246
247impl streams::HostInputStream for ResourceTable {
248    async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
249        self.delete(stream)?.cancel().await;
250        Ok(())
251    }
252
253    fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
254        let len = len.try_into().unwrap_or(usize::MAX);
255        let bytes = self.get_mut(&stream)?.read(len)?;
256        debug_assert!(bytes.len() <= len);
257        Ok(bytes.into())
258    }
259
260    async fn blocking_read(
261        &mut self,
262        stream: Resource<DynInputStream>,
263        len: u64,
264    ) -> StreamResult<Vec<u8>> {
265        let len = len.try_into().unwrap_or(usize::MAX);
266        let bytes = self.get_mut(&stream)?.blocking_read(len).await?;
267        debug_assert!(bytes.len() <= len);
268        Ok(bytes.into())
269    }
270
271    fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
272        let len = len.try_into().unwrap_or(usize::MAX);
273        let written = self.get_mut(&stream)?.skip(len)?;
274        Ok(written.try_into().expect("usize always fits in u64"))
275    }
276
277    async fn blocking_skip(
278        &mut self,
279        stream: Resource<DynInputStream>,
280        len: u64,
281    ) -> StreamResult<u64> {
282        let len = len.try_into().unwrap_or(usize::MAX);
283        let written = self.get_mut(&stream)?.blocking_skip(len).await?;
284        Ok(written.try_into().expect("usize always fits in u64"))
285    }
286
287    fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
288        crate::poll::subscribe(self, stream)
289    }
290}