1use crate::net::{SocketAddrUse, SocketAddressFamily};
2use crate::p2::bindings::sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network};
3use crate::p2::bindings::sockets::udp;
4use crate::p2::host::network::util;
5use crate::p2::udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState, UdpState};
6use crate::p2::{IoView, Pollable, SocketError, SocketResult, WasiImpl, WasiView};
7use anyhow::anyhow;
8use async_trait::async_trait;
9use io_lifetimes::AsSocketlike;
10use rustix::io::Errno;
11use std::net::SocketAddr;
12use tokio::io::Interest;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::poll::DynPollable;
15
16const MAX_UDP_DATAGRAM_SIZE: usize = u16::MAX as usize;
20
21impl<T> udp::Host for WasiImpl<T> where T: WasiView {}
22
23impl<T> udp::HostUdpSocket for WasiImpl<T>
24where
25 T: WasiView,
26{
27 async fn start_bind(
28 &mut self,
29 this: Resource<udp::UdpSocket>,
30 network: Resource<Network>,
31 local_address: IpSocketAddress,
32 ) -> SocketResult<()> {
33 self.ctx().allowed_network_uses.check_allowed_udp()?;
34 let table = self.table();
35
36 match table.get(&this)?.udp_state {
37 UdpState::Default => {}
38 UdpState::BindStarted => return Err(ErrorCode::ConcurrencyConflict.into()),
39 UdpState::Bound | UdpState::Connected => return Err(ErrorCode::InvalidState.into()),
40 }
41
42 let check = table.get(&network)?.socket_addr_check.clone();
44 table
45 .get_mut(&this)?
46 .socket_addr_check
47 .replace(check.clone());
48
49 let socket = table.get(&this)?;
50 let local_address: SocketAddr = local_address.into();
51
52 util::validate_address_family(&local_address, &socket.family)?;
53
54 {
55 check.check(local_address, SocketAddrUse::UdpBind).await?;
56
57 util::udp_bind(socket.udp_socket(), &local_address).map_err(|error| match error {
59 Errno::AFNOSUPPORT => ErrorCode::InvalidArgument,
67 _ => ErrorCode::from(error),
68 })?;
69 }
70
71 let socket = table.get_mut(&this)?;
72 socket.udp_state = UdpState::BindStarted;
73
74 Ok(())
75 }
76
77 fn finish_bind(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<()> {
78 let table = self.table();
79 let socket = table.get_mut(&this)?;
80
81 match socket.udp_state {
82 UdpState::BindStarted => {
83 socket.udp_state = UdpState::Bound;
84 Ok(())
85 }
86 _ => Err(ErrorCode::NotInProgress.into()),
87 }
88 }
89
90 async fn stream(
91 &mut self,
92 this: Resource<udp::UdpSocket>,
93 remote_address: Option<IpSocketAddress>,
94 ) -> SocketResult<(
95 Resource<udp::IncomingDatagramStream>,
96 Resource<udp::OutgoingDatagramStream>,
97 )> {
98 let table = self.table();
99
100 let has_active_streams = table
101 .iter_children(&this)?
102 .any(|c| c.is::<IncomingDatagramStream>() || c.is::<OutgoingDatagramStream>());
103
104 if has_active_streams {
105 return Err(SocketError::trap(anyhow!("UDP streams not dropped yet")));
106 }
107
108 let socket = table.get_mut(&this)?;
109 let remote_address = remote_address.map(SocketAddr::from);
110
111 match socket.udp_state {
112 UdpState::Bound | UdpState::Connected => {}
113 _ => return Err(ErrorCode::InvalidState.into()),
114 }
115
116 if let UdpState::Connected = socket.udp_state {
124 util::udp_disconnect(socket.udp_socket())?;
125 socket.udp_state = UdpState::Bound;
126 }
127
128 if let Some(connect_addr) = remote_address {
130 let Some(check) = socket.socket_addr_check.as_ref() else {
131 return Err(ErrorCode::InvalidState.into());
132 };
133 util::validate_remote_address(&connect_addr)?;
134 util::validate_address_family(&connect_addr, &socket.family)?;
135 check.check(connect_addr, SocketAddrUse::UdpConnect).await?;
136
137 rustix::net::connect(socket.udp_socket(), &connect_addr).map_err(
138 |error| match error {
139 Errno::AFNOSUPPORT => ErrorCode::InvalidArgument, Errno::INPROGRESS => {
141 tracing::debug!(
142 "UDP connect returned EINPROGRESS, which should never happen"
143 );
144 ErrorCode::Unknown
145 }
146 _ => ErrorCode::from(error),
147 },
148 )?;
149 socket.udp_state = UdpState::Connected;
150 }
151
152 let incoming_stream = IncomingDatagramStream {
153 inner: socket.inner.clone(),
154 remote_address,
155 };
156 let outgoing_stream = OutgoingDatagramStream {
157 inner: socket.inner.clone(),
158 remote_address,
159 family: socket.family,
160 send_state: SendState::Idle,
161 socket_addr_check: socket.socket_addr_check.clone(),
162 };
163
164 Ok((
165 self.table().push_child(incoming_stream, &this)?,
166 self.table().push_child(outgoing_stream, &this)?,
167 ))
168 }
169
170 fn local_address(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<IpSocketAddress> {
171 let table = self.table();
172 let socket = table.get(&this)?;
173
174 match socket.udp_state {
175 UdpState::Default => return Err(ErrorCode::InvalidState.into()),
176 UdpState::BindStarted => return Err(ErrorCode::ConcurrencyConflict.into()),
177 _ => {}
178 }
179
180 let addr = socket
181 .udp_socket()
182 .as_socketlike_view::<std::net::UdpSocket>()
183 .local_addr()?;
184 Ok(addr.into())
185 }
186
187 fn remote_address(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<IpSocketAddress> {
188 let table = self.table();
189 let socket = table.get(&this)?;
190
191 match socket.udp_state {
192 UdpState::Connected => {}
193 _ => return Err(ErrorCode::InvalidState.into()),
194 }
195
196 let addr = socket
197 .udp_socket()
198 .as_socketlike_view::<std::net::UdpSocket>()
199 .peer_addr()?;
200 Ok(addr.into())
201 }
202
203 fn address_family(
204 &mut self,
205 this: Resource<udp::UdpSocket>,
206 ) -> Result<IpAddressFamily, anyhow::Error> {
207 let table = self.table();
208 let socket = table.get(&this)?;
209
210 match socket.family {
211 SocketAddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4),
212 SocketAddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6),
213 }
214 }
215
216 fn unicast_hop_limit(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u8> {
217 let table = self.table();
218 let socket = table.get(&this)?;
219
220 let ttl = match socket.family {
221 SocketAddressFamily::Ipv4 => util::get_ip_ttl(socket.udp_socket())?,
222 SocketAddressFamily::Ipv6 => util::get_ipv6_unicast_hops(socket.udp_socket())?,
223 };
224
225 Ok(ttl)
226 }
227
228 fn set_unicast_hop_limit(
229 &mut self,
230 this: Resource<udp::UdpSocket>,
231 value: u8,
232 ) -> SocketResult<()> {
233 let table = self.table();
234 let socket = table.get(&this)?;
235
236 match socket.family {
237 SocketAddressFamily::Ipv4 => util::set_ip_ttl(socket.udp_socket(), value)?,
238 SocketAddressFamily::Ipv6 => util::set_ipv6_unicast_hops(socket.udp_socket(), value)?,
239 }
240
241 Ok(())
242 }
243
244 fn receive_buffer_size(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u64> {
245 let table = self.table();
246 let socket = table.get(&this)?;
247
248 let value = util::get_socket_recv_buffer_size(socket.udp_socket())?;
249 Ok(value as u64)
250 }
251
252 fn set_receive_buffer_size(
253 &mut self,
254 this: Resource<udp::UdpSocket>,
255 value: u64,
256 ) -> SocketResult<()> {
257 let table = self.table();
258 let socket = table.get(&this)?;
259 let value = value.try_into().unwrap_or(usize::MAX);
260
261 util::set_socket_recv_buffer_size(socket.udp_socket(), value)?;
262 Ok(())
263 }
264
265 fn send_buffer_size(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u64> {
266 let table = self.table();
267 let socket = table.get(&this)?;
268
269 let value = util::get_socket_send_buffer_size(socket.udp_socket())?;
270 Ok(value as u64)
271 }
272
273 fn set_send_buffer_size(
274 &mut self,
275 this: Resource<udp::UdpSocket>,
276 value: u64,
277 ) -> SocketResult<()> {
278 let table = self.table();
279 let socket = table.get(&this)?;
280 let value = value.try_into().unwrap_or(usize::MAX);
281
282 util::set_socket_send_buffer_size(socket.udp_socket(), value)?;
283 Ok(())
284 }
285
286 fn subscribe(
287 &mut self,
288 this: Resource<udp::UdpSocket>,
289 ) -> anyhow::Result<Resource<DynPollable>> {
290 wasmtime_wasi_io::poll::subscribe(self.table(), this)
291 }
292
293 fn drop(&mut self, this: Resource<udp::UdpSocket>) -> Result<(), anyhow::Error> {
294 let table = self.table();
295
296 let dropped = table.delete(this)?;
299 drop(dropped);
300
301 Ok(())
302 }
303}
304
305impl<T> udp::HostIncomingDatagramStream for WasiImpl<T>
306where
307 T: WasiView,
308{
309 fn receive(
310 &mut self,
311 this: Resource<udp::IncomingDatagramStream>,
312 max_results: u64,
313 ) -> SocketResult<Vec<udp::IncomingDatagram>> {
314 fn recv_one(
316 stream: &IncomingDatagramStream,
317 ) -> SocketResult<Option<udp::IncomingDatagram>> {
318 let mut buf = [0; MAX_UDP_DATAGRAM_SIZE];
319 let (size, received_addr) = stream.inner.try_recv_from(&mut buf)?;
320 debug_assert!(size <= buf.len());
321
322 match stream.remote_address {
323 Some(connected_addr) if connected_addr != received_addr => {
324 return Ok(None);
326 }
327 _ => {}
328 }
329
330 Ok(Some(udp::IncomingDatagram {
331 data: buf[..size].into(),
332 remote_address: received_addr.into(),
333 }))
334 }
335
336 let table = self.table();
337 let stream = table.get(&this)?;
338 let max_results: usize = max_results.try_into().unwrap_or(usize::MAX);
339
340 if max_results == 0 {
341 return Ok(vec![]);
342 }
343
344 let mut datagrams = vec![];
345
346 while datagrams.len() < max_results {
347 match recv_one(stream) {
348 Ok(Some(datagram)) => {
349 datagrams.push(datagram);
350 }
351 Ok(None) => {
352 }
354 Err(_) if datagrams.len() > 0 => {
355 return Ok(datagrams);
356 }
357 Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => {
358 return Ok(datagrams);
359 }
360 Err(e) => {
361 return Err(e);
362 }
363 }
364 }
365
366 Ok(datagrams)
367 }
368
369 fn subscribe(
370 &mut self,
371 this: Resource<udp::IncomingDatagramStream>,
372 ) -> anyhow::Result<Resource<DynPollable>> {
373 wasmtime_wasi_io::poll::subscribe(self.table(), this)
374 }
375
376 fn drop(&mut self, this: Resource<udp::IncomingDatagramStream>) -> Result<(), anyhow::Error> {
377 let table = self.table();
378
379 let dropped = table.delete(this)?;
382 drop(dropped);
383
384 Ok(())
385 }
386}
387
388#[async_trait]
389impl Pollable for IncomingDatagramStream {
390 async fn ready(&mut self) {
391 self.inner
393 .ready(Interest::READABLE)
394 .await
395 .expect("failed to await UDP socket readiness");
396 }
397}
398
399impl<T> udp::HostOutgoingDatagramStream for WasiImpl<T>
400where
401 T: WasiView,
402{
403 fn check_send(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> SocketResult<u64> {
404 let table = self.table();
405 let stream = table.get_mut(&this)?;
406
407 let permit = match stream.send_state {
408 SendState::Idle => {
409 const PERMIT: usize = 16;
410 stream.send_state = SendState::Permitted(PERMIT);
411 PERMIT
412 }
413 SendState::Permitted(n) => n,
414 SendState::Waiting => 0,
415 };
416
417 Ok(permit.try_into().unwrap())
418 }
419
420 async fn send(
421 &mut self,
422 this: Resource<udp::OutgoingDatagramStream>,
423 datagrams: Vec<udp::OutgoingDatagram>,
424 ) -> SocketResult<u64> {
425 async fn send_one(
426 stream: &OutgoingDatagramStream,
427 datagram: &udp::OutgoingDatagram,
428 ) -> SocketResult<()> {
429 if datagram.data.len() > MAX_UDP_DATAGRAM_SIZE {
430 return Err(ErrorCode::DatagramTooLarge.into());
431 }
432
433 let provided_addr = datagram.remote_address.map(SocketAddr::from);
434 let addr = match (stream.remote_address, provided_addr) {
435 (None, Some(addr)) => {
436 let Some(check) = stream.socket_addr_check.as_ref() else {
437 return Err(ErrorCode::InvalidState.into());
438 };
439 check
440 .check(addr, SocketAddrUse::UdpOutgoingDatagram)
441 .await?;
442 addr
443 }
444 (Some(addr), None) => addr,
445 (Some(connected_addr), Some(provided_addr)) if connected_addr == provided_addr => {
446 connected_addr
447 }
448 _ => return Err(ErrorCode::InvalidArgument.into()),
449 };
450
451 util::validate_remote_address(&addr)?;
452 util::validate_address_family(&addr, &stream.family)?;
453
454 if stream.remote_address == Some(addr) {
455 stream.inner.try_send(&datagram.data)?;
456 } else {
457 stream.inner.try_send_to(&datagram.data, addr)?;
458 }
459
460 Ok(())
461 }
462
463 let table = self.table();
464 let stream = table.get_mut(&this)?;
465
466 match stream.send_state {
467 SendState::Permitted(n) if n >= datagrams.len() => {
468 stream.send_state = SendState::Idle;
469 }
470 SendState::Permitted(_) => {
471 return Err(SocketError::trap(anyhow::anyhow!(
472 "unpermitted: argument exceeds permitted size"
473 )))
474 }
475 SendState::Idle | SendState::Waiting => {
476 return Err(SocketError::trap(anyhow::anyhow!(
477 "unpermitted: must call check-send first"
478 )))
479 }
480 }
481
482 if datagrams.is_empty() {
483 return Ok(0);
484 }
485
486 let mut count = 0;
487
488 for datagram in datagrams {
489 match send_one(stream, &datagram).await {
490 Ok(_) => count += 1,
491 Err(_) if count > 0 => {
492 return Ok(count);
494 }
495 Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => {
496 stream.send_state = SendState::Waiting;
497 return Ok(count);
498 }
499 Err(e) => {
500 return Err(e);
501 }
502 }
503 }
504
505 Ok(count)
506 }
507
508 fn subscribe(
509 &mut self,
510 this: Resource<udp::OutgoingDatagramStream>,
511 ) -> anyhow::Result<Resource<DynPollable>> {
512 wasmtime_wasi_io::poll::subscribe(self.table(), this)
513 }
514
515 fn drop(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> Result<(), anyhow::Error> {
516 let table = self.table();
517
518 let dropped = table.delete(this)?;
521 drop(dropped);
522
523 Ok(())
524 }
525}
526
527#[async_trait]
528impl Pollable for OutgoingDatagramStream {
529 async fn ready(&mut self) {
530 match self.send_state {
531 SendState::Idle | SendState::Permitted(_) => {}
532 SendState::Waiting => {
533 self.inner
535 .ready(Interest::WRITABLE)
536 .await
537 .expect("failed to await UDP socket readiness");
538 self.send_state = SendState::Idle;
539 }
540 }
541 }
542}
543
544pub mod sync {
545 use wasmtime::component::Resource;
546
547 use crate::p2::{
548 bindings::{
549 sockets::{
550 network::Network,
551 udp::{
552 self as async_udp,
553 HostIncomingDatagramStream as AsyncHostIncomingDatagramStream,
554 HostOutgoingDatagramStream as AsyncHostOutgoingDatagramStream,
555 HostUdpSocket as AsyncHostUdpSocket, IncomingDatagramStream,
556 OutgoingDatagramStream,
557 },
558 },
559 sync::sockets::udp::{
560 self, HostIncomingDatagramStream, HostOutgoingDatagramStream, HostUdpSocket,
561 IncomingDatagram, IpAddressFamily, IpSocketAddress, OutgoingDatagram, Pollable,
562 UdpSocket,
563 },
564 },
565 SocketError, WasiImpl, WasiView,
566 };
567 use crate::runtime::in_tokio;
568
569 impl<T> udp::Host for WasiImpl<T> where T: WasiView {}
570
571 impl<T> HostUdpSocket for WasiImpl<T>
572 where
573 T: WasiView,
574 {
575 fn start_bind(
576 &mut self,
577 self_: Resource<UdpSocket>,
578 network: Resource<Network>,
579 local_address: IpSocketAddress,
580 ) -> Result<(), SocketError> {
581 in_tokio(async {
582 AsyncHostUdpSocket::start_bind(self, self_, network, local_address).await
583 })
584 }
585
586 fn finish_bind(&mut self, self_: Resource<UdpSocket>) -> Result<(), SocketError> {
587 AsyncHostUdpSocket::finish_bind(self, self_)
588 }
589
590 fn stream(
591 &mut self,
592 self_: Resource<UdpSocket>,
593 remote_address: Option<IpSocketAddress>,
594 ) -> Result<
595 (
596 Resource<IncomingDatagramStream>,
597 Resource<OutgoingDatagramStream>,
598 ),
599 SocketError,
600 > {
601 in_tokio(async { AsyncHostUdpSocket::stream(self, self_, remote_address).await })
602 }
603
604 fn local_address(
605 &mut self,
606 self_: Resource<UdpSocket>,
607 ) -> Result<IpSocketAddress, SocketError> {
608 AsyncHostUdpSocket::local_address(self, self_)
609 }
610
611 fn remote_address(
612 &mut self,
613 self_: Resource<UdpSocket>,
614 ) -> Result<IpSocketAddress, SocketError> {
615 AsyncHostUdpSocket::remote_address(self, self_)
616 }
617
618 fn address_family(
619 &mut self,
620 self_: Resource<UdpSocket>,
621 ) -> wasmtime::Result<IpAddressFamily> {
622 AsyncHostUdpSocket::address_family(self, self_)
623 }
624
625 fn unicast_hop_limit(&mut self, self_: Resource<UdpSocket>) -> Result<u8, SocketError> {
626 AsyncHostUdpSocket::unicast_hop_limit(self, self_)
627 }
628
629 fn set_unicast_hop_limit(
630 &mut self,
631 self_: Resource<UdpSocket>,
632 value: u8,
633 ) -> Result<(), SocketError> {
634 AsyncHostUdpSocket::set_unicast_hop_limit(self, self_, value)
635 }
636
637 fn receive_buffer_size(&mut self, self_: Resource<UdpSocket>) -> Result<u64, SocketError> {
638 AsyncHostUdpSocket::receive_buffer_size(self, self_)
639 }
640
641 fn set_receive_buffer_size(
642 &mut self,
643 self_: Resource<UdpSocket>,
644 value: u64,
645 ) -> Result<(), SocketError> {
646 AsyncHostUdpSocket::set_receive_buffer_size(self, self_, value)
647 }
648
649 fn send_buffer_size(&mut self, self_: Resource<UdpSocket>) -> Result<u64, SocketError> {
650 AsyncHostUdpSocket::send_buffer_size(self, self_)
651 }
652
653 fn set_send_buffer_size(
654 &mut self,
655 self_: Resource<UdpSocket>,
656 value: u64,
657 ) -> Result<(), SocketError> {
658 AsyncHostUdpSocket::set_send_buffer_size(self, self_, value)
659 }
660
661 fn subscribe(
662 &mut self,
663 self_: Resource<UdpSocket>,
664 ) -> wasmtime::Result<Resource<Pollable>> {
665 AsyncHostUdpSocket::subscribe(self, self_)
666 }
667
668 fn drop(&mut self, rep: Resource<UdpSocket>) -> wasmtime::Result<()> {
669 AsyncHostUdpSocket::drop(self, rep)
670 }
671 }
672
673 impl<T> HostIncomingDatagramStream for WasiImpl<T>
674 where
675 T: WasiView,
676 {
677 fn receive(
678 &mut self,
679 self_: Resource<IncomingDatagramStream>,
680 max_results: u64,
681 ) -> Result<Vec<IncomingDatagram>, SocketError> {
682 Ok(
683 AsyncHostIncomingDatagramStream::receive(self, self_, max_results)?
684 .into_iter()
685 .map(Into::into)
686 .collect(),
687 )
688 }
689
690 fn subscribe(
691 &mut self,
692 self_: Resource<IncomingDatagramStream>,
693 ) -> wasmtime::Result<Resource<Pollable>> {
694 AsyncHostIncomingDatagramStream::subscribe(self, self_)
695 }
696
697 fn drop(&mut self, rep: Resource<IncomingDatagramStream>) -> wasmtime::Result<()> {
698 AsyncHostIncomingDatagramStream::drop(self, rep)
699 }
700 }
701
702 impl From<async_udp::IncomingDatagram> for IncomingDatagram {
703 fn from(other: async_udp::IncomingDatagram) -> Self {
704 let async_udp::IncomingDatagram {
705 data,
706 remote_address,
707 } = other;
708 Self {
709 data,
710 remote_address,
711 }
712 }
713 }
714
715 impl<T> HostOutgoingDatagramStream for WasiImpl<T>
716 where
717 T: WasiView,
718 {
719 fn check_send(
720 &mut self,
721 self_: Resource<OutgoingDatagramStream>,
722 ) -> Result<u64, SocketError> {
723 AsyncHostOutgoingDatagramStream::check_send(self, self_)
724 }
725
726 fn send(
727 &mut self,
728 self_: Resource<OutgoingDatagramStream>,
729 datagrams: Vec<OutgoingDatagram>,
730 ) -> Result<u64, SocketError> {
731 let datagrams = datagrams.into_iter().map(Into::into).collect();
732 in_tokio(async { AsyncHostOutgoingDatagramStream::send(self, self_, datagrams).await })
733 }
734
735 fn subscribe(
736 &mut self,
737 self_: Resource<OutgoingDatagramStream>,
738 ) -> wasmtime::Result<Resource<Pollable>> {
739 AsyncHostOutgoingDatagramStream::subscribe(self, self_)
740 }
741
742 fn drop(&mut self, rep: Resource<OutgoingDatagramStream>) -> wasmtime::Result<()> {
743 AsyncHostOutgoingDatagramStream::drop(self, rep)
744 }
745 }
746
747 impl From<OutgoingDatagram> for async_udp::OutgoingDatagram {
748 fn from(other: OutgoingDatagram) -> Self {
749 let OutgoingDatagram {
750 data,
751 remote_address,
752 } = other;
753 Self {
754 data,
755 remote_address,
756 }
757 }
758 }
759}