diff --git a/Cargo.lock b/Cargo.lock index d2b8d91..8b983fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,6 +148,18 @@ dependencies = [ "syn", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.81" @@ -340,9 +352,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cc" @@ -401,9 +413,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" +checksum = "0fbb260a053428790f3de475e304ff84cdbc4face759ea7a3e64c1edd938a7fc" dependencies = [ "clap_builder", "clap_derive", @@ -411,9 +423,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" +checksum = "64b17d7ea74e9f833c7dbf2cbe4fb12ff26783eda4782a8975b72f895c9b4d99" dependencies = [ "anstream", "anstyle", @@ -423,9 +435,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -454,6 +466,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -496,15 +517,15 @@ checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crossterm" -version = "0.27.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" dependencies = [ "bitflags 2.6.0", "crossterm_winapi", - "libc", - "mio 0.8.11", + "mio 1.0.1", "parking_lot", + "rustix", "signal-hook", "signal-hook-mio", "winapi", @@ -687,6 +708,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fast-socks5" version = "0.9.6" @@ -1507,6 +1549,7 @@ checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ "hermit-abi", "libc", + "log", "wasi", "windows-sys 0.52.0", ] @@ -1654,6 +1697,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.3" @@ -2285,12 +2334,12 @@ dependencies = [ [[package]] name = "signal-hook-mio" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" dependencies = [ "libc", - "mio 0.8.11", + "mio 1.0.1", "signal-hook", ] @@ -3075,6 +3124,7 @@ version = "9.8.0-rc1" dependencies = [ "ahash", "anyhow", + "async-channel", "async-trait", "base64 0.22.1", "bb8", diff --git a/Cargo.toml b/Cargo.toml index 44abf12..5d2af79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,13 +13,14 @@ base64 = "0.22.1" scopeguard = "1.2.0" bb8 = { version = "0.8", features = [] } -bytes = { version = "1.6.1", features = [] } -clap = { version = "4.5.11", features = ["derive", "env"] } +bytes = { version = "1.7.1", features = [] } +clap = { version = "4.5.13", features = ["derive", "env"] } fast-socks5 = { version = "0.9.6", features = [] } fastwebsockets = { version = "0.8.0", features = ["upgrade", "simd", "unstable-split"] } futures-util = { version = "0.3.30" } hickory-resolver = { version = "0.24.1", features = ["tokio", "dns-over-https-rustls", "dns-over-rustls", "native-certs"] } ppp = { version = "2.2.0", features = [] } +async-channel = { version = "2.3.1", features = [] } # For config file parsing regex = { version = "1.10.5", default-features = false, features = ["std", "perf"] } @@ -58,7 +59,7 @@ urlencoding = "2.1.3" uuid = { version = "1.10.0", features = ["v7", "serde"] } [target.'cfg(not(target_family = "unix"))'.dependencies] -crossterm = { version = "0.27.0" } +crossterm = { version = "0.28.1" } tokio-util = { version = "0.7.11", features = ["io"] } [target.'cfg(target_family = "unix")'.dependencies] diff --git a/src/main.rs b/src/main.rs index d1f194d..5d3a86a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use crate::restrictions::types::RestrictionsRules; use crate::tunnel::client::{TlsClientConfig, WsClient, WsClientConfig}; use crate::tunnel::connectors::{Socks5TunnelConnector, TcpTunnelConnector, UdpTunnelConnector}; use crate::tunnel::listeners::{ - new_stdio_listener, new_udp_listener, HttpProxyTunnelListener, Socks5TunnelListener, TcpTunnelListener, + new_stdio_listener, HttpProxyTunnelListener, Socks5TunnelListener, TcpTunnelListener, UdpTunnelListener, }; use crate::tunnel::server::{TlsServerConfig, WsServer, WsServerConfig}; use crate::tunnel::{to_host_port, LocalProtocol, RemoteAddr, TransportAddr, TransportScheme}; @@ -1004,7 +1004,7 @@ async fn main() -> anyhow::Result<()> { panic!("Transparent proxy is not available for non Linux platform") } LocalProtocol::Udp { timeout } => { - let server = new_udp_listener(tunnel.local, tunnel.remote.clone(), *timeout).await?; + let server = UdpTunnelListener::new(tunnel.local, tunnel.remote.clone(), *timeout).await?; tokio::spawn(async move { if let Err(err) = client.run_tunnel(server).await { diff --git a/src/tunnel/listeners/mod.rs b/src/tunnel/listeners/mod.rs index d42d03c..5c0b33c 100644 --- a/src/tunnel/listeners/mod.rs +++ b/src/tunnel/listeners/mod.rs @@ -18,7 +18,7 @@ pub use http_proxy::HttpProxyTunnelListener; pub use socks5::Socks5TunnelListener; pub use stdio::new_stdio_listener; pub use tcp::TcpTunnelListener; -pub use udp::new_udp_listener; +pub use udp::UdpTunnelListener; #[cfg(unix)] pub use unix_sock::UnixTunnelListener; @@ -30,7 +30,6 @@ use tokio_stream::Stream; pub trait TunnelListener: Stream> { type Reader: AsyncRead + Send + 'static; type Writer: AsyncWrite + Send + 'static; - type OkReturn; // = ((Self::Reader, Self::Writer), RemoteAddr); } impl TunnelListener for T @@ -41,5 +40,4 @@ where { type Reader = R; type Writer = W; - type OkReturn = ((R, W), RemoteAddr); } diff --git a/src/tunnel/listeners/udp.rs b/src/tunnel/listeners/udp.rs index 2df81b8..94068b9 100644 --- a/src/tunnel/listeners/udp.rs +++ b/src/tunnel/listeners/udp.rs @@ -10,35 +10,31 @@ use std::time::Duration; use tokio_stream::Stream; use url::Host; -pub struct UdpTunnelListener -where - S: Stream>, -{ - listener: S, +pub struct UdpTunnelListener { + listener: Pin> + Send>>, dest: (Host, u16), timeout: Option, } -pub async fn new_udp_listener( - bind_addr: SocketAddr, - dest: (Host, u16), - timeout: Option, -) -> anyhow::Result>>> { - let listener = udp::run_server(bind_addr, timeout, |_| Ok(()), |s| Ok(s.clone())) - .await - .with_context(|| anyhow!("Cannot start UDP server on {}", bind_addr))?; +impl UdpTunnelListener { + pub async fn new( + bind_addr: SocketAddr, + dest: (Host, u16), + timeout: Option, + ) -> anyhow::Result { + let listener = udp::run_server(bind_addr, timeout, |_| Ok(()), |s| Ok(s.clone())) + .await + .with_context(|| anyhow!("Cannot start UDP server on {}", bind_addr))?; - Ok(UdpTunnelListener { - listener, - dest, - timeout, - }) + Ok(UdpTunnelListener { + listener: Box::pin(listener), + dest, + timeout, + }) + } } -impl Stream for UdpTunnelListener -where - S: Stream>, -{ +impl Stream for UdpTunnelListener { type Item = anyhow::Result<((UdpStream, UdpStreamWriter), RemoteAddr)>; fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { diff --git a/src/tunnel/server/mod.rs b/src/tunnel/server/mod.rs index d318810..0b7ef09 100644 --- a/src/tunnel/server/mod.rs +++ b/src/tunnel/server/mod.rs @@ -1,6 +1,7 @@ #![allow(clippy::module_inception)] mod handler_http2; mod handler_websocket; +mod reverse_tunnel; mod server; mod utils; diff --git a/src/tunnel/server/reverse_tunnel.rs b/src/tunnel/server/reverse_tunnel.rs new file mode 100644 index 0000000..0f60ec5 --- /dev/null +++ b/src/tunnel/server/reverse_tunnel.rs @@ -0,0 +1,118 @@ +use crate::tunnel::listeners::TunnelListener; +use crate::tunnel::RemoteAddr; +use ahash::{HashMap, HashMapExt}; +use anyhow::anyhow; +use futures_util::{pin_mut, StreamExt}; +use log::warn; +use parking_lot::Mutex; +use std::future::Future; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::{select, time}; +use tracing::{info, Instrument, Span}; + +struct ReverseTunnelItem { + #[allow(clippy::type_complexity)] + receiver: async_channel::Receiver<((::Reader, ::Writer), RemoteAddr)>, + nb_seen_clients: Arc, +} + +impl Clone for ReverseTunnelItem { + fn clone(&self) -> Self { + Self { + receiver: self.receiver.clone(), + nb_seen_clients: self.nb_seen_clients.clone(), + } + } +} + +pub struct ReverseTunnelServer { + servers: Arc>>>, +} + +impl ReverseTunnelServer { + pub fn new() -> Self { + Self { + servers: Arc::new(Mutex::new(HashMap::with_capacity(1))), + } + } + + pub async fn run_listening_server( + &self, + bind_addr: SocketAddr, + gen_listening_server: impl Future>, + ) -> anyhow::Result<((::Reader, ::Writer), RemoteAddr)> + where + T: TunnelListener + Send + 'static, + { + let listening_server = self.servers.lock().get(&bind_addr).cloned(); + let item = if let Some(listening_server) = listening_server { + listening_server + } else { + let listening_server = gen_listening_server.await?; + let send_timeout = Duration::from_secs(60 * 3); + let (tx, rx) = async_channel::bounded(10); + let nb_seen_clients = Arc::new(AtomicUsize::new(0)); + let seen_clients = nb_seen_clients.clone(); + let server = self.servers.clone(); + let local_srv2 = bind_addr; + + let fut = async move { + scopeguard::defer!({ + server.lock().remove(&local_srv2); + }); + + let mut timer = time::interval(send_timeout); + pin_mut!(listening_server); + loop { + select! { + biased; + cnx = listening_server.next() => { + match cnx { + None => break, + Some(Err(err)) => { + warn!("Error while listening for incoming connections {err:?}"); + continue; + } + Some(Ok(cnx)) => { + if time::timeout(send_timeout, tx.send(cnx)).await.is_err() { + info!("New reverse connection failed to be picked by client after {}s. Closing reverse tunnel server", send_timeout.as_secs()); + break; + } + } + } + }, + _ = timer.tick() => { + + // if no client connected to the reverse tunnel server, close it + // <= 1 because the server itself has a receiver + if seen_clients.swap(0, Ordering::Relaxed) == 0 && tx.receiver_count() <= 1 { + info!("No client connected to reverse tunnel server for {}s. Closing reverse tunnel server", send_timeout.as_secs()); + break; + } + }, + } + } + info!("Stopping listening reverse server"); + }; + + tokio::spawn(fut.instrument(Span::current())); + let item = ReverseTunnelItem { + receiver: rx, + nb_seen_clients, + }; + self.servers.lock().insert(bind_addr, item.clone()); + item + }; + + item.nb_seen_clients.fetch_add(1, Ordering::Relaxed); + let cnx = item + .receiver + .recv() + .await + .map_err(|_| anyhow!("listening reverse server stopped"))?; + Ok(cnx) + } +} diff --git a/src/tunnel/server/server.rs b/src/tunnel/server/server.rs index 5914e62..ae4102f 100644 --- a/src/tunnel/server/server.rs +++ b/src/tunnel/server/server.rs @@ -1,15 +1,12 @@ -use ahash::{HashMap, HashMapExt}; use anyhow::anyhow; -use futures_util::{pin_mut, FutureExt, StreamExt}; +use futures_util::FutureExt; use http_body_util::Either; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::future::Future; use bytes::Bytes; use http_body_util::combinators::BoxBody; use std::net::SocketAddr; -use std::ops::Deref; use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; @@ -28,27 +25,25 @@ use socket2::SockRef; use crate::protocols::dns::DnsResolver; use crate::protocols::tls; -use crate::protocols::udp::{UdpStream, UdpStreamWriter}; use crate::restrictions::config_reloader::RestrictionsRulesReloader; use crate::restrictions::types::{RestrictionConfig, RestrictionsRules}; use crate::tunnel::connectors::{TcpTunnelConnector, TunnelConnector, UdpTunnelConnector}; -use crate::tunnel::listeners::{ - new_udp_listener, HttpProxyTunnelListener, Socks5TunnelListener, TcpTunnelListener, TunnelListener, -}; +use crate::tunnel::listeners::{HttpProxyTunnelListener, Socks5TunnelListener, TcpTunnelListener, UdpTunnelListener}; use crate::tunnel::server::handler_http2::http_server_upgrade; use crate::tunnel::server::handler_websocket::ws_server_upgrade; +use crate::tunnel::server::reverse_tunnel::ReverseTunnelServer; use crate::tunnel::server::utils::{ - bad_request, extract_path_prefix, extract_tunnel_info, extract_x_forwarded_for, find_mapped_port, validate_tunnel, + bad_request, extract_path_prefix, extract_tunnel_info, extract_x_forwarded_for, find_mapped_port, try_to_sock_aadr, + validate_tunnel, }; use crate::tunnel::tls_reloader::TlsReloader; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::select; -use tokio::sync::mpsc; use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer}; use tokio_rustls::TlsAcceptor; use tracing::{error, info, span, warn, Instrument, Level, Span}; -use url::{Host, Url}; +use url::Url; #[derive(Debug)] pub struct TlsServerConfig { @@ -214,85 +209,59 @@ impl WsServer { Ok((remote, Box::pin(rx), Box::pin(tx))) } LocalProtocol::ReverseTcp => { - type Item = ::OkReturn; - #[allow(clippy::type_complexity)] - static SERVERS: Lazy, u16), mpsc::Receiver>>> = - Lazy::new(|| Mutex::new(HashMap::with_capacity(0))); + static SERVERS: Lazy> = Lazy::new(ReverseTunnelServer::new); let remote_port = find_mapped_port(remote.port, restriction); let local_srv = (remote.host, remote_port); - let listening_server = async { - let bind = format!("{}:{}", local_srv.0, local_srv.1); - TcpTunnelListener::new(bind.parse()?, local_srv.clone(), false).await - }; - let ((local_rx, local_tx), remote) = - run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?; + let bind = try_to_sock_aadr(local_srv.clone())?; + let listening_server = async { TcpTunnelListener::new(bind, local_srv.clone(), false).await }; + let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?; Ok((remote, Box::pin(local_rx), Box::pin(local_tx))) } LocalProtocol::ReverseUdp { timeout } => { - type Item = ((UdpStream, UdpStreamWriter), RemoteAddr); - #[allow(clippy::type_complexity)] - static SERVERS: Lazy, u16), mpsc::Receiver>>> = - Lazy::new(|| Mutex::new(HashMap::with_capacity(0))); + static SERVERS: Lazy> = Lazy::new(ReverseTunnelServer::new); let remote_port = find_mapped_port(remote.port, restriction); let local_srv = (remote.host, remote_port); - let listening_server = async { - let bind = format!("{}:{}", local_srv.0, local_srv.1); - new_udp_listener(bind.parse()?, local_srv.clone(), timeout).await - }; - let ((local_rx, local_tx), remote) = - run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?; + let bind = try_to_sock_aadr(local_srv.clone())?; + let listening_server = async { UdpTunnelListener::new(bind, local_srv.clone(), timeout).await }; + let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?; Ok((remote, Box::pin(local_rx), Box::pin(local_tx))) } LocalProtocol::ReverseSocks5 { timeout, credentials } => { - type Item = ::OkReturn; - #[allow(clippy::type_complexity)] - static SERVERS: Lazy, u16), mpsc::Receiver>>> = - Lazy::new(|| Mutex::new(HashMap::with_capacity(0))); + static SERVERS: Lazy> = Lazy::new(ReverseTunnelServer::new); let remote_port = find_mapped_port(remote.port, restriction); let local_srv = (remote.host, remote_port); - let listening_server = async { - let bind = format!("{}:{}", local_srv.0, local_srv.1); - Socks5TunnelListener::new(bind.parse()?, timeout, credentials).await - }; - let ((local_rx, local_tx), remote) = - run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?; + let bind = try_to_sock_aadr(local_srv.clone())?; + let listening_server = async { Socks5TunnelListener::new(bind, timeout, credentials).await }; + let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?; Ok((remote, Box::pin(local_rx), Box::pin(local_tx))) } LocalProtocol::ReverseHttpProxy { timeout, credentials } => { - type Item = ::OkReturn; - #[allow(clippy::type_complexity)] - static SERVERS: Lazy, u16), mpsc::Receiver>>> = - Lazy::new(|| Mutex::new(HashMap::with_capacity(0))); + static SERVERS: Lazy> = + Lazy::new(ReverseTunnelServer::new); let remote_port = find_mapped_port(remote.port, restriction); let local_srv = (remote.host, remote_port); - let listening_server = async { - let bind = format!("{}:{}", local_srv.0, local_srv.1); - HttpProxyTunnelListener::new(bind.parse()?, timeout, credentials, false).await - }; - let ((local_rx, local_tx), remote) = - run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?; + let bind = try_to_sock_aadr(local_srv.clone())?; + let listening_server = async { HttpProxyTunnelListener::new(bind, timeout, credentials, false).await }; + let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?; Ok((remote, Box::pin(local_rx), Box::pin(local_tx))) } #[cfg(unix)] LocalProtocol::ReverseUnix { ref path } => { use crate::tunnel::listeners::UnixTunnelListener; - type Item = ::OkReturn; - #[allow(clippy::type_complexity)] - static SERVERS: Lazy, u16), mpsc::Receiver>>> = - Lazy::new(|| Mutex::new(HashMap::with_capacity(0))); + static SERVERS: Lazy> = Lazy::new(ReverseTunnelServer::new); let remote_port = find_mapped_port(remote.port, restriction); let local_srv = (remote.host, remote_port); + let bind = try_to_sock_aadr(local_srv.clone())?; let listening_server = async { UnixTunnelListener::new(path, local_srv.clone(), false).await }; - let ((local_rx, local_tx), remote) = - run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?; + let ((local_rx, local_tx), remote) = SERVERS.run_listening_server(bind, listening_server).await?; Ok((remote, Box::pin(local_rx), Box::pin(local_tx))) } @@ -551,65 +520,3 @@ impl TlsContext<'_> { &self.tls_acceptor } } - -#[allow(clippy::type_complexity)] -async fn run_listening_server( - local_srv: &(Host, u16), - servers: &Mutex< - HashMap< - (Host, u16), - mpsc::Receiver<((::Reader, ::Writer), RemoteAddr)>, - >, - >, - gen_listening_server: impl Future>, -) -> anyhow::Result<((::Reader, ::Writer), RemoteAddr)> -where - T: TunnelListener + Send + 'static, -{ - let listening_server = servers.lock().remove(local_srv); - let mut listening_server = if let Some(listening_server) = listening_server { - listening_server - } else { - let listening_server = gen_listening_server.await?; - let send_timeout = Duration::from_secs(60 * 3); - let (tx, rx) = mpsc::channel(1); - let fut = async move { - pin_mut!(listening_server); - loop { - select! { - biased; - cnx = listening_server.next() => { - match cnx { - None => break, - Some(Err(err)) => { - warn!("Error while listening for incoming connections {err:?}"); - continue; - } - Some(Ok(cnx)) => { - if tx.send_timeout(cnx, send_timeout).await.is_err() { - info!("New reverse connection failed to be picked by client after {}s. Closing reverse tunnel server", send_timeout.as_secs()); - break; - } - } - } - }, - - _ = tx.closed() => { - break; - } - } - } - info!("Stopping listening reverse server"); - }; - - tokio::spawn(fut.instrument(Span::current())); - rx - }; - - let cnx = listening_server - .recv() - .await - .ok_or_else(|| anyhow!("listening reverse server stopped"))?; - servers.lock().insert(local_srv.clone(), listening_server); - Ok(cnx) -} diff --git a/src/tunnel/server/utils.rs b/src/tunnel/server/utils.rs index e0a2153..1cff621 100644 --- a/src/tunnel/server/utils.rs +++ b/src/tunnel/server/utils.rs @@ -10,7 +10,7 @@ use hyper::header::{HeaderValue, COOKIE, SEC_WEBSOCKET_PROTOCOL}; use hyper::{http, Request, Response, StatusCode}; use jsonwebtoken::TokenData; use std::cmp::min; -use std::net::IpAddr; +use std::net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::ops::Deref; use tracing::{error, info, warn}; use url::Host; @@ -218,3 +218,11 @@ pub(super) fn inject_cookie(response: &mut http::Response, remote_add Ok(()) } + +pub fn try_to_sock_aadr((host, port): (Host, u16)) -> anyhow::Result { + match host { + Host::Domain(_) => Err(anyhow::anyhow!("Cannot convert domain to socket address")), + Host::Ipv4(ip) => Ok(SocketAddr::V4(SocketAddrV4::new(ip, port))), + Host::Ipv6(ip) => Ok(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))), + } +}