From c265c219f01f6403b38953ca435d4c739a5b78ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=A3rebe=20-=20Romain=20GERARD?= Date: Sat, 21 Oct 2023 12:04:10 +0200 Subject: [PATCH] Add http proxy support Former-commit-id: 74f6da15e45f5ffbe6f37f9a25b27c46184252ba [formerly 65d0871b161bfc90497160a0c1525e11e6f4818d] [formerly b41ffebb30a1cca421b07c359c4943e5181ce7a7 [formerly 1c82b9fc05420e11b0cb68597e51cb3abdfec7aa]] Former-commit-id: f15f736126be3b296a00366b9f6ecbebeee5c399 [formerly ca183f028e8cee92084aaa87bab0bd73962ab50d] Former-commit-id: 31e45ab64416517af7d140020852e9bdaea8939d Former-commit-id: 7bbedcca5773d9746a7d3c6b6ef36056d3f1868e Former-commit-id: 4ae8aaf27b45c3e9ff0035b4cf6d2471c62c62b2 Former-commit-id: 67267b9ecc72ffb52e26aa6304e41f21ce0c7e52 [formerly 36adc769ecba20c66a6ec9d0243fe74a13352a47] Former-commit-id: 56d37bd430a18e1f7f03ad2fc196aaeee4239745 --- src/main.rs | 34 ++++++++++++++++++--- src/tcp.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++++-- src/transport.rs | 25 +++++++++++----- 3 files changed, 124 insertions(+), 13 deletions(-) diff --git a/src/main.rs b/src/main.rs index 90cab85..b288a57 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,13 +13,14 @@ use futures_util::{pin_mut, stream, Stream, StreamExt, TryStreamExt}; use hyper::http::HeaderValue; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap}; -use std::io; +use std::fmt::{Debug, Formatter}; use std::io::ErrorKind; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::{fmt, io}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::rustls::server::DnsName; @@ -71,6 +72,15 @@ struct Client { #[arg(long, verbatim_doc_comment)] tls_verify_certificate: bool, + /// If set, will use this http proxy to connect to the server + #[arg( + short = 'p', + long, + value_name = "http://USER:PASS@HOST:PORT", + verbatim_doc_comment + )] + http_proxy: Option, + /// Use a specific prefix that will show up in the http path during the upgrade request. /// Useful if you need to route requests server side but don't have vhosts #[arg(long, default_value = "morille", verbatim_doc_comment)] @@ -400,7 +410,7 @@ pub struct TlsServerConfig { pub tls_key: PrivateKey, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct WsServerConfig { pub socket_so_mark: Option, pub bind: SocketAddr, @@ -411,6 +421,20 @@ pub struct WsServerConfig { pub tls: Option, } +impl Debug for WsServerConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("WsServerConfig") + .field("socket_so_mark", &self.socket_so_mark) + .field("bind", &self.bind) + .field("restrict_to", &self.restrict_to) + .field("websocket_ping_frequency", &self.websocket_ping_frequency) + .field("timeout_connect", &self.timeout_connect) + .field("websocket_mask_frame", &self.websocket_mask_frame) + .field("tls", &self.tls.is_some()) + .finish() + } +} + #[derive(Clone, Debug)] pub struct WsClientConfig { pub remote_addr: (Host, u16), @@ -421,6 +445,7 @@ pub struct WsClientConfig { pub timeout_connect: Duration, pub websocket_ping_frequency: Duration, pub websocket_mask_frame: bool, + pub http_proxy: Option, } impl WsClientConfig { @@ -490,7 +515,7 @@ async fn main() { _ => panic!("invalid scheme in server url {}", args.remote_addr.scheme()), }; - let server_config = Arc::new(WsClientConfig { + let client_config = Arc::new(WsClientConfig { remote_addr: ( args.remote_addr.host().unwrap().to_owned(), args.remote_addr.port_or_known_default().unwrap(), @@ -504,11 +529,12 @@ async fn main() { .websocket_ping_frequency_sec .unwrap_or(Duration::from_secs(30)), websocket_mask_frame: args.websocket_mask_frame, + http_proxy: args.http_proxy, }); // Start tunnels for tunnel in args.local_to_remote.into_iter() { - let server_config = server_config.clone(); + let server_config = client_config.clone(); match &tunnel.local_protocol { LocalProtocol::Tcp => { diff --git a/src/tcp.rs b/src/tcp.rs index 3014d1e..7d5bdac 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -1,14 +1,16 @@ use anyhow::{anyhow, Context}; use std::{io, vec}; +use base64::Engine; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; use tokio::time::timeout; use tokio_stream::wrappers::TcpListenerStream; use tracing::debug; use tracing::log::info; -use url::Host; +use url::{Host, Url}; fn configure_socket(socket: &mut TcpSocket, so_mark: &Option) -> Result<(), anyhow::Error> { socket.set_nodelay(true).with_context(|| { @@ -42,6 +44,7 @@ fn configure_socket(socket: &mut TcpSocket, so_mark: &Option) -> Result<(), Ok(()) } + pub async fn connect( host: &Host, port: u16, @@ -50,7 +53,6 @@ pub async fn connect( ) -> Result { info!("Opening TCP connection to {}:{}", host, port); - // TODO: Avoid allocation of vec by extracting the code that does the connection in a separate function let socket_addrs: Vec = match host { Host::Domain(domain) => tokio::net::lookup_host(format!("{}:{}", domain, port)) .await @@ -101,6 +103,78 @@ pub async fn connect( } } +pub async fn connect_with_http_proxy( + proxy: &Url, + host: &Host, + port: u16, + so_mark: &Option, + connect_timeout: Duration, +) -> Result { + let proxy_host = proxy.host().context("Cannot parse proxy host")?; + let proxy_port = proxy.port_or_known_default().unwrap_or(80); + + let mut socket = connect(&host.to_owned(), proxy_port, so_mark, connect_timeout).await?; + info!("Connected to http proxy {}:{}", proxy_host, proxy_port); + + let authorization = + if let Some((user, password)) = proxy.password().map(|p| (proxy.username(), p)) { + let creds = + base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, password)); + format!("Proxy-Authorization: Basic {}\r\n", creds) + } else { + "".to_string() + }; + + let connect_request = + format!("CONNECT {host}:{port} HTTP/1.0\r\nHost: {host}:{port}\r\n{authorization}\r\n"); + socket + .write_all(connect_request.trim_start().as_bytes()) + .await?; + + let mut buf = [0u8; 8096]; + let mut needle = 0; + loop { + let nb_bytes = tokio::time::timeout(connect_timeout, socket.read(&mut buf[needle..])).await; + let nb_bytes = match nb_bytes { + Ok(Ok(nb_bytes)) => { + if nb_bytes == 0 { + return Err(anyhow!( + "Cannot connect to http proxy. Proxy closed the connection without returning any response" )); + } else { + nb_bytes + } + } + Ok(Err(err)) => { + return Err(anyhow!("Cannot connect to http proxy. {err}")); + } + Err(_) => { + return Err(anyhow!( + "Cannot connect to http proxy. Proxy took too long to connect" + )); + } + }; + + needle += nb_bytes; + if buf[..needle].windows(4).any(|window| window == b"\r\n\r\n") { + break; + } + } + + let ok_response = b"HTTP/1.0 200"; + if !buf + .windows(ok_response.len()) + .any(|window| window == ok_response) + { + return Err(anyhow!( + "Cannot connect to http proxy. Proxy returned an invalid response: {}", + String::from_utf8_lossy(&buf[..needle]) + )); + } + + info!("http proxy connected to remote host {}:{}", host, port); + Ok(socket) +} + pub async fn run_server(bind: SocketAddr) -> Result { info!("Starting TCP server listening cnx on {}", bind); diff --git a/src/transport.rs b/src/transport.rs index 28896db..3c2b9a3 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -71,13 +71,24 @@ pub async fn connect( tunnel_cfg: &LocalToRemote, ) -> anyhow::Result> { let (host, port) = &server_cfg.remote_addr; - let tcp_stream = tcp::connect( - host, - *port, - &tunnel_cfg.socket_so_mark, - server_cfg.timeout_connect, - ) - .await?; + let tcp_stream = if let Some(http_proxy) = &server_cfg.http_proxy { + tcp::connect_with_http_proxy( + http_proxy, + host, + *port, + &tunnel_cfg.socket_so_mark, + server_cfg.timeout_connect, + ) + .await? + } else { + tcp::connect( + host, + *port, + &tunnel_cfg.socket_so_mark, + server_cfg.timeout_connect, + ) + .await? + }; let data = JwtTunnelConfig { id: request_id.to_string(),