Skip to main content

wasmtime_wasi_io/
impls.rs

1use crate::bindings::wasi::io::{error, poll, streams};
2use crate::poll::{DynFuture, DynPollable, MakeFuture, subscribe};
3use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4use alloc::collections::BTreeMap;
5use alloc::string::String;
6use alloc::vec::Vec;
7use bytes::Bytes;
8use core::future::Future;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11use wasmtime::component::{Resource, ResourceTable};
12use wasmtime::{Result, format_err};
13
14impl poll::Host for ResourceTable {
15    async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
16        type ReadylistIndex = u32;
17
18        if pollables.is_empty() {
19            return Err(format_err!("empty poll list"));
20        }
21
22        let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
23
24        for (ix, p) in pollables.iter().enumerate() {
25            let ix: u32 = ix.try_into()?;
26
27            let pollable = self.get(p)?;
28            let (_, list) = table_futures
29                .entry(pollable.index)
30                .or_insert((pollable.make_future, Vec::new()));
31            list.push(ix);
32        }
33
34        let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
35        for (entry, (make_future, readylist_indices)) in self.iter_entries(table_futures) {
36            let entry = entry?;
37            futures.push((make_future(entry), readylist_indices));
38        }
39
40        struct PollList<'a> {
41            futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
42        }
43        impl<'a> Future for PollList<'a> {
44            type Output = Vec<u32>;
45
46            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47                let mut any_ready = false;
48                let mut results = Vec::new();
49                for (fut, readylist_indicies) in self.futures.iter_mut() {
50                    match fut.as_mut().poll(cx) {
51                        Poll::Ready(()) => {
52                            results.extend_from_slice(readylist_indicies);
53                            any_ready = true;
54                        }
55                        Poll::Pending => {}
56                    }
57                }
58                if any_ready {
59                    Poll::Ready(results)
60                } else {
61                    Poll::Pending
62                }
63            }
64        }
65
66        Ok(PollList { futures }.await)
67    }
68}
69
70impl crate::bindings::wasi::io::poll::HostPollable for ResourceTable {
71    async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
72        let pollable = self.get(&pollable)?;
73        let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
74        ready.await;
75        Ok(())
76    }
77    async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
78        let pollable = self.get(&pollable)?;
79        let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
80        futures::pin_mut!(ready);
81        Ok(matches!(
82            futures::future::poll_immediate(ready).await,
83            Some(())
84        ))
85    }
86    fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
87        let pollable = self.delete(pollable)?;
88        if let Some(delete) = pollable.remove_index_on_delete {
89            delete(self, pollable.index)?;
90        }
91        Ok(())
92    }
93}
94
// The `error` interface's `Host` trait needs no methods from this type, so the impl body is
// empty; the operations on individual error resources are provided by `error::HostError`.
impl error::Host for ResourceTable {}
96
97impl streams::Host for ResourceTable {
98    fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
99        match err {
100            StreamError::Closed => Ok(streams::StreamError::Closed),
101            StreamError::LastOperationFailed(e) => {
102                Ok(streams::StreamError::LastOperationFailed(self.push(e)?))
103            }
104            StreamError::Trap(e) => Err(e),
105        }
106    }
107}
108
109impl error::HostError for ResourceTable {
110    fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
111        self.delete(err)?;
112        Ok(())
113    }
114
115    fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
116        Ok(alloc::format!("{:?}", self.get(&err)?))
117    }
118}
119
120impl streams::HostOutputStream for ResourceTable {
121    async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
122        self.delete(stream)?.cancel().await;
123        Ok(())
124    }
125
126    fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
127        let bytes = self.get_mut(&stream)?.check_write()?;
128        Ok(bytes as u64)
129    }
130
131    fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
132        self.get_mut(&stream)?.write(bytes.into())?;
133        Ok(())
134    }
135
136    fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
137        subscribe(self, stream)
138    }
139
140    async fn blocking_write_and_flush(
141        &mut self,
142        stream: Resource<DynOutputStream>,
143        bytes: Vec<u8>,
144    ) -> StreamResult<()> {
145        if bytes.len() > 4096 {
146            return Err(StreamError::trap(
147                "Buffer too large for blocking-write-and-flush (expected at most 4096)",
148            ));
149        }
150
151        self.get_mut(&stream)?
152            .blocking_write_and_flush(bytes.into())
153            .await
154    }
155
156    async fn blocking_write_zeroes_and_flush(
157        &mut self,
158        stream: Resource<DynOutputStream>,
159        len: u64,
160    ) -> StreamResult<()> {
161        if len > 4096 {
162            return Err(StreamError::trap(
163                "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
164            ));
165        }
166
167        // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
168        // repeatedly from a 'static buffer of zeros.
169        let bs = Bytes::from_iter(core::iter::repeat(0).take(len as usize));
170        self.get_mut(&stream)?.blocking_write_and_flush(bs).await
171    }
172
173    fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
174        self.get_mut(&stream)?.write_zeroes(len as usize)?;
175        Ok(())
176    }
177
178    fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
179        self.get_mut(&stream)?.flush()?;
180        Ok(())
181    }
182
183    async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
184        let s = self.get_mut(&stream)?;
185        s.flush()?;
186        s.write_ready().await?;
187        Ok(())
188    }
189
190    fn splice(
191        &mut self,
192        dest: Resource<DynOutputStream>,
193        src: Resource<DynInputStream>,
194        len: u64,
195    ) -> StreamResult<u64> {
196        let len = len.try_into().unwrap_or(usize::MAX);
197
198        let permit = {
199            let output = self.get_mut(&dest)?;
200            output.check_write()?
201        };
202        let len = len.min(permit);
203        if len == 0 {
204            return Ok(0);
205        }
206
207        let contents = self.get_mut(&src)?.read(len)?;
208
209        let len = contents.len();
210        if len == 0 {
211            return Ok(0);
212        }
213
214        let output = self.get_mut(&dest)?;
215        output.write(contents)?;
216        Ok(len.try_into().expect("usize can fit in u64"))
217    }
218
219    async fn blocking_splice(
220        &mut self,
221        dest: Resource<DynOutputStream>,
222        src: Resource<DynInputStream>,
223        len: u64,
224    ) -> StreamResult<u64> {
225        let len = len.try_into().unwrap_or(usize::MAX);
226
227        let permit = {
228            let output = self.get_mut(&dest)?;
229            output.write_ready().await?
230        };
231        let len = len.min(permit);
232        if len == 0 {
233            return Ok(0);
234        }
235
236        let contents = self.get_mut(&src)?.blocking_read(len).await?;
237
238        let len = contents.len();
239        if len == 0 {
240            return Ok(0);
241        }
242
243        let output = self.get_mut(&dest)?;
244        output.blocking_write_and_flush(contents).await?;
245        Ok(len.try_into().expect("usize can fit in u64"))
246    }
247}
248
249impl streams::HostInputStream for ResourceTable {
250    async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
251        self.delete(stream)?.cancel().await;
252        Ok(())
253    }
254
255    fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
256        let len = len.try_into().unwrap_or(usize::MAX);
257        let bytes = self.get_mut(&stream)?.read(len)?;
258        debug_assert!(bytes.len() <= len);
259        Ok(bytes.into())
260    }
261
262    async fn blocking_read(
263        &mut self,
264        stream: Resource<DynInputStream>,
265        len: u64,
266    ) -> StreamResult<Vec<u8>> {
267        let len = len.try_into().unwrap_or(usize::MAX);
268        let bytes = self.get_mut(&stream)?.blocking_read(len).await?;
269        debug_assert!(bytes.len() <= len);
270        Ok(bytes.into())
271    }
272
273    fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
274        let len = len.try_into().unwrap_or(usize::MAX);
275        let written = self.get_mut(&stream)?.skip(len)?;
276        Ok(written.try_into().expect("usize always fits in u64"))
277    }
278
279    async fn blocking_skip(
280        &mut self,
281        stream: Resource<DynInputStream>,
282        len: u64,
283    ) -> StreamResult<u64> {
284        let len = len.try_into().unwrap_or(usize::MAX);
285        let written = self.get_mut(&stream)?.blocking_skip(len).await?;
286        Ok(written.try_into().expect("usize always fits in u64"))
287    }
288
289    fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
290        crate::poll::subscribe(self, stream)
291    }
292}