feat: Add http proxy support

This commit is contained in:
Σrebe - Romain GERARD 2024-07-27 20:10:05 +02:00
parent b077819cc6
commit 6e10c27dbb
No known key found for this signature in database
GPG key ID: 7A42B4B97E0332F4
5 changed files with 285 additions and 7 deletions

145
src/http_proxy.rs Normal file
View file

@ -0,0 +1,145 @@
use anyhow::Context;
use std::future::Future;
use bytes::Bytes;
use log::{debug, error};
use std::net::{Ipv4Addr, SocketAddr};
use std::pin::Pin;
use base64::Engine;
use futures_util::{future, stream, Stream};
use http_body_util::Empty;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioTimer;
use parking_lot::Mutex;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tracing::log::info;
use url::Host;
#[allow(clippy::type_complexity)]
pub struct HttpProxyListener {
listener: Pin<Box<dyn Stream<Item = anyhow::Result<(TcpStream, (Host, u16))>> + Send>>,
}
impl Stream for HttpProxyListener {
type Item = anyhow::Result<(TcpStream, (Host, u16))>;
fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
unsafe { self.map_unchecked_mut(|x| &mut x.listener) }.poll_next(cx)
}
}
fn server_client(
credentials: &Option<String>,
dest: &Mutex<(Host, u16)>,
req: Request<Incoming>,
) -> impl Future<Output = Result<Response<Empty<Bytes>>, &'static str>> {
const PROXY_AUTHORIZATION_PREFIX: &str = "Basic ";
let ok_response = |forward_to: (Host, u16)| -> Result<Response<Empty<Bytes>>, _> {
*dest.lock() = forward_to;
Ok(Response::builder().status(200).body(Empty::new()).unwrap())
};
fn err_response() -> Result<Response<Empty<Bytes>>, &'static str> {
info!("Un-authorized connection to http proxy");
Err("Un-authorized")
}
if req.method() != hyper::Method::CONNECT {
return future::ready(err_response());
}
debug!("HTTP Proxy CONNECT request to {}", req.uri());
let forward_to = (
Host::parse(req.uri().host().unwrap_or_default()).unwrap_or(Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0))),
req.uri().port_u16().unwrap_or(443),
);
let Some(token) = credentials else {
return future::ready(ok_response(forward_to));
};
let Some(auth) = req.headers().get(hyper::header::PROXY_AUTHORIZATION) else {
return future::ready(err_response());
};
let auth = auth.to_str().unwrap_or_default().trim();
if auth.starts_with(PROXY_AUTHORIZATION_PREFIX) && &auth[PROXY_AUTHORIZATION_PREFIX.len()..] == token {
return future::ready(ok_response(forward_to));
}
future::ready(err_response())
}
pub async fn run_server(
bind: SocketAddr,
timeout: Option<Duration>,
credentials: Option<(String, String)>,
) -> Result<HttpProxyListener, anyhow::Error> {
info!(
"Starting http proxy server listening cnx on {} with credentials {:?}",
bind, credentials
);
let listener = TcpListener::bind(bind)
.await
.with_context(|| format!("Cannot create TCP server {:?}", bind))?;
let http1 = {
let mut builder = http1::Builder::new();
builder
.timer(TokioTimer::new())
.header_read_timeout(timeout)
.keep_alive(false);
builder
};
let auth_header =
credentials.map(|(user, pass)| base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, pass)));
let listener = stream::unfold((listener, http1, auth_header), |(listener, http1, auth_header)| async {
loop {
let (mut stream, _) = match listener.accept().await {
Ok(v) => v,
Err(err) => {
error!("Error while accepting connection {:?}", err);
continue;
}
};
let forward_to = Mutex::new((Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)), 0));
let conn_fut = http1.serve_connection(
hyper_util::rt::TokioIo::new(&mut stream),
service_fn(|req| server_client(&auth_header, &forward_to, req)),
);
match conn_fut.await {
Ok(_) => return Some((Ok((stream, forward_to.into_inner())), (listener, http1, auth_header))),
Err(err) => {
info!("Error while serving connection: {}", err);
continue;
}
}
}
});
Ok(HttpProxyListener {
listener: Box::pin(listener),
})
}
//#[cfg(test)]
//mod tests {
// use super::*;
// use tracing::level_filters::LevelFilter;
//
// #[tokio::test]
// async fn test_run_server() {
// tracing_subscriber::fmt()
// .with_ansi(true)
// .with_max_level(LevelFilter::TRACE)
// .init();
// let x = run_server("127.0.0.1:1212".parse().unwrap(), None, None).await;
// }
//}

View file

