1use crate::p2::P2TcpStreamingState;
2use crate::runtime::with_ambient_tokio_runtime;
3use crate::sockets::util::{
4 ErrorCode, get_unicast_hop_limit, is_valid_address_family, is_valid_remote_address,
5 is_valid_unicast_address, receive_buffer_size, send_buffer_size, set_keep_alive_count,
6 set_keep_alive_idle_time, set_keep_alive_interval, set_receive_buffer_size,
7 set_send_buffer_size, set_unicast_hop_limit, tcp_bind,
8};
9use crate::sockets::{DEFAULT_TCP_BACKLOG, SocketAddressFamily, WasiSocketsCtx};
10use io_lifetimes::AsSocketlike as _;
11use io_lifetimes::views::SocketlikeView;
12use rustix::io::Errno;
13use rustix::net::sockopt;
14use std::fmt::Debug;
15use std::io;
16use std::mem;
17use std::net::SocketAddr;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, Waker};
21use std::time::Duration;
22
23enum TcpState {
29 Default(tokio::net::TcpSocket),
34
35 BindStarted(tokio::net::TcpSocket),
40
41 Bound(tokio::net::TcpSocket),
46
47 ListenStarted(tokio::net::TcpSocket),
52
53 Listening {
57 listener: Arc<tokio::net::TcpListener>,
59
60 pending_accept: Option<io::Result<tokio::net::TcpStream>>,
64 },
65
66 Connecting(Option<Pin<Box<dyn Future<Output = io::Result<tokio::net::TcpStream>> + Send>>>),
75
76 ConnectReady(io::Result<tokio::net::TcpStream>),
84
85 Connected(Arc<tokio::net::TcpStream>),
92
93 #[cfg(feature = "p3")]
97 Receiving(Arc<tokio::net::TcpStream>),
98
99 P2Streaming(Box<P2TcpStreamingState>),
104
105 #[cfg(feature = "p3")]
110 Error(io::Error),
111
112 Closed,
114}
115
116impl Debug for TcpState {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 match self {
119 Self::Default(_) => f.debug_tuple("Default").finish(),
120 Self::BindStarted(_) => f.debug_tuple("BindStarted").finish(),
121 Self::Bound(_) => f.debug_tuple("Bound").finish(),
122 Self::ListenStarted { .. } => f.debug_tuple("ListenStarted").finish(),
123 Self::Listening { .. } => f.debug_tuple("Listening").finish(),
124 Self::Connecting(..) => f.debug_tuple("Connecting").finish(),
125 Self::ConnectReady(..) => f.debug_tuple("ConnectReady").finish(),
126 Self::Connected { .. } => f.debug_tuple("Connected").finish(),
127 #[cfg(feature = "p3")]
128 Self::Receiving { .. } => f.debug_tuple("Receiving").finish(),
129 Self::P2Streaming(_) => f.debug_tuple("P2Streaming").finish(),
130 #[cfg(feature = "p3")]
131 Self::Error(..) => f.debug_tuple("Error").finish(),
132 Self::Closed => write!(f, "Closed"),
133 }
134 }
135}
136
137pub struct TcpSocket {
139 tcp_state: TcpState,
141
142 listen_backlog_size: u32,
144
145 family: SocketAddressFamily,
146
147 options: NonInheritedOptions,
148}
149
150impl TcpSocket {
151 pub(crate) fn new(
153 ctx: &WasiSocketsCtx,
154 family: SocketAddressFamily,
155 ) -> Result<Self, ErrorCode> {
156 ctx.allowed_network_uses.check_allowed_tcp()?;
157
158 with_ambient_tokio_runtime(|| {
159 let socket = match family {
160 SocketAddressFamily::Ipv4 => tokio::net::TcpSocket::new_v4()?,
161 SocketAddressFamily::Ipv6 => {
162 let socket = tokio::net::TcpSocket::new_v6()?;
163 sockopt::set_ipv6_v6only(&socket, true)?;
164 socket
165 }
166 };
167
168 Ok(Self::from_state(TcpState::Default(socket), family))
169 })
170 }
171
172 #[cfg(feature = "p3")]
173 pub(crate) fn new_error(err: io::Error, family: SocketAddressFamily) -> Self {
174 TcpSocket::from_state(TcpState::Error(err), family)
175 }
176
177 pub(crate) fn new_accept(
183 result: io::Result<tokio::net::TcpStream>,
184 options: &NonInheritedOptions,
185 family: SocketAddressFamily,
186 ) -> io::Result<Self> {
187 let client = result.map_err(|err| match Errno::from_io_error(&err) {
188 #[cfg(windows)]
196 Some(Errno::INPROGRESS) => Errno::INTR.into(),
197
198 #[cfg(target_os = "linux")]
205 Some(
206 Errno::CONNRESET
207 | Errno::NETRESET
208 | Errno::HOSTUNREACH
209 | Errno::HOSTDOWN
210 | Errno::NETDOWN
211 | Errno::NETUNREACH
212 | Errno::PROTO
213 | Errno::NOPROTOOPT
214 | Errno::NONET
215 | Errno::OPNOTSUPP,
216 ) => Errno::CONNABORTED.into(),
217
218 _ => err,
219 })?;
220 options.apply(family, &client);
221 Ok(Self::from_state(
222 TcpState::Connected(Arc::new(client)),
223 family,
224 ))
225 }
226
227 fn from_state(state: TcpState, family: SocketAddressFamily) -> Self {
229 Self {
230 tcp_state: state,
231 listen_backlog_size: DEFAULT_TCP_BACKLOG,
232 family,
233 options: Default::default(),
234 }
235 }
236
237 pub(crate) fn as_std_view(&self) -> Result<SocketlikeView<'_, std::net::TcpStream>, ErrorCode> {
238 match &self.tcp_state {
239 TcpState::Default(socket)
240 | TcpState::BindStarted(socket)
241 | TcpState::Bound(socket)
242 | TcpState::ListenStarted(socket) => Ok(socket.as_socketlike_view()),
243 TcpState::Connected(stream) => Ok(stream.as_socketlike_view()),
244 #[cfg(feature = "p3")]
245 TcpState::Receiving(stream) => Ok(stream.as_socketlike_view()),
246 TcpState::Listening { listener, .. } => Ok(listener.as_socketlike_view()),
247 TcpState::P2Streaming(state) => Ok(state.stream.as_socketlike_view()),
248 TcpState::Connecting(..) | TcpState::ConnectReady(_) | TcpState::Closed => {
249 Err(ErrorCode::InvalidState)
250 }
251 #[cfg(feature = "p3")]
252 TcpState::Error(err) => Err(err.into()),
253 }
254 }
255
256 pub(crate) fn start_bind(&mut self, addr: SocketAddr) -> Result<(), ErrorCode> {
257 let ip = addr.ip();
258 if !is_valid_unicast_address(ip) || !is_valid_address_family(ip, self.family) {
259 return Err(ErrorCode::InvalidArgument);
260 }
261 match mem::replace(&mut self.tcp_state, TcpState::Closed) {
262 TcpState::Default(sock) => {
263 if let Err(err) = tcp_bind(&sock, addr) {
264 self.tcp_state = TcpState::Default(sock);
265 Err(err)
266 } else {
267 self.tcp_state = TcpState::BindStarted(sock);
268 Ok(())
269 }
270 }
271 tcp_state => {
272 self.tcp_state = tcp_state;
273 Err(ErrorCode::InvalidState)
274 }
275 }
276 }
277
278 pub(crate) fn finish_bind(&mut self) -> Result<(), ErrorCode> {
279 match mem::replace(&mut self.tcp_state, TcpState::Closed) {
280 TcpState::BindStarted(socket) => {
281 self.tcp_state = TcpState::Bound(socket);
282 Ok(())
283 }
284 current_state => {
285 self.tcp_state = current_state;
287 Err(ErrorCode::NotInProgress)
288 }
289 }
290 }
291
292 pub(crate) fn start_connect(
293 &mut self,
294 addr: &SocketAddr,
295 ) -> Result<tokio::net::TcpSocket, ErrorCode> {
296 match self.tcp_state {
297 TcpState::Default(..) | TcpState::Bound(..) => {}
298 TcpState::Connecting(..) => {
299 return Err(ErrorCode::ConcurrencyConflict);
300 }
301 _ => return Err(ErrorCode::InvalidState),
302 };
303
304 if !is_valid_unicast_address(addr.ip())
305 || !is_valid_remote_address(*addr)
306 || !is_valid_address_family(addr.ip(), self.family)
307 {
308 return Err(ErrorCode::InvalidArgument);
309 };
310
311 let (TcpState::Default(tokio_socket) | TcpState::Bound(tokio_socket)) =
312 mem::replace(&mut self.tcp_state, TcpState::Connecting(None))
313 else {
314 unreachable!();
315 };
316
317 Ok(tokio_socket)
318 }
319
320 pub(crate) fn set_pending_connect(
323 &mut self,
324 future: impl Future<Output = io::Result<tokio::net::TcpStream>> + Send + 'static,
325 ) -> Result<(), ErrorCode> {
326 match &mut self.tcp_state {
327 TcpState::Connecting(slot @ None) => {
328 *slot = Some(Box::pin(future));
329 Ok(())
330 }
331 _ => Err(ErrorCode::InvalidState),
332 }
333 }
334
335 pub(crate) fn take_pending_connect(
344 &mut self,
345 ) -> Result<Option<io::Result<tokio::net::TcpStream>>, ErrorCode> {
346 match mem::replace(&mut self.tcp_state, TcpState::Connecting(None)) {
347 TcpState::ConnectReady(result) => Ok(Some(result)),
348 TcpState::Connecting(Some(mut future)) => {
349 let mut cx = Context::from_waker(Waker::noop());
350 match with_ambient_tokio_runtime(|| future.as_mut().poll(&mut cx)) {
351 Poll::Ready(result) => Ok(Some(result)),
352 Poll::Pending => {
353 self.tcp_state = TcpState::Connecting(Some(future));
354 Ok(None)
355 }
356 }
357 }
358 current_state => {
359 self.tcp_state = current_state;
360 Err(ErrorCode::NotInProgress)
361 }
362 }
363 }
364
365 pub(crate) fn finish_connect(
366 &mut self,
367 result: io::Result<tokio::net::TcpStream>,
368 ) -> Result<(), ErrorCode> {
369 if !matches!(self.tcp_state, TcpState::Connecting(None)) {
370 return Err(ErrorCode::InvalidState);
371 }
372 match result {
373 Ok(stream) => {
374 self.tcp_state = TcpState::Connected(Arc::new(stream));
375 Ok(())
376 }
377 Err(err) => {
378 self.tcp_state = TcpState::Closed;
379 Err(ErrorCode::from(err))
380 }
381 }
382 }
383
384 pub(crate) fn start_listen_p2(&mut self) -> Result<(), ErrorCode> {
386 match mem::replace(&mut self.tcp_state, TcpState::Closed) {
387 TcpState::Bound(tokio_socket) => {
388 self.tcp_state = TcpState::ListenStarted(tokio_socket);
389 Ok(())
390 }
391 previous_state => {
392 self.tcp_state = previous_state;
393 Err(ErrorCode::InvalidState)
394 }
395 }
396 }
397
398 pub(crate) fn finish_listen_p2(&mut self) -> Result<(), ErrorCode> {
399 let tokio_socket = match mem::replace(&mut self.tcp_state, TcpState::Closed) {
400 TcpState::ListenStarted(tokio_socket) => tokio_socket,
401 previous_state => {
402 self.tcp_state = previous_state;
403 return Err(ErrorCode::NotInProgress);
404 }
405 };
406
407 self.listen_common(tokio_socket)
408 }
409
410 #[cfg(feature = "p3")]
412 pub(crate) fn listen_p3(&mut self) -> Result<(), ErrorCode> {
413 let tokio_socket = match mem::replace(&mut self.tcp_state, TcpState::Closed) {
414 TcpState::Bound(tokio_socket) => tokio_socket,
415 TcpState::Default(tokio_socket) => {
416 let implicit_addr = crate::sockets::util::implicit_bind_addr(self.family);
444 tcp_bind(&tokio_socket, implicit_addr)?;
445 tokio_socket
446 }
447 previous_state => {
448 self.tcp_state = previous_state;
449 return Err(ErrorCode::InvalidState);
450 }
451 };
452
453 self.listen_common(tokio_socket)
454 }
455
456 fn listen_common(&mut self, tokio_socket: tokio::net::TcpSocket) -> Result<(), ErrorCode> {
457 match with_ambient_tokio_runtime(|| tokio_socket.listen(self.listen_backlog_size)) {
458 Ok(listener) => {
459 self.tcp_state = TcpState::Listening {
460 listener: Arc::new(listener),
461 pending_accept: None,
462 };
463 Ok(())
464 }
465 Err(err) => {
466 self.tcp_state = TcpState::Closed;
467
468 Err(match Errno::from_io_error(&err) {
469 #[cfg(windows)]
479 Some(Errno::MFILE) => Errno::NOBUFS.into(),
480
481 _ => err.into(),
482 })
483 }
484 }
485 }
486
487 pub(crate) fn accept(&mut self) -> Result<Option<Self>, ErrorCode> {
488 let TcpState::Listening {
489 listener,
490 pending_accept,
491 } = &mut self.tcp_state
492 else {
493 return Err(ErrorCode::InvalidState);
494 };
495
496 let result = match pending_accept.take() {
497 Some(result) => result,
498 None => {
499 let mut cx = std::task::Context::from_waker(Waker::noop());
500 match with_ambient_tokio_runtime(|| listener.poll_accept(&mut cx))
501 .map_ok(|(stream, _)| stream)
502 {
503 Poll::Ready(result) => result,
504 Poll::Pending => return Ok(None),
505 }
506 }
507 };
508
509 Ok(Some(Self::new_accept(result, &self.options, self.family)?))
510 }
511
512 #[cfg(feature = "p3")]
513 pub(crate) fn start_receive(&mut self) -> Option<&Arc<tokio::net::TcpStream>> {
514 match mem::replace(&mut self.tcp_state, TcpState::Closed) {
515 TcpState::Connected(stream) => {
516 self.tcp_state = TcpState::Receiving(stream);
517 Some(self.tcp_stream_arc().unwrap())
518 }
519 prev => {
520 self.tcp_state = prev;
521 None
522 }
523 }
524 }
525
526 pub(crate) fn local_address(&self) -> Result<SocketAddr, ErrorCode> {
527 match &self.tcp_state {
528 TcpState::Bound(socket) => Ok(socket.local_addr()?),
529 TcpState::Connected(stream) => Ok(stream.local_addr()?),
530 #[cfg(feature = "p3")]
531 TcpState::Receiving(stream) => Ok(stream.local_addr()?),
532 TcpState::P2Streaming(state) => Ok(state.stream.local_addr()?),
533 TcpState::Listening { listener, .. } => Ok(listener.local_addr()?),
534 #[cfg(feature = "p3")]
535 TcpState::Error(err) => Err(err.into()),
536 _ => Err(ErrorCode::InvalidState),
537 }
538 }
539
540 pub(crate) fn remote_address(&self) -> Result<SocketAddr, ErrorCode> {
541 let stream = self.tcp_stream_arc()?;
542 let addr = stream.peer_addr()?;
543 Ok(addr)
544 }
545
546 pub(crate) fn is_listening(&self) -> bool {
547 matches!(self.tcp_state, TcpState::Listening { .. })
548 }
549
550 pub(crate) fn address_family(&self) -> SocketAddressFamily {
551 self.family
552 }
553
554 pub(crate) fn set_listen_backlog_size(&mut self, value: u64) -> Result<(), ErrorCode> {
555 const MIN_BACKLOG: u32 = 1;
556 const MAX_BACKLOG: u32 = i32::MAX as u32; if value == 0 {
559 return Err(ErrorCode::InvalidArgument);
560 }
561 let value = value
563 .try_into()
564 .unwrap_or(MAX_BACKLOG)
565 .clamp(MIN_BACKLOG, MAX_BACKLOG);
566 match &self.tcp_state {
567 TcpState::Default(..) | TcpState::Bound(..) => {
568 self.listen_backlog_size = value;
570 Ok(())
571 }
572 TcpState::Listening { listener, .. } => {
573 if rustix::net::listen(&listener, value.try_into().unwrap_or(i32::MAX)).is_err() {
576 return Err(ErrorCode::NotSupported);
577 }
578 self.listen_backlog_size = value;
579 Ok(())
580 }
581 #[cfg(feature = "p3")]
582 TcpState::Error(err) => Err(err.into()),
583 _ => Err(ErrorCode::InvalidState),
584 }
585 }
586
587 pub(crate) fn keep_alive_enabled(&self) -> Result<bool, ErrorCode> {
588 let fd = &*self.as_std_view()?;
589 let v = sockopt::socket_keepalive(fd)?;
590 Ok(v)
591 }
592
593 pub(crate) fn set_keep_alive_enabled(&self, value: bool) -> Result<(), ErrorCode> {
594 let fd = &*self.as_std_view()?;
595 sockopt::set_socket_keepalive(fd, value)?;
596 Ok(())
597 }
598
599 pub(crate) fn keep_alive_idle_time(&self) -> Result<u64, ErrorCode> {
600 let fd = &*self.as_std_view()?;
601 let v = sockopt::tcp_keepidle(fd)?;
602 Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))
603 }
604
605 pub(crate) fn set_keep_alive_idle_time(&mut self, value: u64) -> Result<(), ErrorCode> {
606 let value = {
607 let fd = self.as_std_view()?;
608 set_keep_alive_idle_time(&*fd, value)?
609 };
610 self.options.set_keep_alive_idle_time(value);
611 Ok(())
612 }
613
614 pub(crate) fn keep_alive_interval(&self) -> Result<u64, ErrorCode> {
615 let fd = &*self.as_std_view()?;
616 let v = sockopt::tcp_keepintvl(fd)?;
617 Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))
618 }
619
620 pub(crate) fn set_keep_alive_interval(&self, value: u64) -> Result<(), ErrorCode> {
621 let fd = &*self.as_std_view()?;
622 set_keep_alive_interval(fd, Duration::from_nanos(value))?;
623 Ok(())
624 }
625
626 pub(crate) fn keep_alive_count(&self) -> Result<u32, ErrorCode> {
627 let fd = &*self.as_std_view()?;
628 let v = sockopt::tcp_keepcnt(fd)?;
629 Ok(v)
630 }
631
632 pub(crate) fn set_keep_alive_count(&self, value: u32) -> Result<(), ErrorCode> {
633 let fd = &*self.as_std_view()?;
634 set_keep_alive_count(fd, value)?;
635 Ok(())
636 }
637
638 pub(crate) fn hop_limit(&self) -> Result<u8, ErrorCode> {
639 let fd = &*self.as_std_view()?;
640 let n = get_unicast_hop_limit(fd, self.family)?;
641 Ok(n)
642 }
643
644 pub(crate) fn set_hop_limit(&mut self, value: u8) -> Result<(), ErrorCode> {
645 {
646 let fd = &*self.as_std_view()?;
647 set_unicast_hop_limit(fd, self.family, value)?;
648 }
649 self.options.set_hop_limit(value);
650 Ok(())
651 }
652
653 pub(crate) fn receive_buffer_size(&self) -> Result<u64, ErrorCode> {
654 let fd = &*self.as_std_view()?;
655 let n = receive_buffer_size(fd)?;
656 Ok(n)
657 }
658
659 pub(crate) fn set_receive_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {
660 let res = {
661 let fd = &*self.as_std_view()?;
662 set_receive_buffer_size(fd, value)?
663 };
664 self.options.set_receive_buffer_size(res);
665 Ok(())
666 }
667
668 pub(crate) fn send_buffer_size(&self) -> Result<u64, ErrorCode> {
669 let fd = &*self.as_std_view()?;
670 let n = send_buffer_size(fd)?;
671 Ok(n)
672 }
673
674 pub(crate) fn set_send_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {
675 let res = {
676 let fd = &*self.as_std_view()?;
677 set_send_buffer_size(fd, value)?
678 };
679 self.options.set_send_buffer_size(res);
680 Ok(())
681 }
682
683 #[cfg(feature = "p3")]
684 pub(crate) fn non_inherited_options(&self) -> &NonInheritedOptions {
685 &self.options
686 }
687
688 #[cfg(feature = "p3")]
689 pub(crate) fn tcp_listener_arc(&self) -> Result<&Arc<tokio::net::TcpListener>, ErrorCode> {
690 match &self.tcp_state {
691 TcpState::Listening { listener, .. } => Ok(listener),
692 #[cfg(feature = "p3")]
693 TcpState::Error(err) => Err(err.into()),
694 _ => Err(ErrorCode::InvalidState),
695 }
696 }
697
698 pub(crate) fn tcp_stream_arc(&self) -> Result<&Arc<tokio::net::TcpStream>, ErrorCode> {
699 match &self.tcp_state {
700 TcpState::Connected(socket) => Ok(socket),
701 #[cfg(feature = "p3")]
702 TcpState::Receiving(socket) => Ok(socket),
703 TcpState::P2Streaming(state) => Ok(&state.stream),
704 #[cfg(feature = "p3")]
705 TcpState::Error(err) => Err(err.into()),
706 _ => Err(ErrorCode::InvalidState),
707 }
708 }
709
710 pub(crate) fn p2_streaming_state(&self) -> Result<&P2TcpStreamingState, ErrorCode> {
711 match &self.tcp_state {
712 TcpState::P2Streaming(state) => Ok(state),
713 #[cfg(feature = "p3")]
714 TcpState::Error(err) => Err(err.into()),
715 _ => Err(ErrorCode::InvalidState),
716 }
717 }
718
719 pub(crate) fn set_p2_streaming_state(
720 &mut self,
721 state: P2TcpStreamingState,
722 ) -> Result<(), ErrorCode> {
723 if !matches!(self.tcp_state, TcpState::Connected(_)) {
724 return Err(ErrorCode::InvalidState);
725 }
726 self.tcp_state = TcpState::P2Streaming(Box::new(state));
727 Ok(())
728 }
729
730 pub(crate) async fn ready(&mut self) {
736 match &mut self.tcp_state {
737 TcpState::Default(..)
738 | TcpState::BindStarted(..)
739 | TcpState::Bound(..)
740 | TcpState::ListenStarted(..)
741 | TcpState::ConnectReady(..)
742 | TcpState::Closed
743 | TcpState::Connected { .. }
744 | TcpState::Connecting(None)
745 | TcpState::Listening {
746 pending_accept: Some(_),
747 ..
748 }
749 | TcpState::P2Streaming(_) => {}
750
751 #[cfg(feature = "p3")]
752 TcpState::Receiving(_) | TcpState::Error(_) => {}
753
754 TcpState::Connecting(Some(future)) => {
755 self.tcp_state = TcpState::ConnectReady(future.as_mut().await);
756 }
757
758 TcpState::Listening {
759 listener,
760 pending_accept: slot @ None,
761 } => {
762 let result = futures::future::poll_fn(|cx| {
763 listener.poll_accept(cx).map_ok(|(stream, _)| stream)
764 })
765 .await;
766 *slot = Some(result);
767 }
768 }
769 }
770}
771
772#[cfg(not(target_os = "macos"))]
773pub use inherits_option::*;
774#[cfg(not(target_os = "macos"))]
775mod inherits_option {
776 use crate::sockets::SocketAddressFamily;
777 use tokio::net::TcpStream;
778
779 #[derive(Default, Clone)]
780 pub struct NonInheritedOptions;
781
782 impl NonInheritedOptions {
783 pub fn set_keep_alive_idle_time(&mut self, _value: u64) {}
784
785 pub fn set_hop_limit(&mut self, _value: u8) {}
786
787 pub fn set_receive_buffer_size(&mut self, _value: usize) {}
788
789 pub fn set_send_buffer_size(&mut self, _value: usize) {}
790
791 pub(crate) fn apply(&self, _family: SocketAddressFamily, _stream: &TcpStream) {}
792 }
793}
794
795#[cfg(target_os = "macos")]
796pub use does_not_inherit_options::*;
797#[cfg(target_os = "macos")]
798mod does_not_inherit_options {
799 use crate::sockets::SocketAddressFamily;
800 use rustix::net::sockopt;
801 use std::sync::Arc;
802 use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering::Relaxed};
803 use std::time::Duration;
804 use tokio::net::TcpStream;
805
806 #[derive(Default, Clone)]
810 pub struct NonInheritedOptions(Arc<Inner>);
811
812 #[derive(Default)]
813 struct Inner {
814 receive_buffer_size: AtomicUsize,
815 send_buffer_size: AtomicUsize,
816 hop_limit: AtomicU8,
817 keep_alive_idle_time: AtomicU64, }
819
820 impl NonInheritedOptions {
821 pub fn set_keep_alive_idle_time(&mut self, value: u64) {
822 self.0.keep_alive_idle_time.store(value, Relaxed);
823 }
824
825 pub fn set_hop_limit(&mut self, value: u8) {
826 self.0.hop_limit.store(value, Relaxed);
827 }
828
829 pub fn set_receive_buffer_size(&mut self, value: usize) {
830 self.0.receive_buffer_size.store(value, Relaxed);
831 }
832
833 pub fn set_send_buffer_size(&mut self, value: usize) {
834 self.0.send_buffer_size.store(value, Relaxed);
835 }
836
837 pub(crate) fn apply(&self, family: SocketAddressFamily, stream: &TcpStream) {
838 let receive_buffer_size = self.0.receive_buffer_size.load(Relaxed);
843 if receive_buffer_size > 0 {
844 _ = sockopt::set_socket_recv_buffer_size(&stream, receive_buffer_size);
846 }
847
848 let send_buffer_size = self.0.send_buffer_size.load(Relaxed);
849 if send_buffer_size > 0 {
850 _ = sockopt::set_socket_send_buffer_size(&stream, send_buffer_size);
852 }
853
854 if family == SocketAddressFamily::Ipv6 {
856 let hop_limit = self.0.hop_limit.load(Relaxed);
857 if hop_limit > 0 {
858 _ = sockopt::set_ipv6_unicast_hops(&stream, Some(hop_limit));
860 }
861 }
862
863 let keep_alive_idle_time = self.0.keep_alive_idle_time.load(Relaxed);
864 if keep_alive_idle_time > 0 {
865 _ = sockopt::set_tcp_keepidle(&stream, Duration::from_nanos(keep_alive_idle_time));
867 }
868 }
869 }
870}