Add reverse socks5 tunnel
This commit is contained in:
parent
4c736ccf57
commit
91f9a84e89
4 changed files with 73 additions and 24 deletions
24
src/main.rs
24
src/main.rs
|
@ -69,6 +69,7 @@ struct Client {
|
||||||
/// examples:
|
/// examples:
|
||||||
/// 'tcp://1212:google.com:443' => listen on server for incoming tcp cnx on port 1212 and forward to google.com on port 443 from local machine
|
/// 'tcp://1212:google.com:443' => listen on server for incoming tcp cnx on port 1212 and forward to google.com on port 443 from local machine
|
||||||
/// 'udp://1212:1.1.1.1:53' => listen on server for incoming udp on port 1212 and forward to cloudflare dns 1.1.1.1 on port 53 from local machine
|
/// 'udp://1212:1.1.1.1:53' => listen on server for incoming udp on port 1212 and forward to cloudflare dns 1.1.1.1 on port 53 from local machine
|
||||||
|
/// 'socks://[::1]:1212' => listen on server for incoming socks5 request on port 1212 and forward dynamically request from local machine
|
||||||
#[arg(short='R', long, value_name = "{tcp,udp}://[BIND:]PORT:HOST:PORT", value_parser = parse_tunnel_arg, verbatim_doc_comment)]
|
#[arg(short='R', long, value_name = "{tcp,udp}://[BIND:]PORT:HOST:PORT", value_parser = parse_tunnel_arg, verbatim_doc_comment)]
|
||||||
remote_to_local: Vec<LocalToRemote>,
|
remote_to_local: Vec<LocalToRemote>,
|
||||||
|
|
||||||
|
@ -181,6 +182,7 @@ enum LocalProtocol {
|
||||||
TProxyTcp,
|
TProxyTcp,
|
||||||
ReverseTcp,
|
ReverseTcp,
|
||||||
ReverseUdp { timeout: Option<Duration> },
|
ReverseUdp { timeout: Option<Duration> },
|
||||||
|
ReverseSocks5,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -572,7 +574,7 @@ async fn main() {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let remote = tunnel.remote.clone();
|
let remote = tunnel.remote.clone();
|
||||||
let cfg = client_config.clone();
|
let cfg = client_config.clone();
|
||||||
let connect_to_dest = || async {
|
let connect_to_dest = |_| async {
|
||||||
tcp::connect(&remote.0, remote.1, cfg.socket_so_mark, cfg.timeout_connect).await
|
tcp::connect(&remote.0, remote.1, cfg.socket_so_mark, cfg.timeout_connect).await
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -590,7 +592,24 @@ async fn main() {
|
||||||
let cfg = client_config.clone();
|
let cfg = client_config.clone();
|
||||||
let remote = tunnel.remote.clone();
|
let remote = tunnel.remote.clone();
|
||||||
let connect_to_dest =
|
let connect_to_dest =
|
||||||
|| async { udp::connect(&remote.0, remote.1, cfg.timeout_connect).await };
|
|_| async { udp::connect(&remote.0, remote.1, cfg.timeout_connect).await };
|
||||||
|
|
||||||
|
if let Err(err) =
|
||||||
|
tunnel::client::run_reverse_tunnel(client_config, tunnel, connect_to_dest).await
|
||||||
|
{
|
||||||
|
error!("{:?}", err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
LocalProtocol::Socks5 => {
|
||||||
|
tunnel.local_protocol = LocalProtocol::ReverseSocks5;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let cfg = client_config.clone();
|
||||||
|
let connect_to_dest = |remote: (Host, u16)| {
|
||||||
|
let so_mark = cfg.socket_so_mark;
|
||||||
|
let timeout = cfg.timeout_connect;
|
||||||
|
async move { tcp::connect(&remote.0, remote.1, so_mark, timeout).await }
|
||||||
|
};
|
||||||
|
|
||||||
if let Err(err) =
|
if let Err(err) =
|
||||||
tunnel::client::run_reverse_tunnel(client_config, tunnel, connect_to_dest).await
|
tunnel::client::run_reverse_tunnel(client_config, tunnel, connect_to_dest).await
|
||||||
|
@ -693,6 +712,7 @@ async fn main() {
|
||||||
}
|
}
|
||||||
LocalProtocol::ReverseTcp => {}
|
LocalProtocol::ReverseTcp => {}
|
||||||
LocalProtocol::ReverseUdp { .. } => {}
|
LocalProtocol::ReverseUdp { .. } => {}
|
||||||
|
LocalProtocol::ReverseSocks5 => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,14 @@
|
||||||
use super::{JwtTunnelConfig, JWT_KEY};
|
use super::{to_host_port, JwtTunnelConfig, JWT_KEY};
|
||||||
use crate::{LocalToRemote, WsClientConfig};
|
use crate::{LocalToRemote, WsClientConfig};
|
||||||
use anyhow::{anyhow, Context};
|
use anyhow::{anyhow, Context};
|
||||||
|
|
||||||
use fastwebsockets::WebSocket;
|
use fastwebsockets::WebSocket;
|
||||||
use futures_util::pin_mut;
|
use futures_util::pin_mut;
|
||||||
use hyper::header::{AUTHORIZATION, SEC_WEBSOCKET_VERSION, UPGRADE};
|
use hyper::header::{AUTHORIZATION, COOKIE, SEC_WEBSOCKET_VERSION, UPGRADE};
|
||||||
use hyper::header::{CONNECTION, HOST, SEC_WEBSOCKET_KEY};
|
use hyper::header::{CONNECTION, HOST, SEC_WEBSOCKET_KEY};
|
||||||
use hyper::upgrade::Upgraded;
|
use hyper::upgrade::Upgraded;
|
||||||
use hyper::{Body, Request};
|
use hyper::{Body, Request, Response};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
@ -17,7 +16,7 @@ use tokio::sync::oneshot;
|
||||||
use tokio_stream::{Stream, StreamExt};
|
use tokio_stream::{Stream, StreamExt};
|
||||||
use tracing::log::debug;
|
use tracing::log::debug;
|
||||||
use tracing::{error, span, Instrument, Level, Span};
|
use tracing::{error, span, Instrument, Level, Span};
|
||||||
use url::Host;
|
use url::{Host, Url};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
struct SpawnExecutor;
|
struct SpawnExecutor;
|
||||||
|
@ -42,7 +41,7 @@ pub async fn connect(
|
||||||
request_id: Uuid,
|
request_id: Uuid,
|
||||||
client_cfg: &WsClientConfig,
|
client_cfg: &WsClientConfig,
|
||||||
tunnel_cfg: &LocalToRemote,
|
tunnel_cfg: &LocalToRemote,
|
||||||
) -> anyhow::Result<WebSocket<Upgraded>> {
|
) -> anyhow::Result<(WebSocket<Upgraded>, Response<Body>)> {
|
||||||
let mut pooled_cnx = match client_cfg.cnx_pool().get().await {
|
let mut pooled_cnx = match client_cfg.cnx_pool().get().await {
|
||||||
Ok(tcp_stream) => tcp_stream,
|
Ok(tcp_stream) => tcp_stream,
|
||||||
Err(err) => Err(anyhow!("failed to get a connection to the server from the pool: {err:?}"))?,
|
Err(err) => Err(anyhow!("failed to get a connection to the server from the pool: {err:?}"))?,
|
||||||
|
@ -77,11 +76,11 @@ pub async fn connect(
|
||||||
})?;
|
})?;
|
||||||
debug!("with HTTP upgrade request {:?}", req);
|
debug!("with HTTP upgrade request {:?}", req);
|
||||||
let transport = pooled_cnx.deref_mut().take().unwrap();
|
let transport = pooled_cnx.deref_mut().take().unwrap();
|
||||||
let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, transport)
|
let (ws, response) = fastwebsockets::handshake::client(&SpawnExecutor, req, transport)
|
||||||
.await
|
.await
|
||||||
.with_context(|| format!("failed to do websocket handshake with the server {:?}", client_cfg.remote_addr))?;
|
.with_context(|| format!("failed to do websocket handshake with the server {:?}", client_cfg.remote_addr))?;
|
||||||
|
|
||||||
Ok(ws)
|
Ok((ws, response))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connect_to_server<R, W>(
|
async fn connect_to_server<R, W>(
|
||||||
|
@ -94,7 +93,7 @@ where
|
||||||
R: AsyncRead + Send + 'static,
|
R: AsyncRead + Send + 'static,
|
||||||
W: AsyncWrite + Send + 'static,
|
W: AsyncWrite + Send + 'static,
|
||||||
{
|
{
|
||||||
let mut ws = connect(request_id, client_cfg, remote_cfg).await?;
|
let (mut ws, _) = connect(request_id, client_cfg, remote_cfg).await?;
|
||||||
ws.set_auto_apply_mask(client_cfg.websocket_mask_frame);
|
ws.set_auto_apply_mask(client_cfg.websocket_mask_frame);
|
||||||
|
|
||||||
let (ws_rx, ws_tx) = ws.split(tokio::io::split);
|
let (ws_rx, ws_tx) = ws.split(tokio::io::split);
|
||||||
|
@ -155,16 +154,13 @@ pub async fn run_reverse_tunnel<F, Fut, T>(
|
||||||
connect_to_dest: F,
|
connect_to_dest: F,
|
||||||
) -> anyhow::Result<()>
|
) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
F: Fn() -> Fut,
|
F: Fn((Host, u16)) -> Fut,
|
||||||
Fut: Future<Output = anyhow::Result<T>>,
|
Fut: Future<Output = anyhow::Result<T>>,
|
||||||
T: AsyncRead + AsyncWrite + Send + 'static,
|
T: AsyncRead + AsyncWrite + Send + 'static,
|
||||||
{
|
{
|
||||||
// Invert local with remote
|
// Invert local with remote
|
||||||
let remote = tunnel_cfg.remote;
|
let remote_ori = tunnel_cfg.remote;
|
||||||
tunnel_cfg.remote = match tunnel_cfg.local.ip() {
|
tunnel_cfg.remote = to_host_port(tunnel_cfg.local);
|
||||||
IpAddr::V4(ip) => (Host::Ipv4(ip), tunnel_cfg.local.port()),
|
|
||||||
IpAddr::V6(ip) => (Host::Ipv6(ip), tunnel_cfg.local.port()),
|
|
||||||
};
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let client_config = client_config.clone();
|
let client_config = client_config.clone();
|
||||||
|
@ -178,13 +174,26 @@ where
|
||||||
let _span = span.enter();
|
let _span = span.enter();
|
||||||
|
|
||||||
// Correctly configure tunnel cfg
|
// Correctly configure tunnel cfg
|
||||||
let mut ws = connect(request_id, &client_config, &tunnel_cfg)
|
let (mut ws, response) = connect(request_id, &client_config, &tunnel_cfg)
|
||||||
.instrument(span.clone())
|
.instrument(span.clone())
|
||||||
.await?;
|
.await?;
|
||||||
ws.set_auto_apply_mask(client_config.websocket_mask_frame);
|
ws.set_auto_apply_mask(client_config.websocket_mask_frame);
|
||||||
|
|
||||||
// Connect to endpoint
|
// Connect to endpoint
|
||||||
let stream = connect_to_dest().instrument(span.clone()).await;
|
let remote: (Host, u16) = response
|
||||||
|
.headers()
|
||||||
|
.get(COOKIE)
|
||||||
|
.and_then(|h| {
|
||||||
|
h.to_str()
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| Url::parse(s).ok())
|
||||||
|
.and_then(|url| match (url.host(), url.port()) {
|
||||||
|
(Some(h), Some(p)) => Some((h.to_owned(), p)),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.unwrap_or(remote_ori.clone());
|
||||||
|
let stream = connect_to_dest(remote.clone()).instrument(span.clone()).await;
|
||||||
|
|
||||||
let stream = match stream {
|
let stream = match stream {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
|
|
|
@ -38,6 +38,7 @@ impl JwtTunnelConfig {
|
||||||
LocalProtocol::Socks5 => LocalProtocol::Tcp,
|
LocalProtocol::Socks5 => LocalProtocol::Tcp,
|
||||||
LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp,
|
LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp,
|
||||||
LocalProtocol::ReverseUdp { .. } => tunnel.local_protocol,
|
LocalProtocol::ReverseUdp { .. } => tunnel.local_protocol,
|
||||||
|
LocalProtocol::ReverseSocks5 => LocalProtocol::ReverseSocks5,
|
||||||
LocalProtocol::TProxyTcp => LocalProtocol::Tcp,
|
LocalProtocol::TProxyTcp => LocalProtocol::Tcp,
|
||||||
},
|
},
|
||||||
r: tunnel.remote.0.to_string(),
|
r: tunnel.remote.0.to_string(),
|
||||||
|
|
|
@ -2,15 +2,17 @@ use ahash::{HashMap, HashMapExt};
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
use futures_util::{pin_mut, Stream, StreamExt};
|
use futures_util::{pin_mut, Stream, StreamExt};
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
|
use std::fmt::Debug;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io;
|
|
||||||
use std::ops::{Deref, Not};
|
use std::ops::{Deref, Not};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use super::{JwtTunnelConfig, JWT_DECODE};
|
use super::{JwtTunnelConfig, JWT_DECODE};
|
||||||
use crate::{tcp, tls, udp, LocalProtocol, WsServerConfig};
|
use crate::{socks5, tcp, tls, udp, LocalProtocol, WsServerConfig};
|
||||||
|
use hyper::header::COOKIE;
|
||||||
|
use hyper::http::HeaderValue;
|
||||||
use hyper::server::conn::Http;
|
use hyper::server::conn::Http;
|
||||||
use hyper::service::service_fn;
|
use hyper::service::service_fn;
|
||||||
use hyper::{http, Body, Request, Response, StatusCode};
|
use hyper::{http, Body, Request, Response, StatusCode};
|
||||||
|
@ -107,19 +109,33 @@ async fn from_query(
|
||||||
|
|
||||||
Ok((jwt.claims.p, local_srv.0, local_srv.1, Box::pin(local_rx), Box::pin(local_tx)))
|
Ok((jwt.claims.p, local_srv.0, local_srv.1, Box::pin(local_rx), Box::pin(local_tx)))
|
||||||
}
|
}
|
||||||
|
LocalProtocol::ReverseSocks5 => {
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
|
static SERVERS: Lazy<Mutex<HashMap<(Host<String>, u16), mpsc::Receiver<(TcpStream, (Host, u16))>>>> =
|
||||||
|
Lazy::new(|| Mutex::new(HashMap::with_capacity(0)));
|
||||||
|
|
||||||
|
let local_srv = (Host::parse(&jwt.claims.r)?, jwt.claims.rp);
|
||||||
|
let bind = format!("{}:{}", local_srv.0, local_srv.1);
|
||||||
|
let listening_server = socks5::run_server(bind.parse()?);
|
||||||
|
let (tcp, local_srv) = run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?;
|
||||||
|
let (local_rx, local_tx) = tokio::io::split(tcp);
|
||||||
|
|
||||||
|
Ok((jwt.claims.p, local_srv.0, local_srv.1, Box::pin(local_rx), Box::pin(local_tx)))
|
||||||
|
}
|
||||||
_ => Err(anyhow::anyhow!("Invalid upgrade request")),
|
_ => Err(anyhow::anyhow!("Invalid upgrade request")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
async fn run_listening_server<T, Fut, FutOut>(
|
async fn run_listening_server<T, Fut, FutOut, E>(
|
||||||
local_srv: &(Host, u16),
|
local_srv: &(Host, u16),
|
||||||
servers: &Mutex<HashMap<(Host<String>, u16), mpsc::Receiver<T>>>,
|
servers: &Mutex<HashMap<(Host<String>, u16), mpsc::Receiver<T>>>,
|
||||||
gen_listening_server: Fut,
|
gen_listening_server: Fut,
|
||||||
) -> anyhow::Result<T>
|
) -> anyhow::Result<T>
|
||||||
where
|
where
|
||||||
Fut: Future<Output = anyhow::Result<FutOut>>,
|
Fut: Future<Output = anyhow::Result<FutOut>>,
|
||||||
FutOut: Stream<Item = io::Result<T>> + Send + 'static,
|
FutOut: Stream<Item = Result<T, E>> + Send + 'static,
|
||||||
|
E: Debug + Send,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
let listening_server = servers.lock().remove(local_srv);
|
let listening_server = servers.lock().remove(local_srv);
|
||||||
|
@ -214,7 +230,7 @@ async fn server_upgrade(
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("connected to {:?} {:?} {:?}", protocol, dest, port);
|
info!("connected to {:?} {:?} {:?}", protocol, dest, port);
|
||||||
let (response, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
|
let (mut response, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
|
||||||
Ok(ret) => ret,
|
Ok(ret) => ret,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Rejecting connection with bad upgrade request: {} {}", err, req.uri());
|
warn!("Rejecting connection with bad upgrade request: {} {}", err, req.uri());
|
||||||
|
@ -244,6 +260,9 @@ async fn server_upgrade(
|
||||||
.instrument(Span::current()),
|
.instrument(Span::current()),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
response
|
||||||
|
.headers_mut()
|
||||||
|
.insert(COOKIE, HeaderValue::from_str(&format!("fake://{}:{}", dest, port)).unwrap());
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue