wasmtime_wasi/host/
tcp.rs

1use crate::network::SocketAddrUse;
2use crate::{
3    bindings::{
4        sockets::network::{IpAddressFamily, IpSocketAddress, Network},
5        sockets::tcp::{self, ShutdownType},
6    },
7    network::SocketAddressFamily,
8};
9use crate::{SocketResult, WasiImpl, WasiView};
10use std::net::SocketAddr;
11use std::time::Duration;
12use wasmtime::component::Resource;
13use wasmtime_wasi_io::{
14    poll::DynPollable,
15    streams::{DynInputStream, DynOutputStream},
16    IoView,
17};
18
19impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
20
21impl<T> crate::host::tcp::tcp::HostTcpSocket for WasiImpl<T>
22where
23    T: WasiView,
24{
25    async fn start_bind(
26        &mut self,
27        this: Resource<tcp::TcpSocket>,
28        network: Resource<Network>,
29        local_address: IpSocketAddress,
30    ) -> SocketResult<()> {
31        self.ctx().allowed_network_uses.check_allowed_tcp()?;
32        let table = self.table();
33        let network = table.get(&network)?;
34        let local_address: SocketAddr = local_address.into();
35
36        // Ensure that we're allowed to connect to this address.
37        network
38            .check_socket_addr(local_address, SocketAddrUse::TcpBind)
39            .await?;
40
41        // Bind to the address.
42        table.get_mut(&this)?.start_bind(local_address)?;
43
44        Ok(())
45    }
46
47    fn finish_bind(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
48        let table = self.table();
49        let socket = table.get_mut(&this)?;
50
51        socket.finish_bind()
52    }
53
54    async fn start_connect(
55        &mut self,
56        this: Resource<tcp::TcpSocket>,
57        network: Resource<Network>,
58        remote_address: IpSocketAddress,
59    ) -> SocketResult<()> {
60        self.ctx().allowed_network_uses.check_allowed_tcp()?;
61        let table = self.table();
62        let network = table.get(&network)?;
63        let remote_address: SocketAddr = remote_address.into();
64
65        // Ensure that we're allowed to connect to this address.
66        network
67            .check_socket_addr(remote_address, SocketAddrUse::TcpConnect)
68            .await?;
69
70        // Start connection
71        table.get_mut(&this)?.start_connect(remote_address)?;
72
73        Ok(())
74    }
75
76    fn finish_connect(
77        &mut self,
78        this: Resource<tcp::TcpSocket>,
79    ) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {
80        let table = self.table();
81        let socket = table.get_mut(&this)?;
82
83        let (input, output) = socket.finish_connect()?;
84
85        let input_stream = self.table().push_child(input, &this)?;
86        let output_stream = self.table().push_child(output, &this)?;
87
88        Ok((input_stream, output_stream))
89    }
90
91    fn start_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
92        self.ctx().allowed_network_uses.check_allowed_tcp()?;
93        let table = self.table();
94        let socket = table.get_mut(&this)?;
95
96        socket.start_listen()
97    }
98
99    fn finish_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
100        let table = self.table();
101        let socket = table.get_mut(&this)?;
102        socket.finish_listen()
103    }
104
105    fn accept(
106        &mut self,
107        this: Resource<tcp::TcpSocket>,
108    ) -> SocketResult<(
109        Resource<tcp::TcpSocket>,
110        Resource<DynInputStream>,
111        Resource<DynOutputStream>,
112    )> {
113        self.ctx().allowed_network_uses.check_allowed_tcp()?;
114        let table = self.table();
115        let socket = table.get_mut(&this)?;
116
117        let (tcp_socket, input, output) = socket.accept()?;
118
119        let tcp_socket = self.table().push(tcp_socket)?;
120        let input_stream = self.table().push_child(input, &tcp_socket)?;
121        let output_stream = self.table().push_child(output, &tcp_socket)?;
122
123        Ok((tcp_socket, input_stream, output_stream))
124    }
125
126    fn local_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
127        let table = self.table();
128        let socket = table.get(&this)?;
129
130        socket.local_address().map(Into::into)
131    }
132
133    fn remote_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
134        let table = self.table();
135        let socket = table.get(&this)?;
136
137        socket.remote_address().map(Into::into)
138    }
139
140    fn is_listening(&mut self, this: Resource<tcp::TcpSocket>) -> Result<bool, anyhow::Error> {
141        let table = self.table();
142        let socket = table.get(&this)?;
143
144        Ok(socket.is_listening())
145    }
146
147    fn address_family(
148        &mut self,
149        this: Resource<tcp::TcpSocket>,
150    ) -> Result<IpAddressFamily, anyhow::Error> {
151        let table = self.table();
152        let socket = table.get(&this)?;
153
154        match socket.address_family() {
155            SocketAddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4),
156            SocketAddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6),
157        }
158    }
159
160    fn set_listen_backlog_size(
161        &mut self,
162        this: Resource<tcp::TcpSocket>,
163        value: u64,
164    ) -> SocketResult<()> {
165        let table = self.table();
166        let socket = table.get_mut(&this)?;
167
168        // Silently clamp backlog size. This is OK for us to do, because operating systems do this too.
169        let value = value.try_into().unwrap_or(u32::MAX);
170
171        socket.set_listen_backlog_size(value)
172    }
173
174    fn keep_alive_enabled(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<bool> {
175        let table = self.table();
176        let socket = table.get(&this)?;
177        socket.keep_alive_enabled()
178    }
179
180    fn set_keep_alive_enabled(
181        &mut self,
182        this: Resource<tcp::TcpSocket>,
183        value: bool,
184    ) -> SocketResult<()> {
185        let table = self.table();
186        let socket = table.get(&this)?;
187        socket.set_keep_alive_enabled(value)
188    }
189
190    fn keep_alive_idle_time(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
191        let table = self.table();
192        let socket = table.get(&this)?;
193        Ok(socket.keep_alive_idle_time()?.as_nanos() as u64)
194    }
195
196    fn set_keep_alive_idle_time(
197        &mut self,
198        this: Resource<tcp::TcpSocket>,
199        value: u64,
200    ) -> SocketResult<()> {
201        let table = self.table();
202        let socket = table.get_mut(&this)?;
203        let duration = Duration::from_nanos(value);
204        socket.set_keep_alive_idle_time(duration)
205    }
206
207    fn keep_alive_interval(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
208        let table = self.table();
209        let socket = table.get(&this)?;
210        Ok(socket.keep_alive_interval()?.as_nanos() as u64)
211    }
212
213    fn set_keep_alive_interval(
214        &mut self,
215        this: Resource<tcp::TcpSocket>,
216        value: u64,
217    ) -> SocketResult<()> {
218        let table = self.table();
219        let socket = table.get(&this)?;
220        socket.set_keep_alive_interval(Duration::from_nanos(value))
221    }
222
223    fn keep_alive_count(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u32> {
224        let table = self.table();
225        let socket = table.get(&this)?;
226        socket.keep_alive_count()
227    }
228
229    fn set_keep_alive_count(
230        &mut self,
231        this: Resource<tcp::TcpSocket>,
232        value: u32,
233    ) -> SocketResult<()> {
234        let table = self.table();
235        let socket = table.get(&this)?;
236        socket.set_keep_alive_count(value)
237    }
238
239    fn hop_limit(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u8> {
240        let table = self.table();
241        let socket = table.get(&this)?;
242        socket.hop_limit()
243    }
244
245    fn set_hop_limit(&mut self, this: Resource<tcp::TcpSocket>, value: u8) -> SocketResult<()> {
246        let table = self.table();
247        let socket = table.get_mut(&this)?;
248        socket.set_hop_limit(value)
249    }
250
251    fn receive_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
252        let table = self.table();
253        let socket = table.get(&this)?;
254
255        Ok(socket.receive_buffer_size()? as u64)
256    }
257
258    fn set_receive_buffer_size(
259        &mut self,
260        this: Resource<tcp::TcpSocket>,
261        value: u64,
262    ) -> SocketResult<()> {
263        let table = self.table();
264        let socket = table.get_mut(&this)?;
265        let value = value.try_into().unwrap_or(usize::MAX);
266        socket.set_receive_buffer_size(value)
267    }
268
269    fn send_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
270        let table = self.table();
271        let socket = table.get(&this)?;
272
273        Ok(socket.send_buffer_size()? as u64)
274    }
275
276    fn set_send_buffer_size(
277        &mut self,
278        this: Resource<tcp::TcpSocket>,
279        value: u64,
280    ) -> SocketResult<()> {
281        let table = self.table();
282        let socket = table.get_mut(&this)?;
283        let value = value.try_into().unwrap_or(usize::MAX);
284        socket.set_send_buffer_size(value)
285    }
286
287    fn subscribe(
288        &mut self,
289        this: Resource<tcp::TcpSocket>,
290    ) -> anyhow::Result<Resource<DynPollable>> {
291        wasmtime_wasi_io::poll::subscribe(self.table(), this)
292    }
293
294    fn shutdown(
295        &mut self,
296        this: Resource<tcp::TcpSocket>,
297        shutdown_type: ShutdownType,
298    ) -> SocketResult<()> {
299        let table = self.table();
300        let socket = table.get(&this)?;
301
302        let how = match shutdown_type {
303            ShutdownType::Receive => std::net::Shutdown::Read,
304            ShutdownType::Send => std::net::Shutdown::Write,
305            ShutdownType::Both => std::net::Shutdown::Both,
306        };
307        socket.shutdown(how)
308    }
309
310    fn drop(&mut self, this: Resource<tcp::TcpSocket>) -> Result<(), anyhow::Error> {
311        let table = self.table();
312
313        // As in the filesystem implementation, we assume closing a socket
314        // doesn't block.
315        let dropped = table.delete(this)?;
316        drop(dropped);
317
318        Ok(())
319    }
320}
321
322pub mod sync {
323    use wasmtime::component::Resource;
324
325    use crate::{
326        bindings::{
327            sockets::{
328                network::Network,
329                tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},
330            },
331            sync::sockets::tcp::{
332                self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,
333                OutputStream, Pollable, ShutdownType, TcpSocket,
334            },
335        },
336        runtime::in_tokio,
337        SocketError, WasiImpl, WasiView,
338    };
339
340    impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
341
342    impl<T> HostTcpSocket for WasiImpl<T>
343    where
344        T: WasiView,
345    {
346        fn start_bind(
347            &mut self,
348            self_: Resource<TcpSocket>,
349            network: Resource<Network>,
350            local_address: IpSocketAddress,
351        ) -> Result<(), SocketError> {
352            in_tokio(async {
353                AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await
354            })
355        }
356
357        fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
358            AsyncHostTcpSocket::finish_bind(self, self_)
359        }
360
361        fn start_connect(
362            &mut self,
363            self_: Resource<TcpSocket>,
364            network: Resource<Network>,
365            remote_address: IpSocketAddress,
366        ) -> Result<(), SocketError> {
367            in_tokio(async {
368                AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await
369            })
370        }
371
372        fn finish_connect(
373            &mut self,
374            self_: Resource<TcpSocket>,
375        ) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {
376            AsyncHostTcpSocket::finish_connect(self, self_)
377        }
378
379        fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
380            AsyncHostTcpSocket::start_listen(self, self_)
381        }
382
383        fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
384            AsyncHostTcpSocket::finish_listen(self, self_)
385        }
386
387        fn accept(
388            &mut self,
389            self_: Resource<TcpSocket>,
390        ) -> Result<
391            (
392                Resource<TcpSocket>,
393                Resource<InputStream>,
394                Resource<OutputStream>,
395            ),
396            SocketError,
397        > {
398            AsyncHostTcpSocket::accept(self, self_)
399        }
400
401        fn local_address(
402            &mut self,
403            self_: Resource<TcpSocket>,
404        ) -> Result<IpSocketAddress, SocketError> {
405            AsyncHostTcpSocket::local_address(self, self_)
406        }
407
408        fn remote_address(
409            &mut self,
410            self_: Resource<TcpSocket>,
411        ) -> Result<IpSocketAddress, SocketError> {
412            AsyncHostTcpSocket::remote_address(self, self_)
413        }
414
415        fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {
416            AsyncHostTcpSocket::is_listening(self, self_)
417        }
418
419        fn address_family(
420            &mut self,
421            self_: Resource<TcpSocket>,
422        ) -> wasmtime::Result<IpAddressFamily> {
423            AsyncHostTcpSocket::address_family(self, self_)
424        }
425
426        fn set_listen_backlog_size(
427            &mut self,
428            self_: Resource<TcpSocket>,
429            value: u64,
430        ) -> Result<(), SocketError> {
431            AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)
432        }
433
434        fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {
435            AsyncHostTcpSocket::keep_alive_enabled(self, self_)
436        }
437
438        fn set_keep_alive_enabled(
439            &mut self,
440            self_: Resource<TcpSocket>,
441            value: bool,
442        ) -> Result<(), SocketError> {
443            AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)
444        }
445
446        fn keep_alive_idle_time(
447            &mut self,
448            self_: Resource<TcpSocket>,
449        ) -> Result<Duration, SocketError> {
450            AsyncHostTcpSocket::keep_alive_idle_time(self, self_)
451        }
452
453        fn set_keep_alive_idle_time(
454            &mut self,
455            self_: Resource<TcpSocket>,
456            value: Duration,
457        ) -> Result<(), SocketError> {
458            AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)
459        }
460
461        fn keep_alive_interval(
462            &mut self,
463            self_: Resource<TcpSocket>,
464        ) -> Result<Duration, SocketError> {
465            AsyncHostTcpSocket::keep_alive_interval(self, self_)
466        }
467
468        fn set_keep_alive_interval(
469            &mut self,
470            self_: Resource<TcpSocket>,
471            value: Duration,
472        ) -> Result<(), SocketError> {
473            AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)
474        }
475
476        fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {
477            AsyncHostTcpSocket::keep_alive_count(self, self_)
478        }
479
480        fn set_keep_alive_count(
481            &mut self,
482            self_: Resource<TcpSocket>,
483            value: u32,
484        ) -> Result<(), SocketError> {
485            AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)
486        }
487
488        fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {
489            AsyncHostTcpSocket::hop_limit(self, self_)
490        }
491
492        fn set_hop_limit(
493            &mut self,
494            self_: Resource<TcpSocket>,
495            value: u8,
496        ) -> Result<(), SocketError> {
497            AsyncHostTcpSocket::set_hop_limit(self, self_, value)
498        }
499
500        fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
501            AsyncHostTcpSocket::receive_buffer_size(self, self_)
502        }
503
504        fn set_receive_buffer_size(
505            &mut self,
506            self_: Resource<TcpSocket>,
507            value: u64,
508        ) -> Result<(), SocketError> {
509            AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)
510        }
511
512        fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
513            AsyncHostTcpSocket::send_buffer_size(self, self_)
514        }
515
516        fn set_send_buffer_size(
517            &mut self,
518            self_: Resource<TcpSocket>,
519            value: u64,
520        ) -> Result<(), SocketError> {
521            AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)
522        }
523
524        fn subscribe(
525            &mut self,
526            self_: Resource<TcpSocket>,
527        ) -> wasmtime::Result<Resource<Pollable>> {
528            AsyncHostTcpSocket::subscribe(self, self_)
529        }
530
531        fn shutdown(
532            &mut self,
533            self_: Resource<TcpSocket>,
534            shutdown_type: ShutdownType,
535        ) -> Result<(), SocketError> {
536            AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())
537        }
538
539        fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {
540            AsyncHostTcpSocket::drop(self, rep)
541        }
542    }
543
544    impl From<ShutdownType> for async_tcp::ShutdownType {
545        fn from(other: ShutdownType) -> Self {
546            match other {
547                ShutdownType::Receive => async_tcp::ShutdownType::Receive,
548                ShutdownType::Send => async_tcp::ShutdownType::Send,
549                ShutdownType::Both => async_tcp::ShutdownType::Both,
550            }
551        }
552    }
553}