Add tcp reverse tunneling

This commit is contained in:
Σrebe - Romain GERARD 2023-11-26 15:47:49 +01:00
parent cb4675a817
commit 87cf422489
No known key found for this signature in database
GPG key ID: 7A42B4B97E0332F4
6 changed files with 121 additions and 5 deletions

View file

@ -1,5 +1,5 @@
use super::{JwtTunnelConfig, JWT_KEY};
use crate::{LocalToRemote, WsClientConfig};
use crate::{tcp, LocalToRemote, WsClientConfig};
use anyhow::{anyhow, Context};
use fastwebsockets::WebSocket;
@ -9,6 +9,7 @@ use hyper::header::{CONNECTION, HOST, SEC_WEBSOCKET_KEY};
use hyper::upgrade::Upgraded;
use hyper::{Body, Request};
use std::future::Future;
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
@ -147,3 +148,67 @@ where
Ok(())
}
pub async fn run_reverse_tunnel(
client_config: Arc<WsClientConfig>,
mut tunnel_cfg: LocalToRemote,
) -> anyhow::Result<()> {
// Invert local with remote
let remote = tunnel_cfg.remote;
tunnel_cfg.remote = match tunnel_cfg.local.ip() {
IpAddr::V4(ip) => (Host::Ipv4(ip), tunnel_cfg.local.port()),
IpAddr::V6(ip) => (Host::Ipv6(ip), tunnel_cfg.local.port()),
};
loop {
let client_config = client_config.clone();
let request_id = Uuid::now_v7();
let span = span!(
Level::INFO,
"tunnel",
id = request_id.to_string(),
remote = format!("{}:{}", tunnel_cfg.remote.0, tunnel_cfg.remote.1)
);
let _span = span.enter();
// Correctly configure tunnel cfg
let mut ws = connect(request_id, &client_config, &tunnel_cfg)
.instrument(span.clone())
.await?;
ws.set_auto_apply_mask(client_config.websocket_mask_frame);
// Connect to endpoint
let stream = tcp::connect(
&remote.0,
remote.1,
&client_config.socket_so_mark,
client_config.timeout_connect,
)
.instrument(span.clone())
.await;
let stream = match stream {
Ok(s) => s,
Err(err) => {
error!("Cannot connect to {remote:?}: {err:?}");
continue;
}
};
let (local_rx, local_tx) = tokio::io::split(stream);
let (ws_rx, ws_tx) = ws.split(tokio::io::split);
let (close_tx, close_rx) = oneshot::channel::<()>();
let tunnel = async move {
let ping_frequency = client_config.websocket_ping_frequency;
tokio::spawn(
super::io::propagate_read(local_rx, ws_tx, close_tx, Some(ping_frequency)).instrument(Span::current()),
);
// Forward websocket rx to local rx
let _ = super::io::propagate_write(local_tx, ws_rx, close_rx).await;
}
.instrument(span.clone());
tokio::spawn(tunnel);
}
}

View file

@ -34,6 +34,7 @@ impl JwtTunnelConfig {
LocalProtocol::Udp { .. } => tunnel.local_protocol,
LocalProtocol::Stdio => LocalProtocol::Tcp,
LocalProtocol::Socks5 => LocalProtocol::Tcp,
LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp,
},
r: tunnel.remote.0.to_string(),
rp: tunnel.remote.1,

View file

@ -1,3 +1,5 @@
use ahash::{HashMap, HashMapExt};
use futures_util::StreamExt;
use std::cmp::min;
use std::ops::{Deref, Not};
use std::pin::Pin;
@ -10,10 +12,13 @@ use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{http, Body, Request, Response, StatusCode};
use jsonwebtoken::TokenData;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio_stream::wrappers::TcpListenerStream;
use tracing::{error, info, span, warn, Instrument, Level, Span};
use url::Host;
@ -63,7 +68,7 @@ async fn from_query(
Box::pin(cnx),
))
}
LocalProtocol::Tcp { .. } => {
LocalProtocol::Tcp => {
let host = Host::parse(&jwt.claims.r)?;
let port = jwt.claims.rp;
let (rx, tx) = tcp::connect(&host, port, &server_config.socket_so_mark, Duration::from_secs(10))
@ -72,6 +77,26 @@ async fn from_query(
Ok((jwt.claims.p, host, port, Box::pin(rx), Box::pin(tx)))
}
LocalProtocol::ReverseTcp => {
#[allow(clippy::type_complexity)]
static REVERSE: Lazy<Mutex<HashMap<(Host<String>, u16), TcpListenerStream>>> =
Lazy::new(|| Mutex::new(HashMap::with_capacity(0)));
let local_srv = (Host::parse(&jwt.claims.r)?, jwt.claims.rp);
let listening_server = REVERSE.lock().remove(&local_srv);
let mut listening_server = if let Some(listening_server) = listening_server {
listening_server
} else {
let bind = format!("{}:{}", local_srv.0, local_srv.1);
tcp::run_server(bind.parse()?).await?
};
let tcp = listening_server.next().await.unwrap()?;
let (local_rx, local_tx) = tokio::io::split(tcp);
REVERSE.lock().insert(local_srv.clone(), listening_server);
Ok((jwt.claims.p, local_srv.0, local_srv.1, Box::pin(local_rx), Box::pin(local_tx)))
}
_ => Err(anyhow::anyhow!("Invalid upgrade request")),
}
}