diff --git a/src/http_proxy.rs b/src/http_proxy.rs new file mode 100644 index 0000000..ab81961 --- /dev/null +++ b/src/http_proxy.rs @@ -0,0 +1,145 @@ +use anyhow::Context; +use std::future::Future; + +use bytes::Bytes; +use log::{debug, error}; +use std::net::{Ipv4Addr, SocketAddr}; +use std::pin::Pin; + +use base64::Engine; +use futures_util::{future, stream, Stream}; +use http_body_util::Empty; +use hyper::body::Incoming; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioTimer; +use parking_lot::Mutex; +use std::time::Duration; +use tokio::net::{TcpListener, TcpStream}; +use tracing::log::info; +use url::Host; + +#[allow(clippy::type_complexity)] +pub struct HttpProxyListener { + listener: Pin> + Send>>, +} + +impl Stream for HttpProxyListener { + type Item = anyhow::Result<(TcpStream, (Host, u16))>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + unsafe { self.map_unchecked_mut(|x| &mut x.listener) }.poll_next(cx) + } +} + +fn server_client( + credentials: &Option, + dest: &Mutex<(Host, u16)>, + req: Request, +) -> impl Future>, &'static str>> { + const PROXY_AUTHORIZATION_PREFIX: &str = "Basic "; + let ok_response = |forward_to: (Host, u16)| -> Result>, _> { + *dest.lock() = forward_to; + Ok(Response::builder().status(200).body(Empty::new()).unwrap()) + }; + fn err_response() -> Result>, &'static str> { + info!("Un-authorized connection to http proxy"); + Err("Un-authorized") + } + + if req.method() != hyper::Method::CONNECT { + return future::ready(err_response()); + } + + debug!("HTTP Proxy CONNECT request to {}", req.uri()); + let forward_to = ( + Host::parse(req.uri().host().unwrap_or_default()).unwrap_or(Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0))), + req.uri().port_u16().unwrap_or(443), + ); + + let Some(token) = credentials else { + return future::ready(ok_response(forward_to)); + }; + + let Some(auth) = req.headers().get(hyper::header::PROXY_AUTHORIZATION) else { + return future::ready(err_response()); + }; + + let auth = auth.to_str().unwrap_or_default().trim(); + if auth.starts_with(PROXY_AUTHORIZATION_PREFIX) && &auth[PROXY_AUTHORIZATION_PREFIX.len()..] == token { + return future::ready(ok_response(forward_to)); + } + + future::ready(err_response()) +} + +pub async fn run_server( + bind: SocketAddr, + timeout: Option, + credentials: Option<(String, String)>, +) -> Result { + info!( + "Starting http proxy server listening cnx on {} with credentials {:?}", + bind, credentials + ); + + let listener = TcpListener::bind(bind) + .await + .with_context(|| format!("Cannot create TCP server {:?}", bind))?; + + let http1 = { + let mut builder = http1::Builder::new(); + builder + .timer(TokioTimer::new()) + .header_read_timeout(timeout) + .keep_alive(false); + builder + }; + let auth_header = + credentials.map(|(user, pass)| base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, pass))); + + let listener = stream::unfold((listener, http1, auth_header), |(listener, http1, auth_header)| async { + loop { + let (mut stream, _) = match listener.accept().await { + Ok(v) => v, + Err(err) => { + error!("Error while accepting connection {:?}", err); + continue; + } + }; + + let forward_to = Mutex::new((Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)), 0)); + let conn_fut = http1.serve_connection( + hyper_util::rt::TokioIo::new(&mut stream), + service_fn(|req| server_client(&auth_header, &forward_to, req)), + ); + match conn_fut.await { + Ok(_) => return Some((Ok((stream, forward_to.into_inner())), (listener, http1, auth_header))), + Err(err) => { + info!("Error while serving connection: {}", err); + continue; + } + } + } + }); + + Ok(HttpProxyListener { + listener: Box::pin(listener), + }) +} + +//#[cfg(test)] +//mod tests { +// use super::*; +// use tracing::level_filters::LevelFilter; +// +// #[tokio::test] +// async fn test_run_server() { +// tracing_subscriber::fmt() +// .with_ansi(true) +// .with_max_level(LevelFilter::TRACE) +// .init(); +// let x = run_server("127.0.0.1:1212".parse().unwrap(), None, None).await; +// } +//} diff --git a/src/main.rs b/src/main.rs index 1474a1a..da47b1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod dns; mod embedded_certificate; +mod http_proxy; mod restrictions; mod socks5; mod socks5_udp; @@ -108,6 +109,9 @@ struct Client { /// 'socks5://[::1]:1212' => listen locally with socks5 on port 1212 and forward dynamically requested tunnel /// 'socks5://[::1]:1212?login=admin&password=admin' => listen locally with socks5 on port 1212 and only accept connection with login=admin and password=admin /// + /// 'httpproxy://[::1]:1212' => start a http proxy on port 1212 and forward dynamically requested tunnel + /// 'httpproxy://[::1]:1212?login=admin&password=admin' => start a http proxy on port 1212 and only accept connection with login=admin and password=admin + /// /// 'tproxy+tcp://[::1]:1212' => listen locally on tcp on port 1212 as a *transparent proxy* and forward dynamically requested tunnel /// 'tproxy+udp://[::1]:1212?timeout_sec=10' listen locally on udp on port 1212 as a *transparent proxy* and forward dynamically requested tunnel /// linux only and requires sudo/CAP_NET_ADMIN @@ -123,6 +127,7 @@ struct Client { /// 'tcp://1212:google.com:443' => listen on server for incoming tcp cnx on port 1212 and forward to google.com on port 443 from local machine /// 'udp://1212:1.1.1.1:53' => listen on server for incoming udp on port 1212 and forward to cloudflare dns 1.1.1.1 on port 53 from local machine /// 'socks5://[::1]:1212' => listen on server for incoming socks5 request on port 1212 and forward dynamically request from local machine (login/password is supported) + /// 'httpproxy://[::1]:1212' => listen on server for incoming http proxy request on port 1212 and forward dynamically request from local machine (login/password is supported) /// 'unix://wstunnel.sock:g.com:443' => listen on server for incoming data from unix socket of path wstunnel.sock and forward to g.com:443 from local machine #[arg(short='R', long, value_name = "{tcp,udp,socks5,unix}://[BIND:]PORT:HOST:PORT", value_parser = parse_tunnel_arg, verbatim_doc_comment)] remote_to_local: Vec, @@ -230,7 +235,7 @@ struct Client { /// Obviously, this is not going to work for tunneling traffic /// - if you have wstunnel behind a reverse proxy, most of them (i.e: nginx) are going to turn http2 request into http1 /// This is not going to work, because http1 does not support streaming naturally - /// The only way to make it works with http2 is to have wstunnel directly exposed to the internet without any reverse proxy in front of it + /// - The only way to make it works with http2 is to have wstunnel directly exposed to the internet without any reverse proxy in front of it #[arg(value_name = "ws[s]|http[s]://wstunnel.server.com[:port]", value_parser = parse_server_url, verbatim_doc_comment)] remote_addr: Url, @@ -380,6 +385,11 @@ enum LocalProtocol { TProxyUdp { timeout: Option, }, + HttpProxy { + timeout: Option, + credentials: Option<(String, String)>, + proxy_protocol: bool, + }, ReverseTcp, ReverseUdp { timeout: Option, @@ -388,6 +398,10 @@ enum LocalProtocol { timeout: Option, credentials: Option<(String, String)>, }, + ReverseHttpProxy { + timeout: Option, + credentials: Option<(String, String)>, + }, ReverseUnix { path: PathBuf, }, @@ -398,7 +412,14 @@ enum LocalProtocol { impl LocalProtocol { pub const fn is_reverse_tunnel(&self) -> bool { - matches!(self, Self::ReverseTcp | Self::ReverseUdp { .. } | Self::ReverseSocks5 { .. }) + matches!( + self, + Self::ReverseTcp + | Self::ReverseUdp { .. } + | Self::ReverseSocks5 { .. } + | Self::ReverseUnix { .. } + | Self::ReverseHttpProxy { .. } + ) } } @@ -539,7 +560,7 @@ fn parse_tunnel_arg(arg: &str) -> Result { } _ => match &arg[..8] { "socks5:/" => { - let (local_bind, remaining) = parse_local_bind(&arg[9..])?; + let (local_bind, remaining) = parse_local_bind(&arg["socks5://".len()..])?; let x = format!("0.0.0.0:0?{}", remaining); let (dest_host, dest_port, options) = parse_tunnel_dest(&x)?; let timeout = options @@ -556,8 +577,31 @@ fn parse_tunnel_arg(arg: &str) -> Result { remote: (dest_host, dest_port), }) } + "httpprox" => { + let (local_bind, remaining) = parse_local_bind(&arg["httpproxy://".len()..])?; + let x = format!("0.0.0.0:0?{}", remaining); + let (dest_host, dest_port, options) = parse_tunnel_dest(&x)?; + let proxy_protocol = options.contains_key("proxy_protocol"); + let timeout = options + .get("timeout_sec") + .and_then(|x| x.parse::().ok()) + .map(|d| if d == 0 { None } else { Some(Duration::from_secs(d)) }) + .unwrap_or(Some(Duration::from_secs(30))); + let credentials = options + .get("login") + .and_then(|login| options.get("password").map(|p| (login.to_string(), p.to_string()))); + Ok(LocalToRemote { + local_protocol: LocalProtocol::HttpProxy { + timeout, + credentials, + proxy_protocol, + }, + local: local_bind, + remote: (dest_host, dest_port), + }) + } "stdio://" => { - let (dest_host, dest_port, _options) = parse_tunnel_dest(&arg[8..])?; + let (dest_host, dest_port, _options) = parse_tunnel_dest(&arg["stdio://".len()..])?; Ok(LocalToRemote { local_protocol: LocalProtocol::Stdio, local: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(0), 0)), @@ -1055,6 +1099,39 @@ async fn main() { } }); } + LocalProtocol::HttpProxy { + timeout, credentials, .. + } => { + let credentials = credentials.clone(); + let timeout = *timeout; + tokio::spawn(async move { + let cfg = client_config.clone(); + let (host, port) = to_host_port(tunnel.local); + let remote = RemoteAddr { + protocol: LocalProtocol::ReverseHttpProxy { timeout, credentials }, + host, + port, + }; + let connect_to_dest = |remote: Option| { + let so_mark = cfg.socket_so_mark; + let timeout = cfg.timeout_connect; + let dns_resolver = &cfg.dns_resolver; + async move { + let Some(remote) = remote else { + return Err(anyhow!("Missing remote destination for reverse socks5")); + }; + + tcp::connect(&remote.host, remote.port, so_mark, timeout, dns_resolver).await + } + }; + + if let Err(err) = + tunnel::client::run_reverse_tunnel(client_config, remote, connect_to_dest).await + { + error!("{:?}", err); + } + }); + } #[cfg(unix)] LocalProtocol::Unix { path } => { let path = path.clone(); @@ -1095,7 +1172,8 @@ async fn main() { | LocalProtocol::ReverseTcp | LocalProtocol::ReverseUdp { .. } | LocalProtocol::ReverseSocks5 { .. } - | LocalProtocol::ReverseUnix { .. } => { + | LocalProtocol::ReverseHttpProxy { .. } => {} + LocalProtocol::ReverseUnix { .. } => { panic!("Invalid protocol for reverse tunnel"); } } @@ -1251,6 +1329,30 @@ async fn main() { } }); } + LocalProtocol::HttpProxy { + timeout, + credentials, + proxy_protocol, + } => { + let proxy_protocol = *proxy_protocol; + let server = http_proxy::run_server(tunnel.local, *timeout, credentials.clone()) + .await + .unwrap_or_else(|err| panic!("Cannot start http proxy server on {}: {}", tunnel.local, err)) + .map_ok(move |(stream, (host, port))| { + let remote = RemoteAddr { + protocol: LocalProtocol::Tcp { proxy_protocol }, + host, + port, + }; + (tokio::io::split(stream), remote) + }); + + tokio::spawn(async move { + if let Err(err) = tunnel::client::run_tunnel(client_config, server).await { + error!("{:?}", err); + } + }); + } LocalProtocol::Stdio => { let (server, mut handle) = stdio::server::run_server().await.unwrap_or_else(|err| { @@ -1287,6 +1389,7 @@ async fn main() { LocalProtocol::ReverseUdp { .. } => {} LocalProtocol::ReverseSocks5 { .. } => {} LocalProtocol::ReverseUnix { .. } => {} + LocalProtocol::ReverseHttpProxy { .. } => {} } } } diff --git a/src/restrictions/types.rs b/src/restrictions/types.rs index 28a41d0..9637528 100644 --- a/src/restrictions/types.rs +++ b/src/restrictions/types.rs @@ -78,6 +78,7 @@ pub enum ReverseTunnelConfigProtocol { Udp, Socks5, Unix, + HttpProxy, Unknown, } @@ -160,11 +161,13 @@ impl From<&LocalProtocol> for ReverseTunnelConfigProtocol { | LocalProtocol::Socks5 { .. } | LocalProtocol::TProxyTcp { .. } | LocalProtocol::TProxyUdp { .. } + | LocalProtocol::HttpProxy { .. } | LocalProtocol::Unix { .. } => Self::Unknown, LocalProtocol::ReverseTcp => Self::Tcp, LocalProtocol::ReverseUdp { .. } => Self::Udp, LocalProtocol::ReverseSocks5 { .. } => Self::Socks5, LocalProtocol::ReverseUnix { .. } => Self::Unix, + LocalProtocol::ReverseHttpProxy { .. } => Self::HttpProxy, } } } @@ -179,6 +182,8 @@ impl From<&LocalProtocol> for TunnelConfigProtocol { | LocalProtocol::Socks5 { .. } | LocalProtocol::TProxyTcp { .. } | LocalProtocol::TProxyUdp { .. } + | LocalProtocol::HttpProxy { .. } + | LocalProtocol::ReverseHttpProxy { .. } | LocalProtocol::Unix { .. } => Self::Unknown, LocalProtocol::Tcp { .. } => Self::Tcp, LocalProtocol::Udp { .. } => Self::Udp, diff --git a/src/tunnel/mod.rs b/src/tunnel/mod.rs index a044160..a322444 100644 --- a/src/tunnel/mod.rs +++ b/src/tunnel/mod.rs @@ -41,6 +41,7 @@ impl JwtTunnelConfig { LocalProtocol::Udp { .. } => dest.protocol.clone(), LocalProtocol::Stdio => LocalProtocol::Tcp { proxy_protocol: false }, LocalProtocol::Socks5 { .. } => LocalProtocol::Tcp { proxy_protocol: false }, + LocalProtocol::HttpProxy { .. } => dest.protocol.clone(), LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp, LocalProtocol::ReverseUdp { .. } => dest.protocol.clone(), LocalProtocol::ReverseSocks5 { .. } => dest.protocol.clone(), @@ -48,6 +49,7 @@ impl JwtTunnelConfig { LocalProtocol::TProxyUdp { timeout } => LocalProtocol::Udp { timeout }, LocalProtocol::Unix { .. } => LocalProtocol::Tcp { proxy_protocol: false }, LocalProtocol::ReverseUnix { .. } => dest.protocol.clone(), + LocalProtocol::ReverseHttpProxy { .. } => dest.protocol.clone(), }, r: dest.host.to_string(), rp: dest.port, diff --git a/src/tunnel/server.rs b/src/tunnel/server.rs index f0f6004..b1718da 100644 --- a/src/tunnel/server.rs +++ b/src/tunnel/server.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use std::time::Duration; use super::{tunnel_to_jwt_token, JwtTunnelConfig, RemoteAddr, JWT_DECODE, JWT_HEADER_PREFIX}; -use crate::{socks5, tcp, tls, udp, LocalProtocol, TlsServerConfig, WsServerConfig}; +use crate::{http_proxy, socks5, tcp, tls, udp, LocalProtocol, TlsServerConfig, WsServerConfig}; use hyper::body::{Frame, Incoming}; use hyper::header::{CONTENT_TYPE, COOKIE, SEC_WEBSOCKET_PROTOCOL}; use hyper::http::HeaderValue; @@ -150,6 +150,25 @@ async fn run_tunnel( }; Ok((remote, Box::pin(local_rx), Box::pin(local_tx))) } + LocalProtocol::ReverseHttpProxy { timeout, credentials } => { + #[allow(clippy::type_complexity)] + static SERVERS: Lazy, u16), mpsc::Receiver<(TcpStream, (Host, u16))>>>> = + Lazy::new(|| Mutex::new(HashMap::with_capacity(0))); + + let remote_port = find_mapped_port(remote.port, restriction); + let local_srv = (remote.host, remote_port); + let bind = format!("{}:{}", local_srv.0, local_srv.1); + let listening_server = http_proxy::run_server(bind.parse()?, timeout, credentials); + let (stream, local_srv) = run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?; + let (local_rx, local_tx) = tokio::io::split(stream); + + let remote = RemoteAddr { + protocol: LocalProtocol::Tcp { proxy_protocol: false }, + host: local_srv.0, + port: local_srv.1, + }; + Ok((remote, Box::pin(local_rx), Box::pin(local_tx))) + } #[cfg(unix)] LocalProtocol::ReverseUnix { ref path } => { use crate::unix_socket; @@ -181,6 +200,7 @@ async fn run_tunnel( | LocalProtocol::Socks5 { .. } | LocalProtocol::TProxyTcp | LocalProtocol::TProxyUdp { .. } + | LocalProtocol::HttpProxy { .. } | LocalProtocol::Unix { .. } => { error!("Received an unsupported target protocol {:?}", remote); Err(anyhow::anyhow!("Invalid upgrade request")) @@ -572,7 +592,10 @@ async fn ws_server_upgrade( .instrument(Span::current()), ); - if matches!(req_protocol, LocalProtocol::ReverseSocks5 { .. }) { + if matches!( + req_protocol, + LocalProtocol::ReverseSocks5 { .. } | LocalProtocol::ReverseHttpProxy { .. } + ) { let Ok(header_val) = HeaderValue::from_str(&tunnel_to_jwt_token(Uuid::from_u128(0), &remote_addr)) else { error!("Bad headervalue for reverse socks5: {} {}", remote_addr.host, remote_addr.port); return http::Response::builder()