1use crate::net::{SocketAddrUse, SocketAddressFamily};
2use crate::p2::bindings::{
3 sockets::network::{IpAddressFamily, IpSocketAddress, Network},
4 sockets::tcp::{self, ShutdownType},
5};
6use crate::p2::{SocketResult, WasiImpl, WasiView};
7use std::net::SocketAddr;
8use std::time::Duration;
9use wasmtime::component::Resource;
10use wasmtime_wasi_io::{
11 poll::DynPollable,
12 streams::{DynInputStream, DynOutputStream},
13 IoView,
14};
15
16impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
17
18impl<T> crate::p2::host::tcp::tcp::HostTcpSocket for WasiImpl<T>
19where
20 T: WasiView,
21{
22 async fn start_bind(
23 &mut self,
24 this: Resource<tcp::TcpSocket>,
25 network: Resource<Network>,
26 local_address: IpSocketAddress,
27 ) -> SocketResult<()> {
28 self.ctx().allowed_network_uses.check_allowed_tcp()?;
29 let table = self.table();
30 let network = table.get(&network)?;
31 let local_address: SocketAddr = local_address.into();
32
33 network
35 .check_socket_addr(local_address, SocketAddrUse::TcpBind)
36 .await?;
37
38 table.get_mut(&this)?.start_bind(local_address)?;
40
41 Ok(())
42 }
43
44 fn finish_bind(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
45 let table = self.table();
46 let socket = table.get_mut(&this)?;
47
48 socket.finish_bind()
49 }
50
51 async fn start_connect(
52 &mut self,
53 this: Resource<tcp::TcpSocket>,
54 network: Resource<Network>,
55 remote_address: IpSocketAddress,
56 ) -> SocketResult<()> {
57 self.ctx().allowed_network_uses.check_allowed_tcp()?;
58 let table = self.table();
59 let network = table.get(&network)?;
60 let remote_address: SocketAddr = remote_address.into();
61
62 network
64 .check_socket_addr(remote_address, SocketAddrUse::TcpConnect)
65 .await?;
66
67 table.get_mut(&this)?.start_connect(remote_address)?;
69
70 Ok(())
71 }
72
73 fn finish_connect(
74 &mut self,
75 this: Resource<tcp::TcpSocket>,
76 ) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {
77 let table = self.table();
78 let socket = table.get_mut(&this)?;
79
80 let (input, output) = socket.finish_connect()?;
81
82 let input_stream = self.table().push_child(input, &this)?;
83 let output_stream = self.table().push_child(output, &this)?;
84
85 Ok((input_stream, output_stream))
86 }
87
88 fn start_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
89 self.ctx().allowed_network_uses.check_allowed_tcp()?;
90 let table = self.table();
91 let socket = table.get_mut(&this)?;
92
93 socket.start_listen()
94 }
95
96 fn finish_listen(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<()> {
97 let table = self.table();
98 let socket = table.get_mut(&this)?;
99 socket.finish_listen()
100 }
101
102 fn accept(
103 &mut self,
104 this: Resource<tcp::TcpSocket>,
105 ) -> SocketResult<(
106 Resource<tcp::TcpSocket>,
107 Resource<DynInputStream>,
108 Resource<DynOutputStream>,
109 )> {
110 self.ctx().allowed_network_uses.check_allowed_tcp()?;
111 let table = self.table();
112 let socket = table.get_mut(&this)?;
113
114 let (tcp_socket, input, output) = socket.accept()?;
115
116 let tcp_socket = self.table().push(tcp_socket)?;
117 let input_stream = self.table().push_child(input, &tcp_socket)?;
118 let output_stream = self.table().push_child(output, &tcp_socket)?;
119
120 Ok((tcp_socket, input_stream, output_stream))
121 }
122
123 fn local_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
124 let table = self.table();
125 let socket = table.get(&this)?;
126
127 socket.local_address().map(Into::into)
128 }
129
130 fn remote_address(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<IpSocketAddress> {
131 let table = self.table();
132 let socket = table.get(&this)?;
133
134 socket.remote_address().map(Into::into)
135 }
136
137 fn is_listening(&mut self, this: Resource<tcp::TcpSocket>) -> Result<bool, anyhow::Error> {
138 let table = self.table();
139 let socket = table.get(&this)?;
140
141 Ok(socket.is_listening())
142 }
143
144 fn address_family(
145 &mut self,
146 this: Resource<tcp::TcpSocket>,
147 ) -> Result<IpAddressFamily, anyhow::Error> {
148 let table = self.table();
149 let socket = table.get(&this)?;
150
151 match socket.address_family() {
152 SocketAddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4),
153 SocketAddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6),
154 }
155 }
156
157 fn set_listen_backlog_size(
158 &mut self,
159 this: Resource<tcp::TcpSocket>,
160 value: u64,
161 ) -> SocketResult<()> {
162 let table = self.table();
163 let socket = table.get_mut(&this)?;
164
165 let value = value.try_into().unwrap_or(u32::MAX);
167
168 socket.set_listen_backlog_size(value)
169 }
170
171 fn keep_alive_enabled(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<bool> {
172 let table = self.table();
173 let socket = table.get(&this)?;
174 socket.keep_alive_enabled()
175 }
176
177 fn set_keep_alive_enabled(
178 &mut self,
179 this: Resource<tcp::TcpSocket>,
180 value: bool,
181 ) -> SocketResult<()> {
182 let table = self.table();
183 let socket = table.get(&this)?;
184 socket.set_keep_alive_enabled(value)
185 }
186
187 fn keep_alive_idle_time(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
188 let table = self.table();
189 let socket = table.get(&this)?;
190 Ok(socket.keep_alive_idle_time()?.as_nanos() as u64)
191 }
192
193 fn set_keep_alive_idle_time(
194 &mut self,
195 this: Resource<tcp::TcpSocket>,
196 value: u64,
197 ) -> SocketResult<()> {
198 let table = self.table();
199 let socket = table.get_mut(&this)?;
200 let duration = Duration::from_nanos(value);
201 socket.set_keep_alive_idle_time(duration)
202 }
203
204 fn keep_alive_interval(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
205 let table = self.table();
206 let socket = table.get(&this)?;
207 Ok(socket.keep_alive_interval()?.as_nanos() as u64)
208 }
209
210 fn set_keep_alive_interval(
211 &mut self,
212 this: Resource<tcp::TcpSocket>,
213 value: u64,
214 ) -> SocketResult<()> {
215 let table = self.table();
216 let socket = table.get(&this)?;
217 socket.set_keep_alive_interval(Duration::from_nanos(value))
218 }
219
220 fn keep_alive_count(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u32> {
221 let table = self.table();
222 let socket = table.get(&this)?;
223 socket.keep_alive_count()
224 }
225
226 fn set_keep_alive_count(
227 &mut self,
228 this: Resource<tcp::TcpSocket>,
229 value: u32,
230 ) -> SocketResult<()> {
231 let table = self.table();
232 let socket = table.get(&this)?;
233 socket.set_keep_alive_count(value)
234 }
235
236 fn hop_limit(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u8> {
237 let table = self.table();
238 let socket = table.get(&this)?;
239 socket.hop_limit()
240 }
241
242 fn set_hop_limit(&mut self, this: Resource<tcp::TcpSocket>, value: u8) -> SocketResult<()> {
243 let table = self.table();
244 let socket = table.get_mut(&this)?;
245 socket.set_hop_limit(value)
246 }
247
248 fn receive_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
249 let table = self.table();
250 let socket = table.get(&this)?;
251
252 Ok(socket.receive_buffer_size()? as u64)
253 }
254
255 fn set_receive_buffer_size(
256 &mut self,
257 this: Resource<tcp::TcpSocket>,
258 value: u64,
259 ) -> SocketResult<()> {
260 let table = self.table();
261 let socket = table.get_mut(&this)?;
262 let value = value.try_into().unwrap_or(usize::MAX);
263 socket.set_receive_buffer_size(value)
264 }
265
266 fn send_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> SocketResult<u64> {
267 let table = self.table();
268 let socket = table.get(&this)?;
269
270 Ok(socket.send_buffer_size()? as u64)
271 }
272
273 fn set_send_buffer_size(
274 &mut self,
275 this: Resource<tcp::TcpSocket>,
276 value: u64,
277 ) -> SocketResult<()> {
278 let table = self.table();
279 let socket = table.get_mut(&this)?;
280 let value = value.try_into().unwrap_or(usize::MAX);
281 socket.set_send_buffer_size(value)
282 }
283
284 fn subscribe(
285 &mut self,
286 this: Resource<tcp::TcpSocket>,
287 ) -> anyhow::Result<Resource<DynPollable>> {
288 wasmtime_wasi_io::poll::subscribe(self.table(), this)
289 }
290
291 fn shutdown(
292 &mut self,
293 this: Resource<tcp::TcpSocket>,
294 shutdown_type: ShutdownType,
295 ) -> SocketResult<()> {
296 let table = self.table();
297 let socket = table.get(&this)?;
298
299 let how = match shutdown_type {
300 ShutdownType::Receive => std::net::Shutdown::Read,
301 ShutdownType::Send => std::net::Shutdown::Write,
302 ShutdownType::Both => std::net::Shutdown::Both,
303 };
304 socket.shutdown(how)
305 }
306
307 fn drop(&mut self, this: Resource<tcp::TcpSocket>) -> Result<(), anyhow::Error> {
308 let table = self.table();
309
310 let dropped = table.delete(this)?;
313 drop(dropped);
314
315 Ok(())
316 }
317}
318
319pub mod sync {
320 use wasmtime::component::Resource;
321
322 use crate::p2::{
323 bindings::{
324 sockets::{
325 network::Network,
326 tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},
327 },
328 sync::sockets::tcp::{
329 self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,
330 OutputStream, Pollable, ShutdownType, TcpSocket,
331 },
332 },
333 SocketError, WasiImpl, WasiView,
334 };
335 use crate::runtime::in_tokio;
336
337 impl<T> tcp::Host for WasiImpl<T> where T: WasiView {}
338
339 impl<T> HostTcpSocket for WasiImpl<T>
340 where
341 T: WasiView,
342 {
343 fn start_bind(
344 &mut self,
345 self_: Resource<TcpSocket>,
346 network: Resource<Network>,
347 local_address: IpSocketAddress,
348 ) -> Result<(), SocketError> {
349 in_tokio(async {
350 AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await
351 })
352 }
353
354 fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
355 AsyncHostTcpSocket::finish_bind(self, self_)
356 }
357
358 fn start_connect(
359 &mut self,
360 self_: Resource<TcpSocket>,
361 network: Resource<Network>,
362 remote_address: IpSocketAddress,
363 ) -> Result<(), SocketError> {
364 in_tokio(async {
365 AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await
366 })
367 }
368
369 fn finish_connect(
370 &mut self,
371 self_: Resource<TcpSocket>,
372 ) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {
373 AsyncHostTcpSocket::finish_connect(self, self_)
374 }
375
376 fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
377 AsyncHostTcpSocket::start_listen(self, self_)
378 }
379
380 fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
381 AsyncHostTcpSocket::finish_listen(self, self_)
382 }
383
384 fn accept(
385 &mut self,
386 self_: Resource<TcpSocket>,
387 ) -> Result<
388 (
389 Resource<TcpSocket>,
390 Resource<InputStream>,
391 Resource<OutputStream>,
392 ),
393 SocketError,
394 > {
395 AsyncHostTcpSocket::accept(self, self_)
396 }
397
398 fn local_address(
399 &mut self,
400 self_: Resource<TcpSocket>,
401 ) -> Result<IpSocketAddress, SocketError> {
402 AsyncHostTcpSocket::local_address(self, self_)
403 }
404
405 fn remote_address(
406 &mut self,
407 self_: Resource<TcpSocket>,
408 ) -> Result<IpSocketAddress, SocketError> {
409 AsyncHostTcpSocket::remote_address(self, self_)
410 }
411
412 fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {
413 AsyncHostTcpSocket::is_listening(self, self_)
414 }
415
416 fn address_family(
417 &mut self,
418 self_: Resource<TcpSocket>,
419 ) -> wasmtime::Result<IpAddressFamily> {
420 AsyncHostTcpSocket::address_family(self, self_)
421 }
422
423 fn set_listen_backlog_size(
424 &mut self,
425 self_: Resource<TcpSocket>,
426 value: u64,
427 ) -> Result<(), SocketError> {
428 AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)
429 }
430
431 fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {
432 AsyncHostTcpSocket::keep_alive_enabled(self, self_)
433 }
434
435 fn set_keep_alive_enabled(
436 &mut self,
437 self_: Resource<TcpSocket>,
438 value: bool,
439 ) -> Result<(), SocketError> {
440 AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)
441 }
442
443 fn keep_alive_idle_time(
444 &mut self,
445 self_: Resource<TcpSocket>,
446 ) -> Result<Duration, SocketError> {
447 AsyncHostTcpSocket::keep_alive_idle_time(self, self_)
448 }
449
450 fn set_keep_alive_idle_time(
451 &mut self,
452 self_: Resource<TcpSocket>,
453 value: Duration,
454 ) -> Result<(), SocketError> {
455 AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)
456 }
457
458 fn keep_alive_interval(
459 &mut self,
460 self_: Resource<TcpSocket>,
461 ) -> Result<Duration, SocketError> {
462 AsyncHostTcpSocket::keep_alive_interval(self, self_)
463 }
464
465 fn set_keep_alive_interval(
466 &mut self,
467 self_: Resource<TcpSocket>,
468 value: Duration,
469 ) -> Result<(), SocketError> {
470 AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)
471 }
472
473 fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {
474 AsyncHostTcpSocket::keep_alive_count(self, self_)
475 }
476
477 fn set_keep_alive_count(
478 &mut self,
479 self_: Resource<TcpSocket>,
480 value: u32,
481 ) -> Result<(), SocketError> {
482 AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)
483 }
484
485 fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {
486 AsyncHostTcpSocket::hop_limit(self, self_)
487 }
488
489 fn set_hop_limit(
490 &mut self,
491 self_: Resource<TcpSocket>,
492 value: u8,
493 ) -> Result<(), SocketError> {
494 AsyncHostTcpSocket::set_hop_limit(self, self_, value)
495 }
496
497 fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
498 AsyncHostTcpSocket::receive_buffer_size(self, self_)
499 }
500
501 fn set_receive_buffer_size(
502 &mut self,
503 self_: Resource<TcpSocket>,
504 value: u64,
505 ) -> Result<(), SocketError> {
506 AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)
507 }
508
509 fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
510 AsyncHostTcpSocket::send_buffer_size(self, self_)
511 }
512
513 fn set_send_buffer_size(
514 &mut self,
515 self_: Resource<TcpSocket>,
516 value: u64,
517 ) -> Result<(), SocketError> {
518 AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)
519 }
520
521 fn subscribe(
522 &mut self,
523 self_: Resource<TcpSocket>,
524 ) -> wasmtime::Result<Resource<Pollable>> {
525 AsyncHostTcpSocket::subscribe(self, self_)
526 }
527
528 fn shutdown(
529 &mut self,
530 self_: Resource<TcpSocket>,
531 shutdown_type: ShutdownType,
532 ) -> Result<(), SocketError> {
533 AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())
534 }
535
536 fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {
537 AsyncHostTcpSocket::drop(self, rep)
538 }
539 }
540
541 impl From<ShutdownType> for async_tcp::ShutdownType {
542 fn from(other: ShutdownType) -> Self {
543 match other {
544 ShutdownType::Receive => async_tcp::ShutdownType::Receive,
545 ShutdownType::Send => async_tcp::ShutdownType::Send,
546 ShutdownType::Both => async_tcp::ShutdownType::Both,
547 }
548 }
549 }
550}