wasmtime_wasi_io/
impls.rs

1use crate::bindings::wasi::io::{error, poll, streams};
2use crate::poll::{subscribe, DynFuture, DynPollable, MakeFuture};
3use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4use crate::{IoImpl, IoView};
5use alloc::collections::BTreeMap;
6use alloc::string::String;
7use alloc::vec::Vec;
8use anyhow::{anyhow, Result};
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use wasmtime::component::Resource;
13
14impl<T: IoView> poll::Host for IoImpl<T> {
15    async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
16        type ReadylistIndex = u32;
17
18        if pollables.is_empty() {
19            return Err(anyhow!("empty poll list"));
20        }
21
22        let table = self.table();
23
24        let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
25
26        for (ix, p) in pollables.iter().enumerate() {
27            let ix: u32 = ix.try_into()?;
28
29            let pollable = table.get(p)?;
30            let (_, list) = table_futures
31                .entry(pollable.index)
32                .or_insert((pollable.make_future, Vec::new()));
33            list.push(ix);
34        }
35
36        let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
37        for (entry, (make_future, readylist_indices)) in table.iter_entries(table_futures) {
38            let entry = entry?;
39            futures.push((make_future(entry), readylist_indices));
40        }
41
42        struct PollList<'a> {
43            futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
44        }
45        impl<'a> Future for PollList<'a> {
46            type Output = Vec<u32>;
47
48            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49                let mut any_ready = false;
50                let mut results = Vec::new();
51                for (fut, readylist_indicies) in self.futures.iter_mut() {
52                    match fut.as_mut().poll(cx) {
53                        Poll::Ready(()) => {
54                            results.extend_from_slice(readylist_indicies);
55                            any_ready = true;
56                        }
57                        Poll::Pending => {}
58                    }
59                }
60                if any_ready {
61                    Poll::Ready(results)
62                } else {
63                    Poll::Pending
64                }
65            }
66        }
67
68        Ok(PollList { futures }.await)
69    }
70}
71
72impl<T: IoView> crate::bindings::wasi::io::poll::HostPollable for IoImpl<T> {
73    async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
74        let table = self.table();
75        let pollable = table.get(&pollable)?;
76        let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
77        ready.await;
78        Ok(())
79    }
80    async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
81        let table = self.table();
82        let pollable = table.get(&pollable)?;
83        let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
84        futures::pin_mut!(ready);
85        Ok(matches!(
86            futures::future::poll_immediate(ready).await,
87            Some(())
88        ))
89    }
90    fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
91        let pollable = self.table().delete(pollable)?;
92        if let Some(delete) = pollable.remove_index_on_delete {
93            delete(self.table(), pollable.index)?;
94        }
95        Ok(())
96    }
97}
98
99impl<T: IoView> error::Host for IoImpl<T> {}
100
101impl<T: IoView> streams::Host for IoImpl<T> {
102    fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
103        match err {
104            StreamError::Closed => Ok(streams::StreamError::Closed),
105            StreamError::LastOperationFailed(e) => Ok(streams::StreamError::LastOperationFailed(
106                self.table().push(e)?,
107            )),
108            StreamError::Trap(e) => Err(e),
109        }
110    }
111}
112
113impl<T: IoView> error::HostError for IoImpl<T> {
114    fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
115        self.table().delete(err)?;
116        Ok(())
117    }
118
119    fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
120        Ok(alloc::format!("{:?}", self.table().get(&err)?))
121    }
122}
123
124impl<T: IoView> streams::HostOutputStream for IoImpl<T> {
125    async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
126        self.table().delete(stream)?.cancel().await;
127        Ok(())
128    }
129
130    fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
131        let bytes = self.table().get_mut(&stream)?.check_write()?;
132        Ok(bytes as u64)
133    }
134
135    fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
136        self.table().get_mut(&stream)?.write(bytes.into())?;
137        Ok(())
138    }
139
140    fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
141        subscribe(self.table(), stream)
142    }
143
144    async fn blocking_write_and_flush(
145        &mut self,
146        stream: Resource<DynOutputStream>,
147        bytes: Vec<u8>,
148    ) -> StreamResult<()> {
149        if bytes.len() > 4096 {
150            return Err(StreamError::trap(
151                "Buffer too large for blocking-write-and-flush (expected at most 4096)",
152            ));
153        }
154
155        self.table()
156            .get_mut(&stream)?
157            .blocking_write_and_flush(bytes.into())
158            .await
159    }
160
161    async fn blocking_write_zeroes_and_flush(
162        &mut self,
163        stream: Resource<DynOutputStream>,
164        len: u64,
165    ) -> StreamResult<()> {
166        if len > 4096 {
167            return Err(StreamError::trap(
168                "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
169            ));
170        }
171
172        self.table()
173            .get_mut(&stream)?
174            .blocking_write_zeroes_and_flush(len as usize)
175            .await
176    }
177
178    fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
179        self.table().get_mut(&stream)?.write_zeroes(len as usize)?;
180        Ok(())
181    }
182
183    fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
184        self.table().get_mut(&stream)?.flush()?;
185        Ok(())
186    }
187
188    async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
189        let s = self.table().get_mut(&stream)?;
190        s.flush()?;
191        s.write_ready().await?;
192        Ok(())
193    }
194
195    fn splice(
196        &mut self,
197        dest: Resource<DynOutputStream>,
198        src: Resource<DynInputStream>,
199        len: u64,
200    ) -> StreamResult<u64> {
201        let len = len.try_into().unwrap_or(usize::MAX);
202
203        let permit = {
204            let output = self.table().get_mut(&dest)?;
205            output.check_write()?
206        };
207        let len = len.min(permit);
208        if len == 0 {
209            return Ok(0);
210        }
211
212        let contents = self.table().get_mut(&src)?.read(len)?;
213
214        let len = contents.len();
215        if len == 0 {
216            return Ok(0);
217        }
218
219        let output = self.table().get_mut(&dest)?;
220        output.write(contents)?;
221        Ok(len.try_into().expect("usize can fit in u64"))
222    }
223
224    async fn blocking_splice(
225        &mut self,
226        dest: Resource<DynOutputStream>,
227        src: Resource<DynInputStream>,
228        len: u64,
229    ) -> StreamResult<u64> {
230        let len = len.try_into().unwrap_or(usize::MAX);
231
232        let permit = {
233            let output = self.table().get_mut(&dest)?;
234            output.write_ready().await?
235        };
236        let len = len.min(permit);
237        if len == 0 {
238            return Ok(0);
239        }
240
241        let contents = self.table().get_mut(&src)?.blocking_read(len).await?;
242
243        let len = contents.len();
244        if len == 0 {
245            return Ok(0);
246        }
247
248        let output = self.table().get_mut(&dest)?;
249        output.blocking_write_and_flush(contents).await?;
250        Ok(len.try_into().expect("usize can fit in u64"))
251    }
252}
253
254impl<T: IoView> streams::HostInputStream for IoImpl<T> {
255    async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
256        self.table().delete(stream)?.cancel().await;
257        Ok(())
258    }
259
260    fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
261        let len = len.try_into().unwrap_or(usize::MAX);
262        let bytes = self.table().get_mut(&stream)?.read(len)?;
263        debug_assert!(bytes.len() <= len);
264        Ok(bytes.into())
265    }
266
267    async fn blocking_read(
268        &mut self,
269        stream: Resource<DynInputStream>,
270        len: u64,
271    ) -> StreamResult<Vec<u8>> {
272        let len = len.try_into().unwrap_or(usize::MAX);
273        let bytes = self.table().get_mut(&stream)?.blocking_read(len).await?;
274        debug_assert!(bytes.len() <= len);
275        Ok(bytes.into())
276    }
277
278    fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
279        let len = len.try_into().unwrap_or(usize::MAX);
280        let written = self.table().get_mut(&stream)?.skip(len)?;
281        Ok(written.try_into().expect("usize always fits in u64"))
282    }
283
284    async fn blocking_skip(
285        &mut self,
286        stream: Resource<DynInputStream>,
287        len: u64,
288    ) -> StreamResult<u64> {
289        let len = len.try_into().unwrap_or(usize::MAX);
290        let written = self.table().get_mut(&stream)?.blocking_skip(len).await?;
291        Ok(written.try_into().expect("usize always fits in u64"))
292    }
293
294    fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
295        crate::poll::subscribe(self.table(), stream)
296    }
297}