wasmtime_wasi/p2/host/
io.rs1use crate::p2::{
2 StreamError, StreamResult,
3 bindings::sync::io::poll::Pollable,
4 bindings::sync::io::streams::{self, InputStream, OutputStream},
5};
6use crate::runtime::in_tokio;
7use wasmtime::component::{Resource, ResourceTable};
8use wasmtime_wasi_io::bindings::wasi::io::streams::{
9 self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream,
10 HostOutputStream as AsyncHostOutputStream,
11};
12
13impl From<async_streams::StreamError> for streams::StreamError {
14 fn from(other: async_streams::StreamError) -> Self {
15 match other {
16 async_streams::StreamError::LastOperationFailed(e) => Self::LastOperationFailed(e),
17 async_streams::StreamError::Closed => Self::Closed,
18 }
19 }
20}
21
22impl streams::Host for ResourceTable {
23 fn convert_stream_error(&mut self, err: StreamError) -> wasmtime::Result<streams::StreamError> {
24 Ok(AsyncHost::convert_stream_error(self, err)?.into())
25 }
26}
27
28impl streams::HostOutputStream for ResourceTable {
29 fn drop(&mut self, stream: Resource<OutputStream>) -> wasmtime::Result<()> {
30 in_tokio(async { AsyncHostOutputStream::drop(self, stream).await })
31 }
32
33 fn check_write(&mut self, stream: Resource<OutputStream>) -> StreamResult<u64> {
34 Ok(AsyncHostOutputStream::check_write(self, stream)?)
35 }
36
37 fn write(&mut self, stream: Resource<OutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
38 Ok(AsyncHostOutputStream::write(self, stream, bytes)?)
39 }
40
41 fn blocking_write_and_flush(
42 &mut self,
43 stream: Resource<OutputStream>,
44 bytes: Vec<u8>,
45 ) -> StreamResult<()> {
46 in_tokio(async {
47 AsyncHostOutputStream::blocking_write_and_flush(self, stream, bytes).await
48 })
49 }
50
51 fn blocking_write_zeroes_and_flush(
52 &mut self,
53 stream: Resource<OutputStream>,
54 len: u64,
55 ) -> StreamResult<()> {
56 in_tokio(async {
57 AsyncHostOutputStream::blocking_write_zeroes_and_flush(self, stream, len).await
58 })
59 }
60
61 fn subscribe(
62 &mut self,
63 stream: Resource<OutputStream>,
64 ) -> wasmtime::Result<Resource<Pollable>> {
65 Ok(AsyncHostOutputStream::subscribe(self, stream)?)
66 }
67
68 fn write_zeroes(&mut self, stream: Resource<OutputStream>, len: u64) -> StreamResult<()> {
69 Ok(AsyncHostOutputStream::write_zeroes(self, stream, len)?)
70 }
71
72 fn flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
73 Ok(AsyncHostOutputStream::flush(
74 self,
75 Resource::new_borrow(stream.rep()),
76 )?)
77 }
78
79 fn blocking_flush(&mut self, stream: Resource<OutputStream>) -> StreamResult<()> {
80 in_tokio(async {
81 AsyncHostOutputStream::blocking_flush(self, Resource::new_borrow(stream.rep())).await
82 })
83 }
84
85 fn splice(
86 &mut self,
87 dst: Resource<OutputStream>,
88 src: Resource<InputStream>,
89 len: u64,
90 ) -> StreamResult<u64> {
91 AsyncHostOutputStream::splice(self, dst, src, len)
92 }
93
94 fn blocking_splice(
95 &mut self,
96 dst: Resource<OutputStream>,
97 src: Resource<InputStream>,
98 len: u64,
99 ) -> StreamResult<u64> {
100 in_tokio(async { AsyncHostOutputStream::blocking_splice(self, dst, src, len).await })
101 }
102}
103
104impl streams::HostInputStream for ResourceTable {
105 fn drop(&mut self, stream: Resource<InputStream>) -> wasmtime::Result<()> {
106 in_tokio(async { AsyncHostInputStream::drop(self, stream).await })
107 }
108
109 fn read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
110 AsyncHostInputStream::read(self, stream, len)
111 }
112
113 fn blocking_read(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
114 in_tokio(async { AsyncHostInputStream::blocking_read(self, stream, len).await })
115 }
116
117 fn skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
118 AsyncHostInputStream::skip(self, stream, len)
119 }
120
121 fn blocking_skip(&mut self, stream: Resource<InputStream>, len: u64) -> StreamResult<u64> {
122 in_tokio(async { AsyncHostInputStream::blocking_skip(self, stream, len).await })
123 }
124
125 fn subscribe(&mut self, stream: Resource<InputStream>) -> wasmtime::Result<Resource<Pollable>> {
126 AsyncHostInputStream::subscribe(self, stream)
127 }
128}