wasmtime_wasi/p2/host/
io.rs

1use crate::p2::{
2    StreamError, StreamResult,
3    bindings::sync::io::poll::Pollable,
4    bindings::sync::io::streams::{self, InputStream, OutputStream},
5};
6use crate::runtime::in_tokio;
7use wasmtime::component::{Resource, ResourceTable};
8use wasmtime_wasi_io::bindings::wasi::io::streams::{
9    self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream,
10    HostOutputStream as AsyncHostOutputStream,
11};
12
13impl From<async_streams::StreamError> for streams::StreamError {
14    fn from(other: async_streams::StreamError) -> Self {
15        match other {
16            async_streams::StreamError::LastOperationFailed(e) => Self::LastOperationFailed(e),
17            async_streams::StreamError::Closed => Self::Closed,
18        }
19    }
20}
21
22impl streams::Host for ResourceTable {
23    fn convert_stream_error(&mut self, err: StreamError) -> wasmtime::Result<streams::StreamError> {
24        Ok(AsyncHost::convert_stream_error(self, err)?.into())
25    }
26}
27
28impl streams::HostOutputStream for ResourceTable {
29    fn drop(&mut self, stream: Resource<OutputStream>) -> wasmtime::Result<()> {
30        in_tokio(async { AsyncHostOutputStream::drop(self, stream).await })
31    }
32
33    fn check_write(&mut self, stream: Resource<OutputStream>) -> StreamResult<u64> {
34        Ok(AsyncHostOutputStream::check_write(self, stream)?)
35    }
36
37    fn write(&mut self, stream: Resource<OutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
38        Ok(AsyncHostOutputStream::write(self, stream, bytes)?)
39    }
40
41    fn blocking_write_and_flush(
42        &mut self,
43        stream: Resource<OutputStream>,
44        bytes: Vec<u8>,
45    ) -> StreamResult<()> {
46        in_tokio(async {
47            AsyncHostOutputStream::blocking_write_and_flush(self, stream, bytes).await
48        })
49    }
50
51    fn blocking_write_zeroes_and_flush(
52        &mut self,
53        stream: Resource<OutputStream>,
54        len: u64,
55    ) -> StreamResult<()> {
56        in_tokio(async {
57            AsyncHostOutputStream::blocking_write_zeroes_and_flush(self, stream, len).await
58        })
59    }
60
61    fn subscribe(
62        &mut self,
63        stream: Resource<OutputStream>,
64    ) -> wasmtime::Result<Resource<Pollable>> {
65        Ok(AsyncHostOutputStream::subscribe(self, stream)?)
66    }
67
68    fn write_zeroes(&mut self, stream: Resource<OutputStream>, len: u64) -> StreamResult<()> {
69        Ok(AsyncHostOutputStream::write_zeroes(self, stream, len)?)
70    }
71
72    fn flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
73        Ok(AsyncHostOutputStream::flush(
74            self,
75            Resource::new_borrow(stream.rep()),
76        )?)
77    }
78
79    fn blocking_flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
80        in_tokio(async {
81            AsyncHostOutputStream::blocking_flush(self, Resource::new_borrow(stream.rep())).await
82        })
83    }
84
85    fn splice(
86        &mut self,
87        dst: Resource<OutputStream>,
88        src: Resource<InputStream>,
89        len: u64,
90    ) -> StreamResult<u64> {
91        AsyncHostOutputStream::splice(self, dst, src, len)
92    }
93
94    fn blocking_splice(
95        &mut self,
96        dst: Resource<OutputStream>,
97        src: Resource<InputStream>,
98        len: u64,
99    ) -> StreamResult<u64> {
100        in_tokio(async { AsyncHostOutputStream::blocking_splice(self, dst, src, len).await })
101    }
102}
103
104impl streams::HostInputStream for ResourceTable {
105    fn drop(&mut self, stream: Resource<InputStream>) -> wasmtime::Result<()> {
106        in_tokio(async { AsyncHostInputStream::drop(self, stream).await })
107    }
108
109    fn read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
110        AsyncHostInputStream::read(self, stream, len)
111    }
112
113    fn blocking_read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
114        in_tokio(async { AsyncHostInputStream::blocking_read(self, stream, len).await })
115    }
116
117    fn skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
118        AsyncHostInputStream::skip(self, stream, len)
119    }
120
121    fn blocking_skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
122        in_tokio(async { AsyncHostInputStream::blocking_skip(self, stream, len).await })
123    }
124
125    fn subscribe(&mut self, stream: Resource<InputStream>) -> wasmtime::Result<Resource<Pollable>> {
126        AsyncHostInputStream::subscribe(self, stream)
127    }
128}