feat(udp): Increase recv buffer length to avoid packet loss

This commit is contained in:
Σrebe - Romain GERARD 2023-11-02 09:05:22 +01:00
parent 9883b8b32b
commit d8747443d6
No known key found for this signature in database
GPG key ID: 7A42B4B97E0332F4
2 changed files with 34 additions and 17 deletions

View file

@ -19,8 +19,8 @@ pub(super) async fn propagate_read(
info!("Closing local tx ==> websocket tx tunnel"); info!("Closing local tx ==> websocket tx tunnel");
}); });
static JUMBO_FRAME_SIZE: usize = 9 * 1024; // enough for a jumbo frame static MAX_PACKET_LENGTH: usize = 64 * 1024;
let mut buffer = vec![0u8; JUMBO_FRAME_SIZE]; let mut buffer = vec![0u8; MAX_PACKET_LENGTH];
// We do our own pin_mut! to avoid shadowing timeout and be able to reset it, on next loop iteration // We do our own pin_mut! to avoid shadowing timeout and be able to reset it, on next loop iteration
// We reuse the future to avoid creating a timer in the tight loop // We reuse the future to avoid creating a timer in the tight loop

View file

@ -8,6 +8,7 @@ use std::future::Future;
use std::io; use std::io;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::fd::AsRawFd;
use log::warn; use log::warn;
use std::pin::{pin, Pin}; use std::pin::{pin, Pin};
@ -29,13 +30,28 @@ struct IoInner {
} }
struct UdpServer { struct UdpServer {
listener: Arc<UdpSocket>, listener: Arc<UdpSocket>,
peers: HashMap<SocketAddr, Arc<IoInner>, ahash::RandomState>, peers: HashMap<SocketAddr, Pin<Arc<IoInner>>, ahash::RandomState>,
keys_to_delete: Arc<RwLock<Vec<SocketAddr>>>, keys_to_delete: Arc<RwLock<Vec<SocketAddr>>>,
cnx_timeout: Option<Duration>, cnx_timeout: Option<Duration>,
} }
impl UdpServer { impl UdpServer {
pub fn new(listener: Arc<UdpSocket>, timeout: Option<Duration>) -> Self { pub fn new(listener: Arc<UdpSocket>, timeout: Option<Duration>) -> Self {
unsafe {
// Increase socket buffer length to 64MB
let buf_len: libc::c_int = 64 * 1024 * 1024;
let ret = libc::setsockopt(
listener.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&buf_len as *const _ as *const libc::c_void,
std::mem::size_of_val(&buf_len) as libc::socklen_t,
);
if ret != 0 {
warn!("Cannot set UDP server recv buffer: {}", io::Error::last_os_error());
}
};
Self { Self {
listener, listener,
peers: HashMap::with_hasher(ahash::RandomState::new()), peers: HashMap::with_hasher(ahash::RandomState::new()),
@ -72,7 +88,7 @@ pub struct UdpStream {
has_been_notified: bool, has_been_notified: bool,
#[pin] #[pin]
pending_notification: Option<Notified<'static>>, pending_notification: Option<Notified<'static>>,
io: Arc<IoInner>, io: Pin<Arc<IoInner>>,
keys_to_delete: Weak<RwLock<Vec<SocketAddr>>>, keys_to_delete: Weak<RwLock<Vec<SocketAddr>>>,
} }
@ -97,10 +113,10 @@ impl UdpStream {
peer: SocketAddr, peer: SocketAddr,
watchdog_deadline: Option<Duration>, watchdog_deadline: Option<Duration>,
keys_to_delete: Weak<RwLock<Vec<SocketAddr>>>, keys_to_delete: Weak<RwLock<Vec<SocketAddr>>>,
) -> (Self, Arc<IoInner>) { ) -> (Self, Pin<Arc<IoInner>>) {
let has_data_to_read = Notify::new(); let has_data_to_read = Notify::new();
let has_read_data = Notify::new(); let has_read_data = Notify::new();
let io = Arc::new(IoInner { let io = Arc::pin(IoInner {
has_data_to_read, has_data_to_read,
has_read_data, has_read_data,
}); });
@ -133,16 +149,15 @@ impl AsyncRead for UdpStream {
// Look that the timeout for client has not elapsed // Look that the timeout for client has not elapsed
if let Some(mut deadline) = project.watchdog_deadline.as_pin_mut() { if let Some(mut deadline) = project.watchdog_deadline.as_pin_mut() {
if deadline.poll_tick(cx).is_ready() { if deadline.poll_tick(cx).is_ready() {
return if *project.data_read_before_deadline { if !*project.data_read_before_deadline {
*project.data_read_before_deadline = false; return Poll::Ready(Err(Error::new(
let _ = deadline.poll_tick(cx);
Poll::Pending
} else {
Poll::Ready(Err(Error::new(
ErrorKind::TimedOut, ErrorKind::TimedOut,
format!("UDP stream timeout with {}", project.peer), format!("UDP stream timeout with {}", project.peer),
))) )));
}; };
*project.data_read_before_deadline = false;
while deadline.poll_tick(cx).is_ready() {}
} }
} }
@ -154,9 +169,15 @@ impl AsyncRead for UdpStream {
let peer = ready!(project.socket.poll_recv_from(cx, obuf))?; let peer = ready!(project.socket.poll_recv_from(cx, obuf))?;
debug_assert_eq!(peer, *project.peer); debug_assert_eq!(peer, *project.peer);
*project.data_read_before_deadline = true; *project.data_read_before_deadline = true;
// re-arm notification
let notified: Notified<'static> = unsafe { std::mem::transmute(project.io.has_data_to_read.notified()) }; let notified: Notified<'static> = unsafe { std::mem::transmute(project.io.has_data_to_read.notified()) };
project.pending_notification.as_mut().set(Some(notified)); project.pending_notification.as_mut().set(Some(notified));
project.pending_notification.as_pin_mut().unwrap().enable();
// Let know server that we have read data
project.io.has_read_data.notify_one(); project.io.has_read_data.notify_one();
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }
@ -194,9 +215,7 @@ pub async fn run_server(
// New returned peer hasn't read its data yet, await for it. // New returned peer hasn't read its data yet, await for it.
if let Some(await_peer) = peer_with_data { if let Some(await_peer) = peer_with_data {
if let Some(peer) = server.peers.get(&await_peer) { if let Some(peer) = server.peers.get(&await_peer) {
info!("waiting for peer {} to read its first data", await_peer.port());
peer.has_read_data.notified().await; peer.has_read_data.notified().await;
info!("peer {} to read its first data", await_peer.port());
} }
}; };
@ -212,10 +231,8 @@ pub async fn run_server(
match server.peers.get(&peer_addr) { match server.peers.get(&peer_addr) {
Some(io) => { Some(io) => {
info!("waiting for peer {} to read its data", peer_addr.port());
io.has_data_to_read.notify_one(); io.has_data_to_read.notify_one();
io.has_read_data.notified().await; io.has_read_data.notified().await;
info!("peer {} to read its data", peer_addr.port());
} }
None => { None => {
info!("New UDP connection from {}", peer_addr); info!("New UDP connection from {}", peer_addr);