fix udp association. Use peer & destination for stream map

This commit is contained in:
Σrebe - Romain GERARD 2024-01-08 20:52:34 +01:00
parent 1a88c1735d
commit 7d88446453
No known key found for this signature in database
GPG key ID: 7A42B4B97E0332F4

View file

@ -29,8 +29,8 @@ struct IoInner {
} }
struct Socks5UdpServer { struct Socks5UdpServer {
listener: Arc<UdpSocket>, listener: Arc<UdpSocket>,
peers: HashMap<TargetAddr, Pin<Arc<IoInner>>, ahash::RandomState>, peers: HashMap<(SocketAddr, TargetAddr), Pin<Arc<IoInner>>, ahash::RandomState>,
keys_to_delete: Arc<RwLock<Vec<TargetAddr>>>, keys_to_delete: Arc<RwLock<Vec<(SocketAddr, TargetAddr)>>>,
cnx_timeout: Option<Duration>, cnx_timeout: Option<Duration>,
} }
@ -82,14 +82,14 @@ pub struct Socks5UdpStream {
pub watchdog_deadline: Option<Interval>, pub watchdog_deadline: Option<Interval>,
data_read_before_deadline: bool, data_read_before_deadline: bool,
io: Pin<Arc<IoInner>>, io: Pin<Arc<IoInner>>,
keys_to_delete: Weak<RwLock<Vec<TargetAddr>>>, keys_to_delete: Weak<RwLock<Vec<(SocketAddr, TargetAddr)>>>,
} }
#[pinned_drop] #[pinned_drop]
impl PinnedDrop for Socks5UdpStream { impl PinnedDrop for Socks5UdpStream {
fn drop(self: Pin<&mut Self>) { fn drop(self: Pin<&mut Self>) {
if let Some(keys_to_delete) = self.keys_to_delete.upgrade() { if let Some(keys_to_delete) = self.keys_to_delete.upgrade() {
keys_to_delete.write().push(self.destination.clone()); keys_to_delete.write().push((self.peer, self.destination.clone()));
} }
} }
} }
@ -100,7 +100,7 @@ impl Socks5UdpStream {
peer: SocketAddr, peer: SocketAddr,
destination: TargetAddr, destination: TargetAddr,
watchdog_deadline: Option<Duration>, watchdog_deadline: Option<Duration>,
keys_to_delete: Weak<RwLock<Vec<TargetAddr>>>, keys_to_delete: Weak<RwLock<Vec<(SocketAddr, TargetAddr)>>>,
) -> (Self, Pin<Arc<IoInner>>) { ) -> (Self, Pin<Arc<IoInner>>) {
let (tx, rx) = mpsc::channel(1024); let (tx, rx) = mpsc::channel(1024);
let io = Arc::pin(IoInner { sender: tx }); let io = Arc::pin(IoInner { sender: tx });
@ -223,28 +223,30 @@ pub async fn run_server(
let (frag, destination_addr, data) = fast_socks5::parse_udp_request(payload.chunk()).await.unwrap(); let (frag, destination_addr, data) = fast_socks5::parse_udp_request(payload.chunk()).await.unwrap();
// We don't support udp fragmentation // We don't support udp fragmentation
if frag != 0 { if frag != 0 {
warn!("dropping UDP socks5 fragmented");
continue; continue;
} }
(destination_addr, payload.slice_ref(data)) (destination_addr, payload.slice_ref(data))
}; };
match server.peers.get(&destination_addr) { let addr = (peer_addr, destination_addr);
match server.peers.get(&addr) {
Some(io) => { Some(io) => {
if io.sender.send(data).await.is_err() { if io.sender.send(data).await.is_err() {
server.peers.remove(&destination_addr); server.peers.remove(&addr);
} }
} }
None => { None => {
info!("New UDP connection for {}", destination_addr); info!("New UDP connection for {}", addr.1);
let (udp_client, io) = Socks5UdpStream::new( let (udp_client, io) = Socks5UdpStream::new(
server.listener.clone(), server.listener.clone(),
peer_addr, addr.0,
destination_addr.clone(), addr.1.clone(),
server.cnx_timeout, server.cnx_timeout,
Arc::downgrade(&server.keys_to_delete), Arc::downgrade(&server.keys_to_delete),
); );
let _ = io.sender.send(data).await; let _ = io.sender.send(data).await;
server.peers.insert(destination_addr, io); server.peers.insert(addr, io);
return Some((Ok(udp_client), (server, buf))); return Some((Ok(udp_client), (server, buf)));
} }
} }