wasmtime_wasi_tls/
host.rs

1use anyhow::Result;
2use wasmtime::component::Resource;
3use wasmtime_wasi::async_trait;
4use wasmtime_wasi::p2::Pollable;
5use wasmtime_wasi::p2::{DynInputStream, DynOutputStream, DynPollable, IoError};
6
7use crate::{
8    TlsStream, TlsTransport, WasiTls, bindings,
9    io::{
10        AsyncReadStream, AsyncWriteStream, FutureOutput, WasiFuture, WasiStreamReader,
11        WasiStreamWriter,
12    },
13};
14
15impl<'a> bindings::types::Host for WasiTls<'a> {}
16
17/// Represents the ClientHandshake which will be used to configure the handshake
18pub struct HostClientHandshake {
19    server_name: String,
20    transport: Box<dyn TlsTransport>,
21}
22
23impl<'a> bindings::types::HostClientHandshake for WasiTls<'a> {
24    fn new(
25        &mut self,
26        server_name: String,
27        input: Resource<DynInputStream>,
28        output: Resource<DynOutputStream>,
29    ) -> wasmtime::Result<Resource<HostClientHandshake>> {
30        let input = self.table.delete(input)?;
31        let output = self.table.delete(output)?;
32
33        let reader = WasiStreamReader::new(input);
34        let writer = WasiStreamWriter::new(output);
35        let transport = tokio::io::join(reader, writer);
36
37        Ok(self.table.push(HostClientHandshake {
38            server_name,
39            transport: Box::new(transport) as Box<dyn TlsTransport>,
40        })?)
41    }
42
43    fn finish(
44        &mut self,
45        this: Resource<HostClientHandshake>,
46    ) -> wasmtime::Result<Resource<HostFutureClientStreams>> {
47        let handshake = self.table.delete(this)?;
48
49        let connect = self
50            .ctx
51            .provider
52            .connect(handshake.server_name, handshake.transport);
53
54        let future = HostFutureClientStreams(WasiFuture::spawn(async move {
55            let tls_stream = connect.await?;
56
57            let (rx, tx) = tokio::io::split(tls_stream);
58            let write_stream = AsyncWriteStream::new(tx);
59            let client = HostClientConnection(write_stream.clone());
60
61            let input = Box::new(AsyncReadStream::new(rx)) as DynInputStream;
62            let output = Box::new(write_stream) as DynOutputStream;
63
64            Ok((client, input, output))
65        }));
66
67        Ok(self.table.push(future)?)
68    }
69
70    fn drop(&mut self, this: Resource<HostClientHandshake>) -> wasmtime::Result<()> {
71        self.table.delete(this)?;
72        Ok(())
73    }
74}
75
76/// Future streams provides the tls streams after the handshake is completed
77pub struct HostFutureClientStreams(
78    WasiFuture<Result<(HostClientConnection, DynInputStream, DynOutputStream), IoError>>,
79);
80
81#[async_trait]
82impl Pollable for HostFutureClientStreams {
83    async fn ready(&mut self) {
84        self.0.ready().await
85    }
86}
87
88impl<'a> bindings::types::HostFutureClientStreams for WasiTls<'a> {
89    fn subscribe(
90        &mut self,
91        this: Resource<HostFutureClientStreams>,
92    ) -> wasmtime::Result<Resource<DynPollable>> {
93        wasmtime_wasi::p2::subscribe(self.table, this)
94    }
95
96    fn get(
97        &mut self,
98        this: Resource<HostFutureClientStreams>,
99    ) -> wasmtime::Result<
100        Option<
101            Result<
102                Result<
103                    (
104                        Resource<HostClientConnection>,
105                        Resource<DynInputStream>,
106                        Resource<DynOutputStream>,
107                    ),
108                    Resource<IoError>,
109                >,
110                (),
111            >,
112        >,
113    > {
114        let future = self.table.get_mut(&this)?;
115
116        let result = match future.0.get() {
117            FutureOutput::Ready(Ok((client, input, output))) => {
118                let client = self.table.push(client)?;
119                let input = self.table.push_child(input, &client)?;
120                let output = self.table.push_child(output, &client)?;
121
122                Some(Ok(Ok((client, input, output))))
123            }
124            FutureOutput::Ready(Err(io_error)) => {
125                let io_error = self.table.push(io_error)?;
126
127                Some(Ok(Err(io_error)))
128            }
129            FutureOutput::Consumed => Some(Err(())),
130            FutureOutput::Pending => None,
131        };
132
133        Ok(result)
134    }
135
136    fn drop(&mut self, this: Resource<HostFutureClientStreams>) -> wasmtime::Result<()> {
137        self.table.delete(this)?;
138        Ok(())
139    }
140}
141
142/// Represents the client connection and used to shut down the tls stream
143pub struct HostClientConnection(
144    crate::io::AsyncWriteStream<tokio::io::WriteHalf<Box<dyn TlsStream>>>,
145);
146
147impl<'a> bindings::types::HostClientConnection for WasiTls<'a> {
148    fn close_output(&mut self, this: Resource<HostClientConnection>) -> wasmtime::Result<()> {
149        self.table.get_mut(&this)?.0.close()
150    }
151
152    fn drop(&mut self, this: Resource<HostClientConnection>) -> wasmtime::Result<()> {
153        self.table.delete(this)?;
154        Ok(())
155    }
156}