wasmtime_wasi/p2/host/
tcp.rs

1use crate::net::{SocketAddrUse, SocketAddressFamily};
2use crate::p2::bindings::{
3    sockets::network::{IpAddressFamily, IpSocketAddress, Network},
4    sockets::tcp::{self, ShutdownType},
5};
6use crate::p2::{SocketResult, WasiImpl, WasiView};
7use std::net::SocketAddr;
8use std::time::Duration;
9use wasmtime::component::Resource;
10use wasmtime_wasi_io::{
11    poll::DynPollable,
12    streams::{DynInputStream, DynOutputStream},
13    IoView,
14};
15
16impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
17
18impl<T> crate::p2::host::tcp::tcp::HostTcpSocket for WasiImpl<T>
19where
20    T: WasiView,
21{
22    async fn start_bind(
23        &mut self,
24        this: Resource<tcp::TcpSocket>,
25        network: Resource<Network>,
26        local_address: IpSocketAddress,
27    ) -> SocketResult<()> {
28        self.ctx().allowed_network_uses.check_allowed_tcp()?;
29        let table = self.table();
30        let network = table.get(&network)?;
31        let local_address: SocketAddr = local_address.into();
32
33        // Ensure that we're allowed to connect to this address.
34        network
35            .check_socket_addr(local_address, SocketAddrUse::TcpBind)
36            .await?;
37
38        // Bind to the address.
39        table.get_mut(&this)?.start_bind(local_address)?;
40
41        Ok(())
42    }
43
44    fn finish_bind(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
45        let table = self.table();
46        let socket = table.get_mut(&this)?;
47
48        socket.finish_bind()
49    }
50
51    async fn start_connect(
52        &mut self,
53        this: Resource<tcp::TcpSocket>,
54        network: Resource<Network>,
55        remote_address: IpSocketAddress,
56    ) -> SocketResult<()> {
57        self.ctx().allowed_network_uses.check_allowed_tcp()?;
58        let table = self.table();
59        let network = table.get(&network)?;
60        let remote_address: SocketAddr = remote_address.into();
61
62        // Ensure that we're allowed to connect to this address.
63        network
64            .check_socket_addr(remote_address, SocketAddrUse::TcpConnect)
65            .await?;
66
67        // Start connection
68        table.get_mut(&this)?.start_connect(remote_address)?;
69
70        Ok(())
71    }
72
73    fn finish_connect(
74        &mut self,
75        this: Resource<tcp::TcpSocket>,
76    ) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {
77        let table = self.table();
78        let socket = table.get_mut(&this)?;
79
80        let (input, output) = socket.finish_connect()?;
81
82        let input_stream = self.table().push_child(input, &this)?;
83        let output_stream = self.table().push_child(output, &this)?;
84
85        Ok((input_stream, output_stream))
86    }
87
88    fn start_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
89        self.ctx().allowed_network_uses.check_allowed_tcp()?;
90        let table = self.table();
91        let socket = table.get_mut(&this)?;
92
93        socket.start_listen()
94    }
95
96    fn finish_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
97        let table = self.table();
98        let socket = table.get_mut(&this)?;
99        socket.finish_listen()
100    }
101
102    fn accept(
103        &mut self,
104        this: Resource<tcp::TcpSocket>,
105    ) -> SocketResult<(
106        Resource<tcp::TcpSocket>,
107        Resource<DynInputStream>,
108        Resource<DynOutputStream>,
109    )> {
110        self.ctx().allowed_network_uses.check_allowed_tcp()?;
111        let table = self.table();
112        let socket = table.get_mut(&this)?;
113
114        let (tcp_socket, input, output) = socket.accept()?;
115
116        let tcp_socket = self.table().push(tcp_socket)?;
117        let input_stream = self.table().push_child(input, &tcp_socket)?;
118        let output_stream = self.table().push_child(output, &tcp_socket)?;
119
120        Ok((tcp_socket, input_stream, output_stream))
121    }
122
123    fn local_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
124        let table = self.table();
125        let socket = table.get(&this)?;
126
127        socket.local_address().map(Into::into)
128    }
129
130    fn remote_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
131        let table = self.table();
132        let socket = table.get(&this)?;
133
134        socket.remote_address().map(Into::into)
135    }
136
137    fn is_listening(&mut self, this: Resource<tcp::TcpSocket>) -> Result<bool, anyhow::Error> {
138        let table = self.table();
139        let socket = table.get(&this)?;
140
141        Ok(socket.is_listening())
142    }
143
144    fn address_family(
145        &mut self,
146        this: Resource<tcp::TcpSocket>,
147    ) -> Result<IpAddressFamily, anyhow::Error> {
148        let table = self.table();
149        let socket = table.get(&this)?;
150
151        match socket.address_family() {
152            SocketAddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4),
153            SocketAddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6),
154        }
155    }
156
157    fn set_listen_backlog_size(
158        &mut self,
159        this: Resource<tcp::TcpSocket>,
160        value: u64,
161    ) -> SocketResult<()> {
162        let table = self.table();
163        let socket = table.get_mut(&this)?;
164
165        // Silently clamp backlog size. This is OK for us to do, because operating systems do this too.
166        let value = value.try_into().unwrap_or(u32::MAX);
167
168        socket.set_listen_backlog_size(value)
169    }
170
171    fn keep_alive_enabled(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<bool> {
172        let table = self.table();
173        let socket = table.get(&this)?;
174        socket.keep_alive_enabled()
175    }
176
177    fn set_keep_alive_enabled(
178        &mut self,
179        this: Resource<tcp::TcpSocket>,
180        value: bool,
181    ) -> SocketResult<()> {
182        let table = self.table();
183        let socket = table.get(&this)?;
184        socket.set_keep_alive_enabled(value)
185    }
186
187    fn keep_alive_idle_time(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
188        let table = self.table();
189        let socket = table.get(&this)?;
190        Ok(socket.keep_alive_idle_time()?.as_nanos() as u64)
191    }
192
193    fn set_keep_alive_idle_time(
194        &mut self,
195        this: Resource<tcp::TcpSocket>,
196        value: u64,
197    ) -> SocketResult<()> {
198        let table = self.table();
199        let socket = table.get_mut(&this)?;
200        let duration = Duration::from_nanos(value);
201        socket.set_keep_alive_idle_time(duration)
202    }
203
204    fn keep_alive_interval(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
205        let table = self.table();
206        let socket = table.get(&this)?;
207        Ok(socket.keep_alive_interval()?.as_nanos() as u64)
208    }
209
210    fn set_keep_alive_interval(
211        &mut self,
212        this: Resource<tcp::TcpSocket>,
213        value: u64,
214    ) -> SocketResult<()> {
215        let table = self.table();
216        let socket = table.get(&this)?;
217        socket.set_keep_alive_interval(Duration::from_nanos(value))
218    }
219
220    fn keep_alive_count(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u32> {
221        let table = self.table();
222        let socket = table.get(&this)?;
223        socket.keep_alive_count()
224    }
225
226    fn set_keep_alive_count(
227        &mut self,
228        this: Resource<tcp::TcpSocket>,
229        value: u32,
230    ) -> SocketResult<()> {
231        let table = self.table();
232        let socket = table.get(&this)?;
233        socket.set_keep_alive_count(value)
234    }
235
236    fn hop_limit(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u8> {
237        let table = self.table();
238        let socket = table.get(&this)?;
239        socket.hop_limit()
240    }
241
242    fn set_hop_limit(&mut self, this: Resource<tcp::TcpSocket>, value: u8) -> SocketResult<()> {
243        let table = self.table();
244        let socket = table.get_mut(&this)?;
245        socket.set_hop_limit(value)
246    }
247
248    fn receive_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
249        let table = self.table();
250        let socket = table.get(&this)?;
251
252        Ok(socket.receive_buffer_size()? as u64)
253    }
254
255    fn set_receive_buffer_size(
256        &mut self,
257        this: Resource<tcp::TcpSocket>,
258        value: u64,
259    ) -> SocketResult<()> {
260        let table = self.table();
261        let socket = table.get_mut(&this)?;
262        let value = value.try_into().unwrap_or(usize::MAX);
263        socket.set_receive_buffer_size(value)
264    }
265
266    fn send_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
267        let table = self.table();
268        let socket = table.get(&this)?;
269
270        Ok(socket.send_buffer_size()? as u64)
271    }
272
273    fn set_send_buffer_size(
274        &mut self,
275        this: Resource<tcp::TcpSocket>,
276        value: u64,
277    ) -> SocketResult<()> {
278        let table = self.table();
279        let socket = table.get_mut(&this)?;
280        let value = value.try_into().unwrap_or(usize::MAX);
281        socket.set_send_buffer_size(value)
282    }
283
284    fn subscribe(
285        &mut self,
286        this: Resource<tcp::TcpSocket>,
287    ) -> anyhow::Result<Resource<DynPollable>> {
288        wasmtime_wasi_io::poll::subscribe(self.table(), this)
289    }
290
291    fn shutdown(
292        &mut self,
293        this: Resource<tcp::TcpSocket>,
294        shutdown_type: ShutdownType,
295    ) -> SocketResult<()> {
296        let table = self.table();
297        let socket = table.get(&this)?;
298
299        let how = match shutdown_type {
300            ShutdownType::Receive => std::net::Shutdown::Read,
301            ShutdownType::Send => std::net::Shutdown::Write,
302            ShutdownType::Both => std::net::Shutdown::Both,
303        };
304        socket.shutdown(how)
305    }
306
307    fn drop(&mut self, this: Resource<tcp::TcpSocket>) -> Result<(), anyhow::Error> {
308        let table = self.table();
309
310        // As in the filesystem implementation, we assume closing a socket
311        // doesn't block.
312        let dropped = table.delete(this)?;
313        drop(dropped);
314
315        Ok(())
316    }
317}
318
319pub mod sync {
320    use wasmtime::component::Resource;
321
322    use crate::p2::{
323        bindings::{
324            sockets::{
325                network::Network,
326                tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},
327            },
328            sync::sockets::tcp::{
329                self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,
330                OutputStream, Pollable, ShutdownType, TcpSocket,
331            },
332        },
333        SocketError, WasiImpl, WasiView,
334    };
335    use crate::runtime::in_tokio;
336
337    impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
338
339    impl<T> HostTcpSocket for WasiImpl<T>
340    where
341        T: WasiView,
342    {
343        fn start_bind(
344            &mut self,
345            self_: Resource<TcpSocket>,
346            network: Resource<Network>,
347            local_address: IpSocketAddress,
348        ) -> Result<(), SocketError> {
349            in_tokio(async {
350                AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await
351            })
352        }
353
354        fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
355            AsyncHostTcpSocket::finish_bind(self, self_)
356        }
357
358        fn start_connect(
359            &mut self,
360            self_: Resource<TcpSocket>,
361            network: Resource<Network>,
362            remote_address: IpSocketAddress,
363        ) -> Result<(), SocketError> {
364            in_tokio(async {
365                AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await
366            })
367        }
368
369        fn finish_connect(
370            &mut self,
371            self_: Resource<TcpSocket>,
372        ) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {
373            AsyncHostTcpSocket::finish_connect(self, self_)
374        }
375
376        fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
377            AsyncHostTcpSocket::start_listen(self, self_)
378        }
379
380        fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
381            AsyncHostTcpSocket::finish_listen(self, self_)
382        }
383
384        fn accept(
385            &mut self,
386            self_: Resource<TcpSocket>,
387        ) -> Result<
388            (
389                Resource<TcpSocket>,
390                Resource<InputStream>,
391                Resource<OutputStream>,
392            ),
393            SocketError,
394        > {
395            AsyncHostTcpSocket::accept(self, self_)
396        }
397
398        fn local_address(
399            &mut self,
400            self_: Resource<TcpSocket>,
401        ) -> Result<IpSocketAddress, SocketError> {
402            AsyncHostTcpSocket::local_address(self, self_)
403        }
404
405        fn remote_address(
406            &mut self,
407            self_: Resource<TcpSocket>,
408        ) -> Result<IpSocketAddress, SocketError> {
409            AsyncHostTcpSocket::remote_address(self, self_)
410        }
411
412        fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {
413            AsyncHostTcpSocket::is_listening(self, self_)
414        }
415
416        fn address_family(
417            &mut self,
418            self_: Resource<TcpSocket>,
419        ) -> wasmtime::Result<IpAddressFamily> {
420            AsyncHostTcpSocket::address_family(self, self_)
421        }
422
423        fn set_listen_backlog_size(
424            &mut self,
425            self_: Resource<TcpSocket>,
426            value: u64,
427        ) -> Result<(), SocketError> {
428            AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)
429        }
430
431        fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {
432            AsyncHostTcpSocket::keep_alive_enabled(self, self_)
433        }
434
435        fn set_keep_alive_enabled(
436            &mut self,
437            self_: Resource<TcpSocket>,
438            value: bool,
439        ) -> Result<(), SocketError> {
440            AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)
441        }
442
443        fn keep_alive_idle_time(
444            &mut self,
445            self_: Resource<TcpSocket>,
446        ) -> Result<Duration, SocketError> {
447            AsyncHostTcpSocket::keep_alive_idle_time(self, self_)
448        }
449
450        fn set_keep_alive_idle_time(
451            &mut self,
452            self_: Resource<TcpSocket>,
453            value: Duration,
454        ) -> Result<(), SocketError> {
455            AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)
456        }
457
458        fn keep_alive_interval(
459            &mut self,
460            self_: Resource<TcpSocket>,
461        ) -> Result<Duration, SocketError> {
462            AsyncHostTcpSocket::keep_alive_interval(self, self_)
463        }
464
465        fn set_keep_alive_interval(
466            &mut self,
467            self_: Resource<TcpSocket>,
468            value: Duration,
469        ) -> Result<(), SocketError> {
470            AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)
471        }
472
473        fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {
474            AsyncHostTcpSocket::keep_alive_count(self, self_)
475        }
476
477        fn set_keep_alive_count(
478            &mut self,
479            self_: Resource<TcpSocket>,
480            value: u32,
481        ) -> Result<(), SocketError> {
482            AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)
483        }
484
485        fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {
486            AsyncHostTcpSocket::hop_limit(self, self_)
487        }
488
489        fn set_hop_limit(
490            &mut self,
491            self_: Resource<TcpSocket>,
492            value: u8,
493        ) -> Result<(), SocketError> {
494            AsyncHostTcpSocket::set_hop_limit(self, self_, value)
495        }
496
497        fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
498            AsyncHostTcpSocket::receive_buffer_size(self, self_)
499        }
500
501        fn set_receive_buffer_size(
502            &mut self,
503            self_: Resource<TcpSocket>,
504            value: u64,
505        ) -> Result<(), SocketError> {
506            AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)
507        }
508
509        fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
510            AsyncHostTcpSocket::send_buffer_size(self, self_)
511        }
512
513        fn set_send_buffer_size(
514            &mut self,
515            self_: Resource<TcpSocket>,
516            value: u64,
517        ) -> Result<(), SocketError> {
518            AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)
519        }
520
521        fn subscribe(
522            &mut self,
523            self_: Resource<TcpSocket>,
524        ) -> wasmtime::Result<Resource<Pollable>> {
525            AsyncHostTcpSocket::subscribe(self, self_)
526        }
527
528        fn shutdown(
529            &mut self,
530            self_: Resource<TcpSocket>,
531            shutdown_type: ShutdownType,
532        ) -> Result<(), SocketError> {
533            AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())
534        }
535
536        fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {
537            AsyncHostTcpSocket::drop(self, rep)
538        }
539    }
540
541    impl From<ShutdownType> for async_tcp::ShutdownType {
542        fn from(other: ShutdownType) -> Self {
543            match other {
544                ShutdownType::Receive => async_tcp::ShutdownType::Receive,
545                ShutdownType::Send => async_tcp::ShutdownType::Send,
546                ShutdownType::Both => async_tcp::ShutdownType::Both,
547            }
548        }
549    }
550}