@ -1,5 +1,6 @@
mod dns;
mod embedded_certificate;
mod http_proxy;
mod restrictions;
mod socks5;
mod socks5_udp;
@ -108,6 +109,9 @@ struct Client {
/// 'socks5://[::1]:1212' => listen locally with socks5 on port 1212 and forward dynamically requested tunnel
/// 'socks5://[::1]:1212?login=admin&password=admin' => listen locally with socks5 on port 1212 and only accept connection with login=admin and password=admin
///
/// 'httpproxy://[::1]:1212' => start a http proxy on port 1212 and forward dynamically requested tunnel
/// 'httpproxy://[::1]:1212?login=admin&password=admin' => start a http proxy on port 1212 and only accept connection with login=admin and password=admin
///
/// 'tproxy+tcp://[::1]:1212' => listen locally on tcp on port 1212 as a *transparent proxy* and forward dynamically requested tunnel
/// 'tproxy+udp://[::1]:1212?timeout_sec=10' listen locally on udp on port 1212 as a *transparent proxy* and forward dynamically requested tunnel
/// linux only and requires sudo/CAP_NET_ADMIN
@ -123,6 +127,7 @@ struct Client {
/// 'tcp://1212:google.com:443' => listen on server for incoming tcp cnx on port 1212 and forward to google.com on port 443 from local machine
/// 'udp://1212:1.1.1.1:53' => listen on server for incoming udp on port 1212 and forward to cloudflare dns 1.1.1.1 on port 53 from local machine
/// 'socks5://[::1]:1212' => listen on server for incoming socks5 request on port 1212 and forward dynamically request from local machine (login/password is supported)
/// 'httpproxy://[::1]:1212' => listen on server for incoming http proxy request on port 1212 and forward dynamically request from local machine (login/password is supported)
/// 'unix://wstunnel.sock:g.com:443' => listen on server for incoming data from unix socket of path wstunnel.sock and forward to g.com:443 from local machine
#[arg(short='R', long, value_name = "{tcp,udp,socks5,unix}://[BIND:]PORT:HOST:PORT", value_parser = parse_tunnel_arg, verbatim_doc_comment)]
remote_to_local: Vec<LocalToRemote>,
@ -230,7 +235,7 @@ struct Client {
/// Obviously, this is not going to work for tunneling traffic
/// - if you have wstunnel behind a reverse proxy, most of them (i.e: nginx) are going to turn http2 request into http1
/// This is not going to work, because http1 does not support streaming naturally
/// The only way to make it works with http2 is to have wstunnel directly exposed to the internet without any reverse proxy in front of it
/// - The only way to make it works with http2 is to have wstunnel directly exposed to the internet without any reverse proxy in front of it
#[arg(value_name = "ws[s]|http[s]://wstunnel.server.com[:port]", value_parser = parse_server_url, verbatim_doc_comment)]
remote_addr: Url,
@ -380,6 +385,11 @@ enum LocalProtocol {
TProxyUdp {
timeout: Option<Duration>,
},
HttpProxy {
timeout: Option<Duration>,
credentials: Option<(String, String)>,
proxy_protocol: bool,
},
ReverseTcp,
ReverseUdp {
timeout: Option<Duration>,
@ -388,6 +398,10 @@ enum LocalProtocol {
timeout: Option<Duration>,
credentials: Option<(String, String)>,
},
ReverseHttpProxy {
timeout: Option<Duration>,
credentials: Option<(String, String)>,
},
ReverseUnix {
path: PathBuf,
},
@ -398,7 +412,14 @@ enum LocalProtocol {
impl LocalProtocol {
pub const fn is_reverse_tunnel(&self) -> bool {
matches!(self, Self::ReverseTcp | Self::ReverseUdp { .. } | Self::ReverseSocks5 { .. })
matches!(
self,
Self::ReverseTcp
| Self::ReverseUdp { .. }
| Self::ReverseSocks5 { .. }
| Self::ReverseUnix { .. }
| Self::ReverseHttpProxy { .. }
)
}
}
@ -539,7 +560,7 @@ fn parse_tunnel_arg(arg: &str) -> Result<LocalToRemote, io::Error> {
}
_ => match &arg[..8] {
"socks5:/" => {
let (local_bind, remaining) = parse_local_bind(&arg[9..])?;
let (local_bind, remaining) = parse_local_bind(&arg["socks5://".len()..])?;
let x = format!("0.0.0.0:0?{}", remaining);
let (dest_host, dest_port, options) = parse_tunnel_dest(&x)?;
let timeout = options
@ -556,8 +577,31 @@ fn parse_tunnel_arg(arg: &str) -> Result<LocalToRemote, io::Error> {
remote: (dest_host, dest_port),
})
}
"httpprox" => {
let (local_bind, remaining) = parse_local_bind(&arg["httpproxy://".len()..])?;
let x = format!("0.0.0.0:0?{}", remaining);
let (dest_host, dest_port, options) = parse_tunnel_dest(&x)?;
let proxy_protocol = options.contains_key("proxy_protocol");
let timeout = options
.get("timeout_sec")
.and_then(|x| x.parse::<u64>().ok())
.map(|d| if d == 0 { None } else { Some(Duration::from_secs(d)) })
.unwrap_or(Some(Duration::from_secs(30)));
let credentials = options
.get("login")
.and_then(|login| options.get("password").map(|p| (login.to_string(), p.to_string())));
Ok(LocalToRemote {
local_protocol: LocalProtocol::HttpProxy {
timeout,
credentials,
proxy_protocol,
},
local: local_bind,
remote: (dest_host, dest_port),
})
}
"stdio://" => {
let (dest_host, dest_port, _options) = parse_tunnel_dest(&arg[8..])?;
let (dest_host, dest_port, _options) = parse_tunnel_dest(&arg["stdio://".len()..])?;
Ok(LocalToRemote {
local_protocol: LocalProtocol::Stdio,
local: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(0), 0)),
@ -1055,6 +1099,39 @@ async fn main() {
}
});
}
LocalProtocol::HttpProxy {
timeout, credentials, ..
} => {
let credentials = credentials.clone();
let timeout = *timeout;
tokio::spawn(async move {
let cfg = client_config.clone();
let (host, port) = to_host_port(tunnel.local);
let remote = RemoteAddr {
protocol: LocalProtocol::ReverseHttpProxy { timeout, credentials },
host,
port,
};
let connect_to_dest = |remote: Option<RemoteAddr>| {
let so_mark = cfg.socket_so_mark;
let timeout = cfg.timeout_connect;
let dns_resolver = &cfg.dns_resolver;
async move {
let Some(remote) = remote else {
return Err(anyhow!("Missing remote destination for reverse socks5"));
};
tcp::connect(&remote.host, remote.port, so_mark, timeout, dns_resolver).await
}
};
if let Err(err) =
tunnel::client::run_reverse_tunnel(client_config, remote, connect_to_dest).await
{
error!("{:?}", err);
}
});
}
#[cfg(unix)]
LocalProtocol::Unix { path } => {
let path = path.clone();
@ -1095,7 +1172,8 @@ async fn main() {
| LocalProtocol::ReverseTcp
| LocalProtocol::ReverseUdp { .. }
| LocalProtocol::ReverseSocks5 { .. }
| LocalProtocol::ReverseUnix { .. } => {
| LocalProtocol::ReverseHttpProxy { .. } => {}
LocalProtocol::ReverseUnix { .. } => {
panic!("Invalid protocol for reverse tunnel");
}
}
@ -1251,6 +1329,30 @@ async fn main() {
}
});
}
LocalProtocol::HttpProxy {
timeout,
credentials,
proxy_protocol,
} => {
let proxy_protocol = *proxy_protocol;
let server = http_proxy::run_server(tunnel.local, *timeout, credentials.clone())
.await
.unwrap_or_else(|err| panic!("Cannot start http proxy server on {}: {}", tunnel.local, err))
.map_ok(move |(stream, (host, port))| {
let remote = RemoteAddr {
protocol: LocalProtocol::Tcp { proxy_protocol },
host,
port,
};
(tokio::io::split(stream), remote)
});
tokio::spawn(async move {
if let Err(err) = tunnel::client::run_tunnel(client_config, server).await {
error!("{:?}", err);
}
});
}
LocalProtocol::Stdio => {
let (server, mut handle) = stdio::server::run_server().await.unwrap_or_else(|err| {
@ -1287,6 +1389,7 @@ async fn main() {
LocalProtocol::ReverseUdp { .. } => {}
LocalProtocol::ReverseSocks5 { .. } => {}
LocalProtocol::ReverseUnix { .. } => {}
LocalProtocol::ReverseHttpProxy { .. } => {}
}
}
}

View file

@ -78,6 +78,7 @@ pub enum ReverseTunnelConfigProtocol {
Udp,
Socks5,
Unix,
HttpProxy,
Unknown,
}
@ -160,11 +161,13 @@ impl From<&LocalProtocol> for ReverseTunnelConfigProtocol {
| LocalProtocol::Socks5 { .. }
| LocalProtocol::TProxyTcp { .. }
| LocalProtocol::TProxyUdp { .. }
| LocalProtocol::HttpProxy { .. }
| LocalProtocol::Unix { .. } => Self::Unknown,
LocalProtocol::ReverseTcp => Self::Tcp,
LocalProtocol::ReverseUdp { .. } => Self::Udp,
LocalProtocol::ReverseSocks5 { .. } => Self::Socks5,
LocalProtocol::ReverseUnix { .. } => Self::Unix,
LocalProtocol::ReverseHttpProxy { .. } => Self::HttpProxy,
}
}
}
@ -179,6 +182,8 @@ impl From<&LocalProtocol> for TunnelConfigProtocol {
| LocalProtocol::Socks5 { .. }
| LocalProtocol::TProxyTcp { .. }
| LocalProtocol::TProxyUdp { .. }
| LocalProtocol::HttpProxy { .. }
| LocalProtocol::ReverseHttpProxy { .. }
| LocalProtocol::Unix { .. } => Self::Unknown,
LocalProtocol::Tcp { .. } => Self::Tcp,
LocalProtocol::Udp { .. } => Self::Udp,

View file

@ -41,6 +41,7 @@ impl JwtTunnelConfig {
LocalProtocol::Udp { .. } => dest.protocol.clone(),
LocalProtocol::Stdio => LocalProtocol::Tcp { proxy_protocol: false },
LocalProtocol::Socks5 { .. } => LocalProtocol::Tcp { proxy_protocol: false },
LocalProtocol::HttpProxy { .. } => dest.protocol.clone(),
LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp,
LocalProtocol::ReverseUdp { .. } => dest.protocol.clone(),
LocalProtocol::ReverseSocks5 { .. } => dest.protocol.clone(),
@ -48,6 +49,7 @@ impl JwtTunnelConfig {
LocalProtocol::TProxyUdp { timeout } => LocalProtocol::Udp { timeout },
LocalProtocol::Unix { .. } => LocalProtocol::Tcp { proxy_protocol: false },
LocalProtocol::ReverseUnix { .. } => dest.protocol.clone(),
LocalProtocol::ReverseHttpProxy { .. } => dest.protocol.clone(),
},
r: dest.host.to_string(),
rp: dest.port,

View file

@ -15,7 +15,7 @@ use std::sync::Arc;
use std::time::Duration;
use super::{tunnel_to_jwt_token, JwtTunnelConfig, RemoteAddr, JWT_DECODE, JWT_HEADER_PREFIX};
use crate::{socks5, tcp, tls, udp, LocalProtocol, TlsServerConfig, WsServerConfig};
use crate::{http_proxy, socks5, tcp, tls, udp, LocalProtocol, TlsServerConfig, WsServerConfig};
use hyper::body::{Frame, Incoming};
use hyper::header::{CONTENT_TYPE, COOKIE, SEC_WEBSOCKET_PROTOCOL};
use hyper::http::HeaderValue;
@ -150,6 +150,25 @@ async fn run_tunnel(
};
Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
}
LocalProtocol::ReverseHttpProxy { timeout, credentials } => {
#[allow(clippy::type_complexity)]
static SERVERS: Lazy<Mutex<HashMap<(Host<String>, u16), mpsc::Receiver<(TcpStream, (Host, u16))>>>> =
Lazy::new(|| Mutex::new(HashMap::with_capacity(0)));
let remote_port = find_mapped_port(remote.port, restriction);
let local_srv = (remote.host, remote_port);
let bind = format!("{}:{}", local_srv.0, local_srv.1);
let listening_server = http_proxy::run_server(bind.parse()?, timeout, credentials);
let (stream, local_srv) = run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?;
let (local_rx, local_tx) = tokio::io::split(stream);
let remote = RemoteAddr {
protocol: LocalProtocol::Tcp { proxy_protocol: false },
host: local_srv.0,
port: local_srv.1,
};
Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
}
#[cfg(unix)]
LocalProtocol::ReverseUnix { ref path } => {
use crate::unix_socket;
@ -181,6 +200,7 @@ async fn run_tunnel(
| LocalProtocol::Socks5 { .. }
| LocalProtocol::TProxyTcp
| LocalProtocol::TProxyUdp { .. }
| LocalProtocol::HttpProxy { .. }
| LocalProtocol::Unix { .. } => {
error!("Received an unsupported target protocol {:?}", remote);
Err(anyhow::anyhow!("Invalid upgrade request"))
@ -572,7 +592,10 @@ async fn ws_server_upgrade(
.instrument(Span::current()),
);
if matches!(req_protocol, LocalProtocol::ReverseSocks5 { .. }) {
if matches!(
req_protocol,
LocalProtocol::ReverseSocks5 { .. } | LocalProtocol::ReverseHttpProxy { .. }
) {
let Ok(header_val) = HeaderValue::from_str(&tunnel_to_jwt_token(Uuid::from_u128(0), &remote_addr)) else {
error!("Bad headervalue for reverse socks5: {} {}", remote_addr.host, remote_addr.port);
return http::Response::builder()