wasmtime_wasi/p2/host/
tcp.rs

1use crate::p2::bindings::{
2    sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network},
3    sockets::tcp::{self, ShutdownType},
4};
5use crate::p2::{Pollable, SocketResult};
6use crate::sockets::{SocketAddrUse, TcpSocket, WasiSocketsCtxView};
7use std::net::SocketAddr;
8use wasmtime::component::Resource;
9use wasmtime_wasi_io::{
10    poll::DynPollable,
11    streams::{DynInputStream, DynOutputStream},
12};
13
14impl tcp::Host for WasiSocketsCtxView<'_> {}
15
16impl crate::p2::host::tcp::tcp::HostTcpSocket for WasiSocketsCtxView<'_> {
17    async fn start_bind(
18        &mut self,
19        this: Resource<TcpSocket>,
20        network: Resource<Network>,
21        local_address: IpSocketAddress,
22    ) -> SocketResult<()> {
23        let network = self.table.get(&network)?;
24        let local_address: SocketAddr = local_address.into();
25
26        // Ensure that we're allowed to connect to this address.
27        network
28            .check_socket_addr(local_address, SocketAddrUse::TcpBind)
29            .await?;
30
31        // Bind to the address.
32        self.table.get_mut(&this)?.start_bind(local_address)?;
33
34        Ok(())
35    }
36
37    fn finish_bind(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
38        let socket = self.table.get_mut(&this)?;
39        socket.finish_bind()?;
40        Ok(())
41    }
42
43    async fn start_connect(
44        &mut self,
45        this: Resource<TcpSocket>,
46        network: Resource<Network>,
47        remote_address: IpSocketAddress,
48    ) -> SocketResult<()> {
49        let network = self.table.get(&network)?;
50        let remote_address: SocketAddr = remote_address.into();
51
52        // Ensure that we're allowed to connect to this address.
53        network
54            .check_socket_addr(remote_address, SocketAddrUse::TcpConnect)
55            .await?;
56
57        // Start connection
58        let socket = self.table.get_mut(&this)?;
59        let future = socket
60            .start_connect(&remote_address)?
61            .connect(remote_address);
62        socket.set_pending_connect(future)?;
63
64        Ok(())
65    }
66
67    fn finish_connect(
68        &mut self,
69        this: Resource<TcpSocket>,
70    ) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {
71        let socket = self.table.get_mut(&this)?;
72
73        let result = socket
74            .take_pending_connect()?
75            .ok_or(ErrorCode::WouldBlock)?;
76        socket.finish_connect(result)?;
77        let (input, output) = socket.p2_streams()?;
78        let input = self.table.push_child(input, &this)?;
79        let output = self.table.push_child(output, &this)?;
80        Ok((input, output))
81    }
82
83    fn start_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
84        let socket = self.table.get_mut(&this)?;
85
86        socket.start_listen()?;
87        Ok(())
88    }
89
90    fn finish_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
91        let socket = self.table.get_mut(&this)?;
92        socket.finish_listen()?;
93        Ok(())
94    }
95
96    fn accept(
97        &mut self,
98        this: Resource<TcpSocket>,
99    ) -> SocketResult<(
100        Resource<TcpSocket>,
101        Resource<DynInputStream>,
102        Resource<DynOutputStream>,
103    )> {
104        let socket = self.table.get_mut(&this)?;
105
106        let mut tcp_socket = socket.accept()?.ok_or(ErrorCode::WouldBlock)?;
107        let (input, output) = tcp_socket.p2_streams()?;
108
109        let tcp_socket = self.table.push(tcp_socket)?;
110        let input_stream = self.table.push_child(input, &tcp_socket)?;
111        let output_stream = self.table.push_child(output, &tcp_socket)?;
112
113        Ok((tcp_socket, input_stream, output_stream))
114    }
115
116    fn local_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {
117        let socket = self.table.get(&this)?;
118        Ok(socket.local_address()?.into())
119    }
120
121    fn remote_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {
122        let socket = self.table.get(&this)?;
123        Ok(socket.remote_address()?.into())
124    }
125
126    fn is_listening(&mut self, this: Resource<TcpSocket>) -> Result<bool, anyhow::Error> {
127        let socket = self.table.get(&this)?;
128
129        Ok(socket.is_listening())
130    }
131
132    fn address_family(
133        &mut self,
134        this: Resource<TcpSocket>,
135    ) -> Result<IpAddressFamily, anyhow::Error> {
136        let socket = self.table.get(&this)?;
137        Ok(socket.address_family().into())
138    }
139
140    fn set_listen_backlog_size(
141        &mut self,
142        this: Resource<TcpSocket>,
143        value: u64,
144    ) -> SocketResult<()> {
145        let socket = self.table.get_mut(&this)?;
146        socket.set_listen_backlog_size(value)?;
147        Ok(())
148    }
149
150    fn keep_alive_enabled(&mut self, this: Resource<TcpSocket>) -> SocketResult<bool> {
151        let socket = self.table.get(&this)?;
152        Ok(socket.keep_alive_enabled()?)
153    }
154
155    fn set_keep_alive_enabled(
156        &mut self,
157        this: Resource<TcpSocket>,
158        value: bool,
159    ) -> SocketResult<()> {
160        let socket = self.table.get(&this)?;
161        socket.set_keep_alive_enabled(value)?;
162        Ok(())
163    }
164
165    fn keep_alive_idle_time(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
166        let socket = self.table.get(&this)?;
167        Ok(socket.keep_alive_idle_time()?)
168    }
169
170    fn set_keep_alive_idle_time(
171        &mut self,
172        this: Resource<TcpSocket>,
173        value: u64,
174    ) -> SocketResult<()> {
175        let socket = self.table.get_mut(&this)?;
176        socket.set_keep_alive_idle_time(value)?;
177        Ok(())
178    }
179
180    fn keep_alive_interval(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
181        let socket = self.table.get(&this)?;
182        Ok(socket.keep_alive_interval()?)
183    }
184
185    fn set_keep_alive_interval(
186        &mut self,
187        this: Resource<TcpSocket>,
188        value: u64,
189    ) -> SocketResult<()> {
190        let socket = self.table.get(&this)?;
191        socket.set_keep_alive_interval(value)?;
192        Ok(())
193    }
194
195    fn keep_alive_count(&mut self, this: Resource<TcpSocket>) -> SocketResult<u32> {
196        let socket = self.table.get(&this)?;
197        Ok(socket.keep_alive_count()?)
198    }
199
200    fn set_keep_alive_count(&mut self, this: Resource<TcpSocket>, value: u32) -> SocketResult<()> {
201        let socket = self.table.get(&this)?;
202        socket.set_keep_alive_count(value)?;
203        Ok(())
204    }
205
206    fn hop_limit(&mut self, this: Resource<TcpSocket>) -> SocketResult<u8> {
207        let socket = self.table.get(&this)?;
208        Ok(socket.hop_limit()?)
209    }
210
211    fn set_hop_limit(&mut self, this: Resource<TcpSocket>, value: u8) -> SocketResult<()> {
212        let socket = self.table.get_mut(&this)?;
213        socket.set_hop_limit(value)?;
214        Ok(())
215    }
216
217    fn receive_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
218        let socket = self.table.get(&this)?;
219        Ok(socket.receive_buffer_size()?)
220    }
221
222    fn set_receive_buffer_size(
223        &mut self,
224        this: Resource<TcpSocket>,
225        value: u64,
226    ) -> SocketResult<()> {
227        let socket = self.table.get_mut(&this)?;
228        socket.set_receive_buffer_size(value)?;
229        Ok(())
230    }
231
232    fn send_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
233        let socket = self.table.get(&this)?;
234        Ok(socket.send_buffer_size()?)
235    }
236
237    fn set_send_buffer_size(&mut self, this: Resource<TcpSocket>, value: u64) -> SocketResult<()> {
238        let socket = self.table.get_mut(&this)?;
239        socket.set_send_buffer_size(value)?;
240        Ok(())
241    }
242
243    fn subscribe(&mut self, this: Resource<TcpSocket>) -> anyhow::Result<Resource<DynPollable>> {
244        wasmtime_wasi_io::poll::subscribe(self.table, this)
245    }
246
247    fn shutdown(
248        &mut self,
249        this: Resource<TcpSocket>,
250        shutdown_type: ShutdownType,
251    ) -> SocketResult<()> {
252        let socket = self.table.get(&this)?;
253
254        let how = match shutdown_type {
255            ShutdownType::Receive => std::net::Shutdown::Read,
256            ShutdownType::Send => std::net::Shutdown::Write,
257            ShutdownType::Both => std::net::Shutdown::Both,
258        };
259
260        let state = socket.p2_streaming_state()?;
261        state.shutdown(how)?;
262        Ok(())
263    }
264
265    fn drop(&mut self, this: Resource<TcpSocket>) -> Result<(), anyhow::Error> {
266        // As in the filesystem implementation, we assume closing a socket
267        // doesn't block.
268        let dropped = self.table.delete(this)?;
269        drop(dropped);
270
271        Ok(())
272    }
273}
274
275#[async_trait::async_trait]
276impl Pollable for TcpSocket {
277    async fn ready(&mut self) {
278        <TcpSocket>::ready(self).await;
279    }
280}
281
282pub mod sync {
283    use crate::p2::{
284        SocketError,
285        bindings::{
286            sockets::{
287                network::Network,
288                tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},
289            },
290            sync::sockets::tcp::{
291                self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,
292                OutputStream, Pollable, ShutdownType, TcpSocket,
293            },
294        },
295    };
296    use crate::runtime::in_tokio;
297    use crate::sockets::WasiSocketsCtxView;
298    use wasmtime::component::Resource;
299
300    impl tcp::Host for WasiSocketsCtxView<'_> {}
301
302    impl HostTcpSocket for WasiSocketsCtxView<'_> {
303        fn start_bind(
304            &mut self,
305            self_: Resource<TcpSocket>,
306            network: Resource<Network>,
307            local_address: IpSocketAddress,
308        ) -> Result<(), SocketError> {
309            in_tokio(async {
310                AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await
311            })
312        }
313
314        fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
315            AsyncHostTcpSocket::finish_bind(self, self_)
316        }
317
318        fn start_connect(
319            &mut self,
320            self_: Resource<TcpSocket>,
321            network: Resource<Network>,
322            remote_address: IpSocketAddress,
323        ) -> Result<(), SocketError> {
324            in_tokio(async {
325                AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await
326            })
327        }
328
329        fn finish_connect(
330            &mut self,
331            self_: Resource<TcpSocket>,
332        ) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {
333            AsyncHostTcpSocket::finish_connect(self, self_)
334        }
335
336        fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
337            AsyncHostTcpSocket::start_listen(self, self_)
338        }
339
340        fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
341            AsyncHostTcpSocket::finish_listen(self, self_)
342        }
343
344        fn accept(
345            &mut self,
346            self_: Resource<TcpSocket>,
347        ) -> Result<
348            (
349                Resource<TcpSocket>,
350                Resource<InputStream>,
351                Resource<OutputStream>,
352            ),
353            SocketError,
354        > {
355            AsyncHostTcpSocket::accept(self, self_)
356        }
357
358        fn local_address(
359            &mut self,
360            self_: Resource<TcpSocket>,
361        ) -> Result<IpSocketAddress, SocketError> {
362            AsyncHostTcpSocket::local_address(self, self_)
363        }
364
365        fn remote_address(
366            &mut self,
367            self_: Resource<TcpSocket>,
368        ) -> Result<IpSocketAddress, SocketError> {
369            AsyncHostTcpSocket::remote_address(self, self_)
370        }
371
372        fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {
373            AsyncHostTcpSocket::is_listening(self, self_)
374        }
375
376        fn address_family(
377            &mut self,
378            self_: Resource<TcpSocket>,
379        ) -> wasmtime::Result<IpAddressFamily> {
380            AsyncHostTcpSocket::address_family(self, self_)
381        }
382
383        fn set_listen_backlog_size(
384            &mut self,
385            self_: Resource<TcpSocket>,
386            value: u64,
387        ) -> Result<(), SocketError> {
388            AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)
389        }
390
391        fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {
392            AsyncHostTcpSocket::keep_alive_enabled(self, self_)
393        }
394
395        fn set_keep_alive_enabled(
396            &mut self,
397            self_: Resource<TcpSocket>,
398            value: bool,
399        ) -> Result<(), SocketError> {
400            AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)
401        }
402
403        fn keep_alive_idle_time(
404            &mut self,
405            self_: Resource<TcpSocket>,
406        ) -> Result<Duration, SocketError> {
407            AsyncHostTcpSocket::keep_alive_idle_time(self, self_)
408        }
409
410        fn set_keep_alive_idle_time(
411            &mut self,
412            self_: Resource<TcpSocket>,
413            value: Duration,
414        ) -> Result<(), SocketError> {
415            AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)
416        }
417
418        fn keep_alive_interval(
419            &mut self,
420            self_: Resource<TcpSocket>,
421        ) -> Result<Duration, SocketError> {
422            AsyncHostTcpSocket::keep_alive_interval(self, self_)
423        }
424
425        fn set_keep_alive_interval(
426            &mut self,
427            self_: Resource<TcpSocket>,
428            value: Duration,
429        ) -> Result<(), SocketError> {
430            AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)
431        }
432
433        fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {
434            AsyncHostTcpSocket::keep_alive_count(self, self_)
435        }
436
437        fn set_keep_alive_count(
438            &mut self,
439            self_: Resource<TcpSocket>,
440            value: u32,
441        ) -> Result<(), SocketError> {
442            AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)
443        }
444
445        fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {
446            AsyncHostTcpSocket::hop_limit(self, self_)
447        }
448
449        fn set_hop_limit(
450            &mut self,
451            self_: Resource<TcpSocket>,
452            value: u8,
453        ) -> Result<(), SocketError> {
454            AsyncHostTcpSocket::set_hop_limit(self, self_, value)
455        }
456
457        fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
458            AsyncHostTcpSocket::receive_buffer_size(self, self_)
459        }
460
461        fn set_receive_buffer_size(
462            &mut self,
463            self_: Resource<TcpSocket>,
464            value: u64,
465        ) -> Result<(), SocketError> {
466            AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)
467        }
468
469        fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
470            AsyncHostTcpSocket::send_buffer_size(self, self_)
471        }
472
473        fn set_send_buffer_size(
474            &mut self,
475            self_: Resource<TcpSocket>,
476            value: u64,
477        ) -> Result<(), SocketError> {
478            AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)
479        }
480
481        fn subscribe(
482            &mut self,
483            self_: Resource<TcpSocket>,
484        ) -> wasmtime::Result<Resource<Pollable>> {
485            AsyncHostTcpSocket::subscribe(self, self_)
486        }
487
488        fn shutdown(
489            &mut self,
490            self_: Resource<TcpSocket>,
491            shutdown_type: ShutdownType,
492        ) -> Result<(), SocketError> {
493            AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())
494        }
495
496        fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {
497            AsyncHostTcpSocket::drop(self, rep)
498        }
499    }
500
501    impl From<ShutdownType> for async_tcp::ShutdownType {
502        fn from(other: ShutdownType) -> Self {
503            match other {
504                ShutdownType::Receive => async_tcp::ShutdownType::Receive,
505                ShutdownType::Send => async_tcp::ShutdownType::Send,
506                ShutdownType::Both => async_tcp::ShutdownType::Both,
507            }
508        }
509    }
510}