1use crate::p2::bindings::{
2 sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network},
3 sockets::tcp::{self, ShutdownType},
4};
5use crate::p2::{Pollable, SocketResult};
6use crate::sockets::{SocketAddrUse, TcpSocket, WasiSocketsCtxView};
7use std::net::SocketAddr;
8use wasmtime::component::Resource;
9use wasmtime_wasi_io::{
10 poll::DynPollable,
11 streams::{DynInputStream, DynOutputStream},
12};
13
14impl tcp::Host for WasiSocketsCtxView<'_> {}
15
16impl crate::p2::host::tcp::tcp::HostTcpSocket for WasiSocketsCtxView<'_> {
17 async fn start_bind(
18 &mut self,
19 this: Resource<TcpSocket>,
20 network: Resource<Network>,
21 local_address: IpSocketAddress,
22 ) -> SocketResult<()> {
23 let network = self.table.get(&network)?;
24 let local_address: SocketAddr = local_address.into();
25
26 network
28 .check_socket_addr(local_address, SocketAddrUse::TcpBind)
29 .await?;
30
31 self.table.get_mut(&this)?.start_bind(local_address)?;
33
34 Ok(())
35 }
36
37 fn finish_bind(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
38 let socket = self.table.get_mut(&this)?;
39 socket.finish_bind()?;
40 Ok(())
41 }
42
43 async fn start_connect(
44 &mut self,
45 this: Resource<TcpSocket>,
46 network: Resource<Network>,
47 remote_address: IpSocketAddress,
48 ) -> SocketResult<()> {
49 let network = self.table.get(&network)?;
50 let remote_address: SocketAddr = remote_address.into();
51
52 network
54 .check_socket_addr(remote_address, SocketAddrUse::TcpConnect)
55 .await?;
56
57 let socket = self.table.get_mut(&this)?;
59 let future = socket
60 .start_connect(&remote_address)?
61 .connect(remote_address);
62 socket.set_pending_connect(future)?;
63
64 Ok(())
65 }
66
67 fn finish_connect(
68 &mut self,
69 this: Resource<TcpSocket>,
70 ) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {
71 let socket = self.table.get_mut(&this)?;
72
73 let result = socket
74 .take_pending_connect()?
75 .ok_or(ErrorCode::WouldBlock)?;
76 socket.finish_connect(result)?;
77 let (input, output) = socket.p2_streams()?;
78 let input = self.table.push_child(input, &this)?;
79 let output = self.table.push_child(output, &this)?;
80 Ok((input, output))
81 }
82
83 fn start_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
84 let socket = self.table.get_mut(&this)?;
85
86 socket.start_listen()?;
87 Ok(())
88 }
89
90 fn finish_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
91 let socket = self.table.get_mut(&this)?;
92 socket.finish_listen()?;
93 Ok(())
94 }
95
96 fn accept(
97 &mut self,
98 this: Resource<TcpSocket>,
99 ) -> SocketResult<(
100 Resource<TcpSocket>,
101 Resource<DynInputStream>,
102 Resource<DynOutputStream>,
103 )> {
104 let socket = self.table.get_mut(&this)?;
105
106 let mut tcp_socket = socket.accept()?.ok_or(ErrorCode::WouldBlock)?;
107 let (input, output) = tcp_socket.p2_streams()?;
108
109 let tcp_socket = self.table.push(tcp_socket)?;
110 let input_stream = self.table.push_child(input, &tcp_socket)?;
111 let output_stream = self.table.push_child(output, &tcp_socket)?;
112
113 Ok((tcp_socket, input_stream, output_stream))
114 }
115
116 fn local_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {
117 let socket = self.table.get(&this)?;
118 Ok(socket.local_address()?.into())
119 }
120
121 fn remote_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {
122 let socket = self.table.get(&this)?;
123 Ok(socket.remote_address()?.into())
124 }
125
126 fn is_listening(&mut self, this: Resource<TcpSocket>) -> Result<bool, anyhow::Error> {
127 let socket = self.table.get(&this)?;
128
129 Ok(socket.is_listening())
130 }
131
132 fn address_family(
133 &mut self,
134 this: Resource<TcpSocket>,
135 ) -> Result<IpAddressFamily, anyhow::Error> {
136 let socket = self.table.get(&this)?;
137 Ok(socket.address_family().into())
138 }
139
140 fn set_listen_backlog_size(
141 &mut self,
142 this: Resource<TcpSocket>,
143 value: u64,
144 ) -> SocketResult<()> {
145 let socket = self.table.get_mut(&this)?;
146 socket.set_listen_backlog_size(value)?;
147 Ok(())
148 }
149
150 fn keep_alive_enabled(&mut self, this: Resource<TcpSocket>) -> SocketResult<bool> {
151 let socket = self.table.get(&this)?;
152 Ok(socket.keep_alive_enabled()?)
153 }
154
155 fn set_keep_alive_enabled(
156 &mut self,
157 this: Resource<TcpSocket>,
158 value: bool,
159 ) -> SocketResult<()> {
160 let socket = self.table.get(&this)?;
161 socket.set_keep_alive_enabled(value)?;
162 Ok(())
163 }
164
165 fn keep_alive_idle_time(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
166 let socket = self.table.get(&this)?;
167 Ok(socket.keep_alive_idle_time()?)
168 }
169
170 fn set_keep_alive_idle_time(
171 &mut self,
172 this: Resource<TcpSocket>,
173 value: u64,
174 ) -> SocketResult<()> {
175 let socket = self.table.get_mut(&this)?;
176 socket.set_keep_alive_idle_time(value)?;
177 Ok(())
178 }
179
180 fn keep_alive_interval(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
181 let socket = self.table.get(&this)?;
182 Ok(socket.keep_alive_interval()?)
183 }
184
185 fn set_keep_alive_interval(
186 &mut self,
187 this: Resource<TcpSocket>,
188 value: u64,
189 ) -> SocketResult<()> {
190 let socket = self.table.get(&this)?;
191 socket.set_keep_alive_interval(value)?;
192 Ok(())
193 }
194
195 fn keep_alive_count(&mut self, this: Resource<TcpSocket>) -> SocketResult<u32> {
196 let socket = self.table.get(&this)?;
197 Ok(socket.keep_alive_count()?)
198 }
199
200 fn set_keep_alive_count(&mut self, this: Resource<TcpSocket>, value: u32) -> SocketResult<()> {
201 let socket = self.table.get(&this)?;
202 socket.set_keep_alive_count(value)?;
203 Ok(())
204 }
205
206 fn hop_limit(&mut self, this: Resource<TcpSocket>) -> SocketResult<u8> {
207 let socket = self.table.get(&this)?;
208 Ok(socket.hop_limit()?)
209 }
210
211 fn set_hop_limit(&mut self, this: Resource<TcpSocket>, value: u8) -> SocketResult<()> {
212 let socket = self.table.get_mut(&this)?;
213 socket.set_hop_limit(value)?;
214 Ok(())
215 }
216
217 fn receive_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
218 let socket = self.table.get(&this)?;
219 Ok(socket.receive_buffer_size()?)
220 }
221
222 fn set_receive_buffer_size(
223 &mut self,
224 this: Resource<TcpSocket>,
225 value: u64,
226 ) -> SocketResult<()> {
227 let socket = self.table.get_mut(&this)?;
228 socket.set_receive_buffer_size(value)?;
229 Ok(())
230 }
231
232 fn send_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
233 let socket = self.table.get(&this)?;
234 Ok(socket.send_buffer_size()?)
235 }
236
237 fn set_send_buffer_size(&mut self, this: Resource<TcpSocket>, value: u64) -> SocketResult<()> {
238 let socket = self.table.get_mut(&this)?;
239 socket.set_send_buffer_size(value)?;
240 Ok(())
241 }
242
243 fn subscribe(&mut self, this: Resource<TcpSocket>) -> anyhow::Result<Resource<DynPollable>> {
244 wasmtime_wasi_io::poll::subscribe(self.table, this)
245 }
246
247 fn shutdown(
248 &mut self,
249 this: Resource<TcpSocket>,
250 shutdown_type: ShutdownType,
251 ) -> SocketResult<()> {
252 let socket = self.table.get(&this)?;
253
254 let how = match shutdown_type {
255 ShutdownType::Receive => std::net::Shutdown::Read,
256 ShutdownType::Send => std::net::Shutdown::Write,
257 ShutdownType::Both => std::net::Shutdown::Both,
258 };
259
260 let state = socket.p2_streaming_state()?;
261 state.shutdown(how)?;
262 Ok(())
263 }
264
265 fn drop(&mut self, this: Resource<TcpSocket>) -> Result<(), anyhow::Error> {
266 let dropped = self.table.delete(this)?;
269 drop(dropped);
270
271 Ok(())
272 }
273}
274
275#[async_trait::async_trait]
276impl Pollable for TcpSocket {
277 async fn ready(&mut self) {
278 <TcpSocket>::ready(self).await;
279 }
280}
281
282pub mod sync {
283 use crate::p2::{
284 SocketError,
285 bindings::{
286 sockets::{
287 network::Network,
288 tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},
289 },
290 sync::sockets::tcp::{
291 self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,
292 OutputStream, Pollable, ShutdownType, TcpSocket,
293 },
294 },
295 };
296 use crate::runtime::in_tokio;
297 use crate::sockets::WasiSocketsCtxView;
298 use wasmtime::component::Resource;
299
300 impl tcp::Host for WasiSocketsCtxView<'_> {}
301
302 impl HostTcpSocket for WasiSocketsCtxView<'_> {
303 fn start_bind(
304 &mut self,
305 self_: Resource<TcpSocket>,
306 network: Resource<Network>,
307 local_address: IpSocketAddress,
308 ) -> Result<(), SocketError> {
309 in_tokio(async {
310 AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await
311 })
312 }
313
314 fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
315 AsyncHostTcpSocket::finish_bind(self, self_)
316 }
317
318 fn start_connect(
319 &mut self,
320 self_: Resource<TcpSocket>,
321 network: Resource<Network>,
322 remote_address: IpSocketAddress,
323 ) -> Result<(), SocketError> {
324 in_tokio(async {
325 AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await
326 })
327 }
328
329 fn finish_connect(
330 &mut self,
331 self_: Resource<TcpSocket>,
332 ) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {
333 AsyncHostTcpSocket::finish_connect(self, self_)
334 }
335
336 fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
337 AsyncHostTcpSocket::start_listen(self, self_)
338 }
339
340 fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
341 AsyncHostTcpSocket::finish_listen(self, self_)
342 }
343
344 fn accept(
345 &mut self,
346 self_: Resource<TcpSocket>,
347 ) -> Result<
348 (
349 Resource<TcpSocket>,
350 Resource<InputStream>,
351 Resource<OutputStream>,
352 ),
353 SocketError,
354 > {
355 AsyncHostTcpSocket::accept(self, self_)
356 }
357
358 fn local_address(
359 &mut self,
360 self_: Resource<TcpSocket>,
361 ) -> Result<IpSocketAddress, SocketError> {
362 AsyncHostTcpSocket::local_address(self, self_)
363 }
364
365 fn remote_address(
366 &mut self,
367 self_: Resource<TcpSocket>,
368 ) -> Result<IpSocketAddress, SocketError> {
369 AsyncHostTcpSocket::remote_address(self, self_)
370 }
371
372 fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {
373 AsyncHostTcpSocket::is_listening(self, self_)
374 }
375
376 fn address_family(
377 &mut self,
378 self_: Resource<TcpSocket>,
379 ) -> wasmtime::Result<IpAddressFamily> {
380 AsyncHostTcpSocket::address_family(self, self_)
381 }
382
383 fn set_listen_backlog_size(
384 &mut self,
385 self_: Resource<TcpSocket>,
386 value: u64,
387 ) -> Result<(), SocketError> {
388 AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)
389 }
390
391 fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {
392 AsyncHostTcpSocket::keep_alive_enabled(self, self_)
393 }
394
395 fn set_keep_alive_enabled(
396 &mut self,
397 self_: Resource<TcpSocket>,
398 value: bool,
399 ) -> Result<(), SocketError> {
400 AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)
401 }
402
403 fn keep_alive_idle_time(
404 &mut self,
405 self_: Resource<TcpSocket>,
406 ) -> Result<Duration, SocketError> {
407 AsyncHostTcpSocket::keep_alive_idle_time(self, self_)
408 }
409
410 fn set_keep_alive_idle_time(
411 &mut self,
412 self_: Resource<TcpSocket>,
413 value: Duration,
414 ) -> Result<(), SocketError> {
415 AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)
416 }
417
418 fn keep_alive_interval(
419 &mut self,
420 self_: Resource<TcpSocket>,
421 ) -> Result<Duration, SocketError> {
422 AsyncHostTcpSocket::keep_alive_interval(self, self_)
423 }
424
425 fn set_keep_alive_interval(
426 &mut self,
427 self_: Resource<TcpSocket>,
428 value: Duration,
429 ) -> Result<(), SocketError> {
430 AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)
431 }
432
433 fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {
434 AsyncHostTcpSocket::keep_alive_count(self, self_)
435 }
436
437 fn set_keep_alive_count(
438 &mut self,
439 self_: Resource<TcpSocket>,
440 value: u32,
441 ) -> Result<(), SocketError> {
442 AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)
443 }
444
445 fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {
446 AsyncHostTcpSocket::hop_limit(self, self_)
447 }
448
449 fn set_hop_limit(
450 &mut self,
451 self_: Resource<TcpSocket>,
452 value: u8,
453 ) -> Result<(), SocketError> {
454 AsyncHostTcpSocket::set_hop_limit(self, self_, value)
455 }
456
457 fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
458 AsyncHostTcpSocket::receive_buffer_size(self, self_)
459 }
460
461 fn set_receive_buffer_size(
462 &mut self,
463 self_: Resource<TcpSocket>,
464 value: u64,
465 ) -> Result<(), SocketError> {
466 AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)
467 }
468
469 fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
470 AsyncHostTcpSocket::send_buffer_size(self, self_)
471 }
472
473 fn set_send_buffer_size(
474 &mut self,
475 self_: Resource<TcpSocket>,
476 value: u64,
477 ) -> Result<(), SocketError> {
478 AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)
479 }
480
481 fn subscribe(
482 &mut self,
483 self_: Resource<TcpSocket>,
484 ) -> wasmtime::Result<Resource<Pollable>> {
485 AsyncHostTcpSocket::subscribe(self, self_)
486 }
487
488 fn shutdown(
489 &mut self,
490 self_: Resource<TcpSocket>,
491 shutdown_type: ShutdownType,
492 ) -> Result<(), SocketError> {
493 AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())
494 }
495
496 fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {
497 AsyncHostTcpSocket::drop(self, rep)
498 }
499 }
500
501 impl From<ShutdownType> for async_tcp::ShutdownType {
502 fn from(other: ShutdownType) -> Self {
503 match other {
504 ShutdownType::Receive => async_tcp::ShutdownType::Receive,
505 ShutdownType::Send => async_tcp::ShutdownType::Send,
506 ShutdownType::Both => async_tcp::ShutdownType::Both,
507 }
508 }
509 }
510}