From 91f9a84e89ff627957a068b15d295f9aeb5ea89c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=A3rebe=20-=20Romain=20GERARD?= Date: Fri, 1 Dec 2023 22:25:01 +0100 Subject: [PATCH] Add reverse socks5 tunnel --- src/main.rs | 24 ++++++++++++++++++++++-- src/tunnel/client.rs | 43 ++++++++++++++++++++++++++----------------- src/tunnel/mod.rs | 1 + src/tunnel/server.rs | 29 ++++++++++++++++++++++++----- 4 files changed, 73 insertions(+), 24 deletions(-) diff --git a/src/main.rs b/src/main.rs index ced7502..a55d47d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,6 +69,7 @@ struct Client { /// examples: /// 'tcp://1212:google.com:443' => listen on server for incoming tcp cnx on port 1212 and forward to google.com on port 443 from local machine /// 'udp://1212:1.1.1.1:53' => listen on server for incoming udp on port 1212 and forward to cloudflare dns 1.1.1.1 on port 53 from local machine + /// 'socks://[::1]:1212' => listen on server for incoming socks5 request on port 1212 and forward dynamically request from local machine #[arg(short='R', long, value_name = "{tcp,udp}://[BIND:]PORT:HOST:PORT", value_parser = parse_tunnel_arg, verbatim_doc_comment)] remote_to_local: Vec, @@ -181,6 +182,7 @@ enum LocalProtocol { TProxyTcp, ReverseTcp, ReverseUdp { timeout: Option }, + ReverseSocks5, } #[derive(Clone, Debug)] @@ -572,7 +574,7 @@ async fn main() { tokio::spawn(async move { let remote = tunnel.remote.clone(); let cfg = client_config.clone(); - let connect_to_dest = || async { + let connect_to_dest = |_| async { tcp::connect(&remote.0, remote.1, cfg.socket_so_mark, cfg.timeout_connect).await }; @@ -590,7 +592,24 @@ async fn main() { let cfg = client_config.clone(); let remote = tunnel.remote.clone(); let connect_to_dest = - || async { udp::connect(&remote.0, remote.1, cfg.timeout_connect).await }; + |_| async { udp::connect(&remote.0, remote.1, cfg.timeout_connect).await }; + + if let Err(err) = + tunnel::client::run_reverse_tunnel(client_config, tunnel, connect_to_dest).await + { + error!("{:?}", err); + } + }); + } + LocalProtocol::Socks5 => { + tunnel.local_protocol = LocalProtocol::ReverseSocks5; + tokio::spawn(async move { + let cfg = client_config.clone(); + let connect_to_dest = |remote: (Host, u16)| { + let so_mark = cfg.socket_so_mark; + let timeout = cfg.timeout_connect; + async move { tcp::connect(&remote.0, remote.1, so_mark, timeout).await } + }; if let Err(err) = tunnel::client::run_reverse_tunnel(client_config, tunnel, connect_to_dest).await @@ -693,6 +712,7 @@ async fn main() { } LocalProtocol::ReverseTcp => {} LocalProtocol::ReverseUdp { .. } => {} + LocalProtocol::ReverseSocks5 => {} } } } diff --git a/src/tunnel/client.rs b/src/tunnel/client.rs index fab5cea..dc4ba7b 100644 --- a/src/tunnel/client.rs +++ b/src/tunnel/client.rs @@ -1,15 +1,14 @@ -use super::{JwtTunnelConfig, JWT_KEY}; +use super::{to_host_port, JwtTunnelConfig, JWT_KEY}; use crate::{LocalToRemote, WsClientConfig}; use anyhow::{anyhow, Context}; use fastwebsockets::WebSocket; use futures_util::pin_mut; -use hyper::header::{AUTHORIZATION, SEC_WEBSOCKET_VERSION, UPGRADE}; +use hyper::header::{AUTHORIZATION, COOKIE, SEC_WEBSOCKET_VERSION, UPGRADE}; use hyper::header::{CONNECTION, HOST, SEC_WEBSOCKET_KEY}; use hyper::upgrade::Upgraded; -use hyper::{Body, Request}; +use hyper::{Body, Request, Response}; use std::future::Future; -use std::net::IpAddr; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; @@ -17,7 +16,7 @@ use tokio::sync::oneshot; use tokio_stream::{Stream, StreamExt}; use tracing::log::debug; use tracing::{error, span, Instrument, Level, Span}; -use url::Host; +use url::{Host, Url}; use uuid::Uuid; struct SpawnExecutor; @@ -42,7 +41,7 @@ pub async fn connect( request_id: Uuid, client_cfg: &WsClientConfig, tunnel_cfg: &LocalToRemote, -) -> anyhow::Result> { +) -> anyhow::Result<(WebSocket, Response)> { let mut pooled_cnx = match client_cfg.cnx_pool().get().await { Ok(tcp_stream) => tcp_stream, Err(err) => Err(anyhow!("failed to get a connection to the server from the pool: {err:?}"))?, @@ -77,11 +76,11 @@ pub async fn connect( })?; debug!("with HTTP upgrade request {:?}", req); let transport = pooled_cnx.deref_mut().take().unwrap(); - let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, transport) + let (ws, response) = fastwebsockets::handshake::client(&SpawnExecutor, req, transport) .await .with_context(|| format!("failed to do websocket handshake with the server {:?}", client_cfg.remote_addr))?; - Ok(ws) + Ok((ws, response)) } async fn connect_to_server( @@ -94,7 +93,7 @@ where R: AsyncRead + Send + 'static, W: AsyncWrite + Send + 'static, { - let mut ws = connect(request_id, client_cfg, remote_cfg).await?; + let (mut ws, _) = connect(request_id, client_cfg, remote_cfg).await?; ws.set_auto_apply_mask(client_cfg.websocket_mask_frame); let (ws_rx, ws_tx) = ws.split(tokio::io::split); @@ -155,16 +154,13 @@ pub async fn run_reverse_tunnel( connect_to_dest: F, ) -> anyhow::Result<()> where - F: Fn() -> Fut, + F: Fn((Host, u16)) -> Fut, Fut: Future>, T: AsyncRead + AsyncWrite + Send + 'static, { // Invert local with remote - let remote = tunnel_cfg.remote; - tunnel_cfg.remote = match tunnel_cfg.local.ip() { - IpAddr::V4(ip) => (Host::Ipv4(ip), tunnel_cfg.local.port()), - IpAddr::V6(ip) => (Host::Ipv6(ip), tunnel_cfg.local.port()), - }; + let remote_ori = tunnel_cfg.remote; + tunnel_cfg.remote = to_host_port(tunnel_cfg.local); loop { let client_config = client_config.clone(); @@ -178,13 +174,26 @@ where let _span = span.enter(); // Correctly configure tunnel cfg - let mut ws = connect(request_id, &client_config, &tunnel_cfg) + let (mut ws, response) = connect(request_id, &client_config, &tunnel_cfg) .instrument(span.clone()) .await?; ws.set_auto_apply_mask(client_config.websocket_mask_frame); // Connect to endpoint - let stream = connect_to_dest().instrument(span.clone()).await; + let remote: (Host, u16) = response + .headers() + .get(COOKIE) + .and_then(|h| { + h.to_str() + .ok() + .and_then(|s| Url::parse(s).ok()) + .and_then(|url| match (url.host(), url.port()) { + (Some(h), Some(p)) => Some((h.to_owned(), p)), + _ => None, + }) + }) + .unwrap_or(remote_ori.clone()); + let stream = connect_to_dest(remote.clone()).instrument(span.clone()).await; let stream = match stream { Ok(s) => s, diff --git a/src/tunnel/mod.rs b/src/tunnel/mod.rs index 682e0f7..c83ec00 100644 --- a/src/tunnel/mod.rs +++ b/src/tunnel/mod.rs @@ -38,6 +38,7 @@ impl JwtTunnelConfig { LocalProtocol::Socks5 => LocalProtocol::Tcp, LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp, LocalProtocol::ReverseUdp { .. } => tunnel.local_protocol, + LocalProtocol::ReverseSocks5 => LocalProtocol::ReverseSocks5, LocalProtocol::TProxyTcp => LocalProtocol::Tcp, }, r: tunnel.remote.0.to_string(), diff --git a/src/tunnel/server.rs b/src/tunnel/server.rs index 41838b1..faba112 100644 --- a/src/tunnel/server.rs +++ b/src/tunnel/server.rs @@ -2,15 +2,17 @@ use ahash::{HashMap, HashMapExt}; use anyhow::anyhow; use futures_util::{pin_mut, Stream, StreamExt}; use std::cmp::min; +use std::fmt::Debug; use std::future::Future; -use std::io; use std::ops::{Deref, Not}; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use super::{JwtTunnelConfig, JWT_DECODE}; -use crate::{tcp, tls, udp, LocalProtocol, WsServerConfig}; +use crate::{socks5, tcp, tls, udp, LocalProtocol, WsServerConfig}; +use hyper::header::COOKIE; +use hyper::http::HeaderValue; use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::{http, Body, Request, Response, StatusCode}; @@ -107,19 +109,33 @@ async fn from_query( Ok((jwt.claims.p, local_srv.0, local_srv.1, Box::pin(local_rx), Box::pin(local_tx))) } + LocalProtocol::ReverseSocks5 => { + #[allow(clippy::type_complexity)] + static SERVERS: Lazy, u16), mpsc::Receiver<(TcpStream, (Host, u16))>>>> = + Lazy::new(|| Mutex::new(HashMap::with_capacity(0))); + + let local_srv = (Host::parse(&jwt.claims.r)?, jwt.claims.rp); + let bind = format!("{}:{}", local_srv.0, local_srv.1); + let listening_server = socks5::run_server(bind.parse()?); + let (tcp, local_srv) = run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?; + let (local_rx, local_tx) = tokio::io::split(tcp); + + Ok((jwt.claims.p, local_srv.0, local_srv.1, Box::pin(local_rx), Box::pin(local_tx))) + } _ => Err(anyhow::anyhow!("Invalid upgrade request")), } } #[allow(clippy::type_complexity)] -async fn run_listening_server( +async fn run_listening_server( local_srv: &(Host, u16), servers: &Mutex, u16), mpsc::Receiver>>, gen_listening_server: Fut, ) -> anyhow::Result where Fut: Future>, - FutOut: Stream> + Send + 'static, + FutOut: Stream> + Send + 'static, + E: Debug + Send, T: Send + 'static, { let listening_server = servers.lock().remove(local_srv); @@ -214,7 +230,7 @@ async fn server_upgrade( }; info!("connected to {:?} {:?} {:?}", protocol, dest, port); - let (response, fut) = match fastwebsockets::upgrade::upgrade(&mut req) { + let (mut response, fut) = match fastwebsockets::upgrade::upgrade(&mut req) { Ok(ret) => ret, Err(err) => { warn!("Rejecting connection with bad upgrade request: {} {}", err, req.uri()); @@ -244,6 +260,9 @@ async fn server_upgrade( .instrument(Span::current()), ); + response + .headers_mut() + .insert(COOKIE, HeaderValue::from_str(&format!("fake://{}:{}", dest, port)).unwrap()); Ok(response) }