Add http proxy support
Former-commit-id: 74f6da15e45f5ffbe6f37f9a25b27c46184252ba [formerly 65d0871b161bfc90497160a0c1525e11e6f4818d] [formerly b41ffebb30a1cca421b07c359c4943e5181ce7a7 [formerly 1c82b9fc05420e11b0cb68597e51cb3abdfec7aa]] Former-commit-id: f15f736126be3b296a00366b9f6ecbebeee5c399 [formerly ca183f028e8cee92084aaa87bab0bd73962ab50d] Former-commit-id: 31e45ab64416517af7d140020852e9bdaea8939d Former-commit-id: 7bbedcca5773d9746a7d3c6b6ef36056d3f1868e Former-commit-id: 4ae8aaf27b45c3e9ff0035b4cf6d2471c62c62b2 Former-commit-id: 67267b9ecc72ffb52e26aa6304e41f21ce0c7e52 [formerly 36adc769ecba20c66a6ec9d0243fe74a13352a47] Former-commit-id: 56d37bd430a18e1f7f03ad2fc196aaeee4239745
This commit is contained in:
parent
ce9ced6307
commit
c265c219f0
3 changed files with 124 additions and 13 deletions
34
src/main.rs
34
src/main.rs
|
@ -13,13 +13,14 @@ use futures_util::{pin_mut, stream, Stream, StreamExt, TryStreamExt};
|
|||
use hyper::http::HeaderValue;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::io;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::io::ErrorKind;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4};
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{fmt, io};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use tokio_rustls::rustls::server::DnsName;
|
||||
|
@ -71,6 +72,15 @@ struct Client {
|
|||
#[arg(long, verbatim_doc_comment)]
|
||||
tls_verify_certificate: bool,
|
||||
|
||||
/// If set, will use this http proxy to connect to the server
|
||||
#[arg(
|
||||
short = 'p',
|
||||
long,
|
||||
value_name = "http://USER:PASS@HOST:PORT",
|
||||
verbatim_doc_comment
|
||||
)]
|
||||
http_proxy: Option<Url>,
|
||||
|
||||
/// Use a specific prefix that will show up in the http path during the upgrade request.
|
||||
/// Useful if you need to route requests server side but don't have vhosts
|
||||
#[arg(long, default_value = "morille", verbatim_doc_comment)]
|
||||
|
@ -400,7 +410,7 @@ pub struct TlsServerConfig {
|
|||
pub tls_key: PrivateKey,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub struct WsServerConfig {
|
||||
pub socket_so_mark: Option<i32>,
|
||||
pub bind: SocketAddr,
|
||||
|
@ -411,6 +421,20 @@ pub struct WsServerConfig {
|
|||
pub tls: Option<TlsServerConfig>,
|
||||
}
|
||||
|
||||
impl Debug for WsServerConfig {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("WsServerConfig")
|
||||
.field("socket_so_mark", &self.socket_so_mark)
|
||||
.field("bind", &self.bind)
|
||||
.field("restrict_to", &self.restrict_to)
|
||||
.field("websocket_ping_frequency", &self.websocket_ping_frequency)
|
||||
.field("timeout_connect", &self.timeout_connect)
|
||||
.field("websocket_mask_frame", &self.websocket_mask_frame)
|
||||
.field("tls", &self.tls.is_some())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct WsClientConfig {
|
||||
pub remote_addr: (Host<String>, u16),
|
||||
|
@ -421,6 +445,7 @@ pub struct WsClientConfig {
|
|||
pub timeout_connect: Duration,
|
||||
pub websocket_ping_frequency: Duration,
|
||||
pub websocket_mask_frame: bool,
|
||||
pub http_proxy: Option<Url>,
|
||||
}
|
||||
|
||||
impl WsClientConfig {
|
||||
|
@ -490,7 +515,7 @@ async fn main() {
|
|||
_ => panic!("invalid scheme in server url {}", args.remote_addr.scheme()),
|
||||
};
|
||||
|
||||
let server_config = Arc::new(WsClientConfig {
|
||||
let client_config = Arc::new(WsClientConfig {
|
||||
remote_addr: (
|
||||
args.remote_addr.host().unwrap().to_owned(),
|
||||
args.remote_addr.port_or_known_default().unwrap(),
|
||||
|
@ -504,11 +529,12 @@ async fn main() {
|
|||
.websocket_ping_frequency_sec
|
||||
.unwrap_or(Duration::from_secs(30)),
|
||||
websocket_mask_frame: args.websocket_mask_frame,
|
||||
http_proxy: args.http_proxy,
|
||||
});
|
||||
|
||||
// Start tunnels
|
||||
for tunnel in args.local_to_remote.into_iter() {
|
||||
let server_config = server_config.clone();
|
||||
let server_config = client_config.clone();
|
||||
|
||||
match &tunnel.local_protocol {
|
||||
LocalProtocol::Tcp => {
|
||||
|
|
78
src/tcp.rs
78
src/tcp.rs
|
@ -1,14 +1,16 @@
|
|||
use anyhow::{anyhow, Context};
|
||||
use std::{io, vec};
|
||||
|
||||
use base64::Engine;
|
||||
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpSocket, TcpStream};
|
||||
use tokio::time::timeout;
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
use tracing::debug;
|
||||
use tracing::log::info;
|
||||
use url::Host;
|
||||
use url::{Host, Url};
|
||||
|
||||
fn configure_socket(socket: &mut TcpSocket, so_mark: &Option<i32>) -> Result<(), anyhow::Error> {
|
||||
socket.set_nodelay(true).with_context(|| {
|
||||
|
@ -42,6 +44,7 @@ fn configure_socket(socket: &mut TcpSocket, so_mark: &Option<i32>) -> Result<(),
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn connect(
|
||||
host: &Host<String>,
|
||||
port: u16,
|
||||
|
@ -50,7 +53,6 @@ pub async fn connect(
|
|||
) -> Result<TcpStream, anyhow::Error> {
|
||||
info!("Opening TCP connection to {}:{}", host, port);
|
||||
|
||||
// TODO: Avoid allocation of vec by extracting the code that does the connection in a separate function
|
||||
let socket_addrs: Vec<SocketAddr> = match host {
|
||||
Host::Domain(domain) => tokio::net::lookup_host(format!("{}:{}", domain, port))
|
||||
.await
|
||||
|
@ -101,6 +103,78 @@ pub async fn connect(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn connect_with_http_proxy(
|
||||
proxy: &Url,
|
||||
host: &Host<String>,
|
||||
port: u16,
|
||||
so_mark: &Option<i32>,
|
||||
connect_timeout: Duration,
|
||||
) -> Result<TcpStream, anyhow::Error> {
|
||||
let proxy_host = proxy.host().context("Cannot parse proxy host")?;
|
||||
let proxy_port = proxy.port_or_known_default().unwrap_or(80);
|
||||
|
||||
let mut socket = connect(&host.to_owned(), proxy_port, so_mark, connect_timeout).await?;
|
||||
info!("Connected to http proxy {}:{}", proxy_host, proxy_port);
|
||||
|
||||
let authorization =
|
||||
if let Some((user, password)) = proxy.password().map(|p| (proxy.username(), p)) {
|
||||
let creds =
|
||||
base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, password));
|
||||
format!("Proxy-Authorization: Basic {}\r\n", creds)
|
||||
} else {
|
||||
"".to_string()
|
||||
};
|
||||
|
||||
let connect_request =
|
||||
format!("CONNECT {host}:{port} HTTP/1.0\r\nHost: {host}:{port}\r\n{authorization}\r\n");
|
||||
socket
|
||||
.write_all(connect_request.trim_start().as_bytes())
|
||||
.await?;
|
||||
|
||||
let mut buf = [0u8; 8096];
|
||||
let mut needle = 0;
|
||||
loop {
|
||||
let nb_bytes = tokio::time::timeout(connect_timeout, socket.read(&mut buf[needle..])).await;
|
||||
let nb_bytes = match nb_bytes {
|
||||
Ok(Ok(nb_bytes)) => {
|
||||
if nb_bytes == 0 {
|
||||
return Err(anyhow!(
|
||||
"Cannot connect to http proxy. Proxy closed the connection without returning any response" ));
|
||||
} else {
|
||||
nb_bytes
|
||||
}
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
return Err(anyhow!("Cannot connect to http proxy. {err}"));
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(anyhow!(
|
||||
"Cannot connect to http proxy. Proxy took too long to connect"
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
needle += nb_bytes;
|
||||
if buf[..needle].windows(4).any(|window| window == b"\r\n\r\n") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let ok_response = b"HTTP/1.0 200";
|
||||
if !buf
|
||||
.windows(ok_response.len())
|
||||
.any(|window| window == ok_response)
|
||||
{
|
||||
return Err(anyhow!(
|
||||
"Cannot connect to http proxy. Proxy returned an invalid response: {}",
|
||||
String::from_utf8_lossy(&buf[..needle])
|
||||
));
|
||||
}
|
||||
|
||||
info!("http proxy connected to remote host {}:{}", host, port);
|
||||
Ok(socket)
|
||||
}
|
||||
|
||||
pub async fn run_server(bind: SocketAddr) -> Result<TcpListenerStream, anyhow::Error> {
|
||||
info!("Starting TCP server listening cnx on {}", bind);
|
||||
|
||||
|
|
|
@ -71,13 +71,24 @@ pub async fn connect(
|
|||
tunnel_cfg: &LocalToRemote,
|
||||
) -> anyhow::Result<WebSocket<Upgraded>> {
|
||||
let (host, port) = &server_cfg.remote_addr;
|
||||
let tcp_stream = tcp::connect(
|
||||
host,
|
||||
*port,
|
||||
&tunnel_cfg.socket_so_mark,
|
||||
server_cfg.timeout_connect,
|
||||
)
|
||||
.await?;
|
||||
let tcp_stream = if let Some(http_proxy) = &server_cfg.http_proxy {
|
||||
tcp::connect_with_http_proxy(
|
||||
http_proxy,
|
||||
host,
|
||||
*port,
|
||||
&tunnel_cfg.socket_so_mark,
|
||||
server_cfg.timeout_connect,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
tcp::connect(
|
||||
host,
|
||||
*port,
|
||||
&tunnel_cfg.socket_so_mark,
|
||||
server_cfg.timeout_connect,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
let data = JwtTunnelConfig {
|
||||
id: request_id.to_string(),
|
||||
|
|
Loading…
Reference in a new issue