wasmtime_wasi/p2/host/
udp.rs

1use crate::net::{SocketAddrUse, SocketAddressFamily};
2use crate::p2::bindings::sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network};
3use crate::p2::bindings::sockets::udp;
4use crate::p2::host::network::util;
5use crate::p2::udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState, UdpState};
6use crate::p2::{IoView, Pollable, SocketError, SocketResult, WasiImpl, WasiView};
7use anyhow::anyhow;
8use async_trait::async_trait;
9use io_lifetimes::AsSocketlike;
10use rustix::io::Errno;
11use std::net::SocketAddr;
12use tokio::io::Interest;
13use wasmtime::component::Resource;
14use wasmtime_wasi_io::poll::DynPollable;
15
/// Theoretical maximum byte size of a UDP datagram (`u16::MAX` bytes).
///
/// The real limit is lower, but for simplicity we do not account for e.g.
/// the transport layer here. In practice, datagrams are typically less than
/// 1500 bytes.
const MAX_UDP_DATAGRAM_SIZE: usize = u16::MAX as usize;
20
21impl<T> udp::Host for WasiImpl<T> where T: WasiView {}
22
23impl<T> udp::HostUdpSocket for WasiImpl<T>
24where
25    T: WasiView,
26{
27    async fn start_bind(
28        &mut self,
29        this: Resource<udp::UdpSocket>,
30        network: Resource<Network>,
31        local_address: IpSocketAddress,
32    ) -> SocketResult<()> {
33        self.ctx().allowed_network_uses.check_allowed_udp()?;
34        let table = self.table();
35
36        match table.get(&this)?.udp_state {
37            UdpState::Default => {}
38            UdpState::BindStarted => return Err(ErrorCode::ConcurrencyConflict.into()),
39            UdpState::Bound | UdpState::Connected => return Err(ErrorCode::InvalidState.into()),
40        }
41
42        // Set the socket addr check on the socket so later functions have access to it through the socket handle
43        let check = table.get(&network)?.socket_addr_check.clone();
44        table
45            .get_mut(&this)?
46            .socket_addr_check
47            .replace(check.clone());
48
49        let socket = table.get(&this)?;
50        let local_address: SocketAddr = local_address.into();
51
52        util::validate_address_family(&local_address, &socket.family)?;
53
54        {
55            check.check(local_address, SocketAddrUse::UdpBind).await?;
56
57            // Perform the OS bind call.
58            util::udp_bind(socket.udp_socket(), &local_address).map_err(|error| match error {
59                // From https://pubs.opengroup.org/onlinepubs/9699919799/functions/bind.html:
60                // > [EAFNOSUPPORT] The specified address is not a valid address for the address family of the specified socket
61                //
62                // The most common reasons for this error should have already
63                // been handled by our own validation slightly higher up in this
64                // function. This error mapping is here just in case there is
65                // an edge case we didn't catch.
66                Errno::AFNOSUPPORT => ErrorCode::InvalidArgument,
67                _ => ErrorCode::from(error),
68            })?;
69        }
70
71        let socket = table.get_mut(&this)?;
72        socket.udp_state = UdpState::BindStarted;
73
74        Ok(())
75    }
76
77    fn finish_bind(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<()> {
78        let table = self.table();
79        let socket = table.get_mut(&this)?;
80
81        match socket.udp_state {
82            UdpState::BindStarted => {
83                socket.udp_state = UdpState::Bound;
84                Ok(())
85            }
86            _ => Err(ErrorCode::NotInProgress.into()),
87        }
88    }
89
90    async fn stream(
91        &mut self,
92        this: Resource<udp::UdpSocket>,
93        remote_address: Option<IpSocketAddress>,
94    ) -> SocketResult<(
95        Resource<udp::IncomingDatagramStream>,
96        Resource<udp::OutgoingDatagramStream>,
97    )> {
98        let table = self.table();
99
100        let has_active_streams = table
101            .iter_children(&this)?
102            .any(|c| c.is::<IncomingDatagramStream>() || c.is::<OutgoingDatagramStream>());
103
104        if has_active_streams {
105            return Err(SocketError::trap(anyhow!("UDP streams not dropped yet")));
106        }
107
108        let socket = table.get_mut(&this)?;
109        let remote_address = remote_address.map(SocketAddr::from);
110
111        match socket.udp_state {
112            UdpState::Bound | UdpState::Connected => {}
113            _ => return Err(ErrorCode::InvalidState.into()),
114        }
115
116        // We disconnect & (re)connect in two distinct steps for two reasons:
117        // - To leave our socket instance in a consistent state in case the
118        //   connect fails.
119        // - When reconnecting to a different address, Linux sometimes fails
120        //   if there isn't a disconnect in between.
121
122        // Step #1: Disconnect
123        if let UdpState::Connected = socket.udp_state {
124            util::udp_disconnect(socket.udp_socket())?;
125            socket.udp_state = UdpState::Bound;
126        }
127
128        // Step #2: (Re)connect
129        if let Some(connect_addr) = remote_address {
130            let Some(check) = socket.socket_addr_check.as_ref() else {
131                return Err(ErrorCode::InvalidState.into());
132            };
133            util::validate_remote_address(&connect_addr)?;
134            util::validate_address_family(&connect_addr, &socket.family)?;
135            check.check(connect_addr, SocketAddrUse::UdpConnect).await?;
136
137            rustix::net::connect(socket.udp_socket(), &connect_addr).map_err(
138                |error| match error {
139                    Errno::AFNOSUPPORT => ErrorCode::InvalidArgument, // See `bind` implementation.
140                    Errno::INPROGRESS => {
141                        tracing::debug!(
142                            "UDP connect returned EINPROGRESS, which should never happen"
143                        );
144                        ErrorCode::Unknown
145                    }
146                    _ => ErrorCode::from(error),
147                },
148            )?;
149            socket.udp_state = UdpState::Connected;
150        }
151
152        let incoming_stream = IncomingDatagramStream {
153            inner: socket.inner.clone(),
154            remote_address,
155        };
156        let outgoing_stream = OutgoingDatagramStream {
157            inner: socket.inner.clone(),
158            remote_address,
159            family: socket.family,
160            send_state: SendState::Idle,
161            socket_addr_check: socket.socket_addr_check.clone(),
162        };
163
164        Ok((
165            self.table().push_child(incoming_stream, &this)?,
166            self.table().push_child(outgoing_stream, &this)?,
167        ))
168    }
169
170    fn local_address(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<IpSocketAddress> {
171        let table = self.table();
172        let socket = table.get(&this)?;
173
174        match socket.udp_state {
175            UdpState::Default => return Err(ErrorCode::InvalidState.into()),
176            UdpState::BindStarted => return Err(ErrorCode::ConcurrencyConflict.into()),
177            _ => {}
178        }
179
180        let addr = socket
181            .udp_socket()
182            .as_socketlike_view::<std::net::UdpSocket>()
183            .local_addr()?;
184        Ok(addr.into())
185    }
186
187    fn remote_address(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<IpSocketAddress> {
188        let table = self.table();
189        let socket = table.get(&this)?;
190
191        match socket.udp_state {
192            UdpState::Connected => {}
193            _ => return Err(ErrorCode::InvalidState.into()),
194        }
195
196        let addr = socket
197            .udp_socket()
198            .as_socketlike_view::<std::net::UdpSocket>()
199            .peer_addr()?;
200        Ok(addr.into())
201    }
202
203    fn address_family(
204        &mut self,
205        this: Resource<udp::UdpSocket>,
206    ) -> Result<IpAddressFamily, anyhow::Error> {
207        let table = self.table();
208        let socket = table.get(&this)?;
209
210        match socket.family {
211            SocketAddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4),
212            SocketAddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6),
213        }
214    }
215
216    fn unicast_hop_limit(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u8> {
217        let table = self.table();
218        let socket = table.get(&this)?;
219
220        let ttl = match socket.family {
221            SocketAddressFamily::Ipv4 => util::get_ip_ttl(socket.udp_socket())?,
222            SocketAddressFamily::Ipv6 => util::get_ipv6_unicast_hops(socket.udp_socket())?,
223        };
224
225        Ok(ttl)
226    }
227
228    fn set_unicast_hop_limit(
229        &mut self,
230        this: Resource<udp::UdpSocket>,
231        value: u8,
232    ) -> SocketResult<()> {
233        let table = self.table();
234        let socket = table.get(&this)?;
235
236        match socket.family {
237            SocketAddressFamily::Ipv4 => util::set_ip_ttl(socket.udp_socket(), value)?,
238            SocketAddressFamily::Ipv6 => util::set_ipv6_unicast_hops(socket.udp_socket(), value)?,
239        }
240
241        Ok(())
242    }
243
244    fn receive_buffer_size(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u64> {
245        let table = self.table();
246        let socket = table.get(&this)?;
247
248        let value = util::get_socket_recv_buffer_size(socket.udp_socket())?;
249        Ok(value as u64)
250    }
251
252    fn set_receive_buffer_size(
253        &mut self,
254        this: Resource<udp::UdpSocket>,
255        value: u64,
256    ) -> SocketResult<()> {
257        let table = self.table();
258        let socket = table.get(&this)?;
259        let value = value.try_into().unwrap_or(usize::MAX);
260
261        util::set_socket_recv_buffer_size(socket.udp_socket(), value)?;
262        Ok(())
263    }
264
265    fn send_buffer_size(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u64> {
266        let table = self.table();
267        let socket = table.get(&this)?;
268
269        let value = util::get_socket_send_buffer_size(socket.udp_socket())?;
270        Ok(value as u64)
271    }
272
273    fn set_send_buffer_size(
274        &mut self,
275        this: Resource<udp::UdpSocket>,
276        value: u64,
277    ) -> SocketResult<()> {
278        let table = self.table();
279        let socket = table.get(&this)?;
280        let value = value.try_into().unwrap_or(usize::MAX);
281
282        util::set_socket_send_buffer_size(socket.udp_socket(), value)?;
283        Ok(())
284    }
285
286    fn subscribe(
287        &mut self,
288        this: Resource<udp::UdpSocket>,
289    ) -> anyhow::Result<Resource<DynPollable>> {
290        wasmtime_wasi_io::poll::subscribe(self.table(), this)
291    }
292
293    fn drop(&mut self, this: Resource<udp::UdpSocket>) -> Result<(), anyhow::Error> {
294        let table = self.table();
295
296        // As in the filesystem implementation, we assume closing a socket
297        // doesn't block.
298        let dropped = table.delete(this)?;
299        drop(dropped);
300
301        Ok(())
302    }
303}
304
305impl<T> udp::HostIncomingDatagramStream for WasiImpl<T>
306where
307    T: WasiView,
308{
309    fn receive(
310        &mut self,
311        this: Resource<udp::IncomingDatagramStream>,
312        max_results: u64,
313    ) -> SocketResult<Vec<udp::IncomingDatagram>> {
314        // Returns Ok(None) when the message was dropped.
315        fn recv_one(
316            stream: &IncomingDatagramStream,
317        ) -> SocketResult<Option<udp::IncomingDatagram>> {
318            let mut buf = [0; MAX_UDP_DATAGRAM_SIZE];
319            let (size, received_addr) = stream.inner.try_recv_from(&mut buf)?;
320            debug_assert!(size <= buf.len());
321
322            match stream.remote_address {
323                Some(connected_addr) if connected_addr != received_addr => {
324                    // Normally, this should have already been checked for us by the OS.
325                    return Ok(None);
326                }
327                _ => {}
328            }
329
330            Ok(Some(udp::IncomingDatagram {
331                data: buf[..size].into(),
332                remote_address: received_addr.into(),
333            }))
334        }
335
336        let table = self.table();
337        let stream = table.get(&this)?;
338        let max_results: usize = max_results.try_into().unwrap_or(usize::MAX);
339
340        if max_results == 0 {
341            return Ok(vec![]);
342        }
343
344        let mut datagrams = vec![];
345
346        while datagrams.len() < max_results {
347            match recv_one(stream) {
348                Ok(Some(datagram)) => {
349                    datagrams.push(datagram);
350                }
351                Ok(None) => {
352                    // Message was dropped
353                }
354                Err(_) if datagrams.len() > 0 => {
355                    return Ok(datagrams);
356                }
357                Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => {
358                    return Ok(datagrams);
359                }
360                Err(e) => {
361                    return Err(e);
362                }
363            }
364        }
365
366        Ok(datagrams)
367    }
368
369    fn subscribe(
370        &mut self,
371        this: Resource<udp::IncomingDatagramStream>,
372    ) -> anyhow::Result<Resource<DynPollable>> {
373        wasmtime_wasi_io::poll::subscribe(self.table(), this)
374    }
375
376    fn drop(&mut self, this: Resource<udp::IncomingDatagramStream>) -> Result<(), anyhow::Error> {
377        let table = self.table();
378
379        // As in the filesystem implementation, we assume closing a socket
380        // doesn't block.
381        let dropped = table.delete(this)?;
382        drop(dropped);
383
384        Ok(())
385    }
386}
387
388#[async_trait]
389impl Pollable for IncomingDatagramStream {
390    async fn ready(&mut self) {
391        // FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
392        self.inner
393            .ready(Interest::READABLE)
394            .await
395            .expect("failed to await UDP socket readiness");
396    }
397}
398
399impl<T> udp::HostOutgoingDatagramStream for WasiImpl<T>
400where
401    T: WasiView,
402{
403    fn check_send(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> SocketResult<u64> {
404        let table = self.table();
405        let stream = table.get_mut(&this)?;
406
407        let permit = match stream.send_state {
408            SendState::Idle => {
409                const PERMIT: usize = 16;
410                stream.send_state = SendState::Permitted(PERMIT);
411                PERMIT
412            }
413            SendState::Permitted(n) => n,
414            SendState::Waiting => 0,
415        };
416
417        Ok(permit.try_into().unwrap())
418    }
419
420    async fn send(
421        &mut self,
422        this: Resource<udp::OutgoingDatagramStream>,
423        datagrams: Vec<udp::OutgoingDatagram>,
424    ) -> SocketResult<u64> {
425        async fn send_one(
426            stream: &OutgoingDatagramStream,
427            datagram: &udp::OutgoingDatagram,
428        ) -> SocketResult<()> {
429            if datagram.data.len() > MAX_UDP_DATAGRAM_SIZE {
430                return Err(ErrorCode::DatagramTooLarge.into());
431            }
432
433            let provided_addr = datagram.remote_address.map(SocketAddr::from);
434            let addr = match (stream.remote_address, provided_addr) {
435                (None, Some(addr)) => {
436                    let Some(check) = stream.socket_addr_check.as_ref() else {
437                        return Err(ErrorCode::InvalidState.into());
438                    };
439                    check
440                        .check(addr, SocketAddrUse::UdpOutgoingDatagram)
441                        .await?;
442                    addr
443                }
444                (Some(addr), None) => addr,
445                (Some(connected_addr), Some(provided_addr)) if connected_addr == provided_addr => {
446                    connected_addr
447                }
448                _ => return Err(ErrorCode::InvalidArgument.into()),
449            };
450
451            util::validate_remote_address(&addr)?;
452            util::validate_address_family(&addr, &stream.family)?;
453
454            if stream.remote_address == Some(addr) {
455                stream.inner.try_send(&datagram.data)?;
456            } else {
457                stream.inner.try_send_to(&datagram.data, addr)?;
458            }
459
460            Ok(())
461        }
462
463        let table = self.table();
464        let stream = table.get_mut(&this)?;
465
466        match stream.send_state {
467            SendState::Permitted(n) if n >= datagrams.len() => {
468                stream.send_state = SendState::Idle;
469            }
470            SendState::Permitted(_) => {
471                return Err(SocketError::trap(anyhow::anyhow!(
472                    "unpermitted: argument exceeds permitted size"
473                )))
474            }
475            SendState::Idle | SendState::Waiting => {
476                return Err(SocketError::trap(anyhow::anyhow!(
477                    "unpermitted: must call check-send first"
478                )))
479            }
480        }
481
482        if datagrams.is_empty() {
483            return Ok(0);
484        }
485
486        let mut count = 0;
487
488        for datagram in datagrams {
489            match send_one(stream, &datagram).await {
490                Ok(_) => count += 1,
491                Err(_) if count > 0 => {
492                    // WIT: "If at least one datagram has been sent successfully, this function never returns an error."
493                    return Ok(count);
494                }
495                Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => {
496                    stream.send_state = SendState::Waiting;
497                    return Ok(count);
498                }
499                Err(e) => {
500                    return Err(e);
501                }
502            }
503        }
504
505        Ok(count)
506    }
507
508    fn subscribe(
509        &mut self,
510        this: Resource<udp::OutgoingDatagramStream>,
511    ) -> anyhow::Result<Resource<DynPollable>> {
512        wasmtime_wasi_io::poll::subscribe(self.table(), this)
513    }
514
515    fn drop(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> Result<(), anyhow::Error> {
516        let table = self.table();
517
518        // As in the filesystem implementation, we assume closing a socket
519        // doesn't block.
520        let dropped = table.delete(this)?;
521        drop(dropped);
522
523        Ok(())
524    }
525}
526
527#[async_trait]
528impl Pollable for OutgoingDatagramStream {
529    async fn ready(&mut self) {
530        match self.send_state {
531            SendState::Idle | SendState::Permitted(_) => {}
532            SendState::Waiting => {
533                // FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
534                self.inner
535                    .ready(Interest::WRITABLE)
536                    .await
537                    .expect("failed to await UDP socket readiness");
538                self.send_state = SendState::Idle;
539            }
540        }
541    }
542}
543
544pub mod sync {
545    use wasmtime::component::Resource;
546
547    use crate::p2::{
548        bindings::{
549            sockets::{
550                network::Network,
551                udp::{
552                    self as async_udp,
553                    HostIncomingDatagramStream as AsyncHostIncomingDatagramStream,
554                    HostOutgoingDatagramStream as AsyncHostOutgoingDatagramStream,
555                    HostUdpSocket as AsyncHostUdpSocket, IncomingDatagramStream,
556                    OutgoingDatagramStream,
557                },
558            },
559            sync::sockets::udp::{
560                self, HostIncomingDatagramStream, HostOutgoingDatagramStream, HostUdpSocket,
561                IncomingDatagram, IpAddressFamily, IpSocketAddress, OutgoingDatagram, Pollable,
562                UdpSocket,
563            },
564        },
565        SocketError, WasiImpl, WasiView,
566    };
567    use crate::runtime::in_tokio;
568
569    impl<T> udp::Host for WasiImpl<T> where T: WasiView {}
570
571    impl<T> HostUdpSocket for WasiImpl<T>
572    where
573        T: WasiView,
574    {
575        fn start_bind(
576            &mut self,
577            self_: Resource<UdpSocket>,
578            network: Resource<Network>,
579            local_address: IpSocketAddress,
580        ) -> Result<(), SocketError> {
581            in_tokio(async {
582                AsyncHostUdpSocket::start_bind(self, self_, network, local_address).await
583            })
584        }
585
586        fn finish_bind(&mut self, self_: Resource<UdpSocket>) -> Result<(), SocketError> {
587            AsyncHostUdpSocket::finish_bind(self, self_)
588        }
589
590        fn stream(
591            &mut self,
592            self_: Resource<UdpSocket>,
593            remote_address: Option<IpSocketAddress>,
594        ) -> Result<
595            (
596                Resource<IncomingDatagramStream>,
597                Resource<OutgoingDatagramStream>,
598            ),
599            SocketError,
600        > {
601            in_tokio(async { AsyncHostUdpSocket::stream(self, self_, remote_address).await })
602        }
603
604        fn local_address(
605            &mut self,
606            self_: Resource<UdpSocket>,
607        ) -> Result<IpSocketAddress, SocketError> {
608            AsyncHostUdpSocket::local_address(self, self_)
609        }
610
611        fn remote_address(
612            &mut self,
613            self_: Resource<UdpSocket>,
614        ) -> Result<IpSocketAddress, SocketError> {
615            AsyncHostUdpSocket::remote_address(self, self_)
616        }
617
618        fn address_family(
619            &mut self,
620            self_: Resource<UdpSocket>,
621        ) -> wasmtime::Result<IpAddressFamily> {
622            AsyncHostUdpSocket::address_family(self, self_)
623        }
624
625        fn unicast_hop_limit(&mut self, self_: Resource<UdpSocket>) -> Result<u8, SocketError> {
626            AsyncHostUdpSocket::unicast_hop_limit(self, self_)
627        }
628
629        fn set_unicast_hop_limit(
630            &mut self,
631            self_: Resource<UdpSocket>,
632            value: u8,
633        ) -> Result<(), SocketError> {
634            AsyncHostUdpSocket::set_unicast_hop_limit(self, self_, value)
635        }
636
637        fn receive_buffer_size(&mut self, self_: Resource<UdpSocket>) -> Result<u64, SocketError> {
638            AsyncHostUdpSocket::receive_buffer_size(self, self_)
639        }
640
641        fn set_receive_buffer_size(
642            &mut self,
643            self_: Resource<UdpSocket>,
644            value: u64,
645        ) -> Result<(), SocketError> {
646            AsyncHostUdpSocket::set_receive_buffer_size(self, self_, value)
647        }
648
649        fn send_buffer_size(&mut self, self_: Resource<UdpSocket>) -> Result<u64, SocketError> {
650            AsyncHostUdpSocket::send_buffer_size(self, self_)
651        }
652
653        fn set_send_buffer_size(
654            &mut self,
655            self_: Resource<UdpSocket>,
656            value: u64,
657        ) -> Result<(), SocketError> {
658            AsyncHostUdpSocket::set_send_buffer_size(self, self_, value)
659        }
660
661        fn subscribe(
662            &mut self,
663            self_: Resource<UdpSocket>,
664        ) -> wasmtime::Result<Resource<Pollable>> {
665            AsyncHostUdpSocket::subscribe(self, self_)
666        }
667
668        fn drop(&mut self, rep: Resource<UdpSocket>) -> wasmtime::Result<()> {
669            AsyncHostUdpSocket::drop(self, rep)
670        }
671    }
672
673    impl<T> HostIncomingDatagramStream for WasiImpl<T>
674    where
675        T: WasiView,
676    {
677        fn receive(
678            &mut self,
679            self_: Resource<IncomingDatagramStream>,
680            max_results: u64,
681        ) -> Result<Vec<IncomingDatagram>, SocketError> {
682            Ok(
683                AsyncHostIncomingDatagramStream::receive(self, self_, max_results)?
684                    .into_iter()
685                    .map(Into::into)
686                    .collect(),
687            )
688        }
689
690        fn subscribe(
691            &mut self,
692            self_: Resource<IncomingDatagramStream>,
693        ) -> wasmtime::Result<Resource<Pollable>> {
694            AsyncHostIncomingDatagramStream::subscribe(self, self_)
695        }
696
697        fn drop(&mut self, rep: Resource<IncomingDatagramStream>) -> wasmtime::Result<()> {
698            AsyncHostIncomingDatagramStream::drop(self, rep)
699        }
700    }
701
702    impl From<async_udp::IncomingDatagram> for IncomingDatagram {
703        fn from(other: async_udp::IncomingDatagram) -> Self {
704            let async_udp::IncomingDatagram {
705                data,
706                remote_address,
707            } = other;
708            Self {
709                data,
710                remote_address,
711            }
712        }
713    }
714
715    impl<T> HostOutgoingDatagramStream for WasiImpl<T>
716    where
717        T: WasiView,
718    {
719        fn check_send(
720            &mut self,
721            self_: Resource<OutgoingDatagramStream>,
722        ) -> Result<u64, SocketError> {
723            AsyncHostOutgoingDatagramStream::check_send(self, self_)
724        }
725
726        fn send(
727            &mut self,
728            self_: Resource<OutgoingDatagramStream>,
729            datagrams: Vec<OutgoingDatagram>,
730        ) -> Result<u64, SocketError> {
731            let datagrams = datagrams.into_iter().map(Into::into).collect();
732            in_tokio(async { AsyncHostOutgoingDatagramStream::send(self, self_, datagrams).await })
733        }
734
735        fn subscribe(
736            &mut self,
737            self_: Resource<OutgoingDatagramStream>,
738        ) -> wasmtime::Result<Resource<Pollable>> {
739            AsyncHostOutgoingDatagramStream::subscribe(self, self_)
740        }
741
742        fn drop(&mut self, rep: Resource<OutgoingDatagramStream>) -> wasmtime::Result<()> {
743            AsyncHostOutgoingDatagramStream::drop(self, rep)
744        }
745    }
746
747    impl From<OutgoingDatagram> for async_udp::OutgoingDatagram {
748        fn from(other: OutgoingDatagram) -> Self {
749            let OutgoingDatagram {
750                data,
751                remote_address,
752            } = other;
753            Self {
754                data,
755                remote_address,
756            }
757        }
758    }
759}