From d8747443d606382b0d4ed028e7a31c2580b869eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=A3rebe=20-=20Romain=20GERARD?= Date: Thu, 2 Nov 2023 09:05:22 +0100 Subject: [PATCH] feat(udp): Increase recv buffer length to avoid packet loss --- src/tunnel/io.rs | 4 ++-- src/udp.rs | 47 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/src/tunnel/io.rs b/src/tunnel/io.rs index 427259c..71d1e13 100644 --- a/src/tunnel/io.rs +++ b/src/tunnel/io.rs @@ -19,8 +19,8 @@ pub(super) async fn propagate_read( info!("Closing local tx ==> websocket tx tunnel"); }); - static JUMBO_FRAME_SIZE: usize = 9 * 1024; // enough for a jumbo frame - let mut buffer = vec![0u8; JUMBO_FRAME_SIZE]; + static MAX_PACKET_LENGTH: usize = 64 * 1024; + let mut buffer = vec![0u8; MAX_PACKET_LENGTH]; // We do our own pin_mut! to avoid shadowing timeout and be able to reset it, on next loop iteration // We reuse the future to avoid creating a timer in the tight loop diff --git a/src/udp.rs b/src/udp.rs index d1a9bb0..d0078d5 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -8,6 +8,7 @@ use std::future::Future; use std::io; use std::io::{Error, ErrorKind}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::os::fd::AsRawFd; use log::warn; use std::pin::{pin, Pin}; @@ -29,13 +30,28 @@ struct IoInner { } struct UdpServer { listener: Arc, - peers: HashMap, ahash::RandomState>, + peers: HashMap>, ahash::RandomState>, keys_to_delete: Arc>>, cnx_timeout: Option, } impl UdpServer { pub fn new(listener: Arc, timeout: Option) -> Self { + unsafe { + // Increase socket buffer length to 64MB + let buf_len: libc::c_int = 64 * 1024 * 1024; + let ret = libc::setsockopt( + listener.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_RCVBUF, + &buf_len as *const _ as *const libc::c_void, + std::mem::size_of_val(&buf_len) as libc::socklen_t, + ); + if ret != 0 { + warn!("Cannot set UDP server recv buffer: {}", io::Error::last_os_error()); + } + }; + Self { listener, peers: HashMap::with_hasher(ahash::RandomState::new()), @@ -72,7 +88,7 @@ pub struct UdpStream { has_been_notified: bool, #[pin] pending_notification: Option>, - io: Arc, + io: Pin>, keys_to_delete: Weak>>, } @@ -97,10 +113,10 @@ impl UdpStream { peer: SocketAddr, watchdog_deadline: Option, keys_to_delete: Weak>>, - ) -> (Self, Arc) { + ) -> (Self, Pin>) { let has_data_to_read = Notify::new(); let has_read_data = Notify::new(); - let io = Arc::new(IoInner { + let io = Arc::pin(IoInner { has_data_to_read, has_read_data, }); @@ -133,16 +149,15 @@ impl AsyncRead for UdpStream { // Look that the timeout for client has not elapsed if let Some(mut deadline) = project.watchdog_deadline.as_pin_mut() { if deadline.poll_tick(cx).is_ready() { - return if *project.data_read_before_deadline { - *project.data_read_before_deadline = false; - let _ = deadline.poll_tick(cx); - Poll::Pending - } else { - Poll::Ready(Err(Error::new( + if !*project.data_read_before_deadline { + return Poll::Ready(Err(Error::new( ErrorKind::TimedOut, format!("UDP stream timeout with {}", project.peer), - ))) + ))); }; + + *project.data_read_before_deadline = false; + while deadline.poll_tick(cx).is_ready() {} } } @@ -154,9 +169,15 @@ impl AsyncRead for UdpStream { let peer = ready!(project.socket.poll_recv_from(cx, obuf))?; debug_assert_eq!(peer, *project.peer); *project.data_read_before_deadline = true; + + // re-arm notification let notified: Notified<'static> = unsafe { std::mem::transmute(project.io.has_data_to_read.notified()) }; project.pending_notification.as_mut().set(Some(notified)); + project.pending_notification.as_pin_mut().unwrap().enable(); + + // Let know server that we have read data project.io.has_read_data.notify_one(); + Poll::Ready(Ok(())) } } @@ -194,9 +215,7 @@ pub async fn run_server( // New returned peer hasn't read its data yet, await for it. if let Some(await_peer) = peer_with_data { if let Some(peer) = server.peers.get(&await_peer) { - info!("waiting for peer {} to read its first data", await_peer.port()); peer.has_read_data.notified().await; - info!("peer {} to read its first data", await_peer.port()); } }; @@ -212,10 +231,8 @@ pub async fn run_server( match server.peers.get(&peer_addr) { Some(io) => { - info!("waiting for peer {} to read its data", peer_addr.port()); io.has_data_to_read.notify_one(); io.has_read_data.notified().await; - info!("peer {} to read its data", peer_addr.port()); } None => { info!("New UDP connection from {}", peer_addr);