Add support for unix socket
This commit is contained in:
parent
dc4eadb8f9
commit
10f15d1225
4 changed files with 186 additions and 9 deletions
91
src/main.rs
91
src/main.rs
|
@ -7,6 +7,8 @@ mod tcp;
|
||||||
mod tls;
|
mod tls;
|
||||||
mod tunnel;
|
mod tunnel;
|
||||||
mod udp;
|
mod udp;
|
||||||
|
#[cfg(unix)]
|
||||||
|
mod unix_socket;
|
||||||
|
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
|
@ -258,7 +260,7 @@ struct Server {
|
||||||
tls_private_key: Option<PathBuf>,
|
tls_private_key: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||||
enum LocalProtocol {
|
enum LocalProtocol {
|
||||||
Tcp { proxy_protocol: bool },
|
Tcp { proxy_protocol: bool },
|
||||||
Udp { timeout: Option<Duration> },
|
Udp { timeout: Option<Duration> },
|
||||||
|
@ -269,6 +271,8 @@ enum LocalProtocol {
|
||||||
ReverseTcp,
|
ReverseTcp,
|
||||||
ReverseUdp { timeout: Option<Duration> },
|
ReverseUdp { timeout: Option<Duration> },
|
||||||
ReverseSocks5,
|
ReverseSocks5,
|
||||||
|
ReverseUnix { path: PathBuf },
|
||||||
|
Unix { path: PathBuf },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -392,6 +396,22 @@ fn parse_tunnel_arg(arg: &str) -> Result<LocalToRemote, io::Error> {
|
||||||
remote: (dest_host, dest_port),
|
remote: (dest_host, dest_port),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
"unix:/" => {
|
||||||
|
let Some((path, remote)) = arg[7..].split_once(':') else {
|
||||||
|
return Err(Error::new(
|
||||||
|
ErrorKind::InvalidInput,
|
||||||
|
format!("cannot parse unix socket path from {}", arg),
|
||||||
|
));
|
||||||
|
};
|
||||||
|
let (dest_host, dest_port, _options) = parse_tunnel_dest(remote)?;
|
||||||
|
Ok(LocalToRemote {
|
||||||
|
local_protocol: LocalProtocol::Unix {
|
||||||
|
path: PathBuf::from(path),
|
||||||
|
},
|
||||||
|
local: SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0)),
|
||||||
|
remote: (dest_host, dest_port),
|
||||||
|
})
|
||||||
|
}
|
||||||
_ => match &arg[..8] {
|
_ => match &arg[..8] {
|
||||||
"socks5:/" => {
|
"socks5:/" => {
|
||||||
let (local_bind, remaining) = parse_local_bind(&arg[9..])?;
|
let (local_bind, remaining) = parse_local_bind(&arg[9..])?;
|
||||||
|
@ -800,7 +820,44 @@ async fn main() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
_ => panic!("Invalid protocol for reverse tunnel"),
|
LocalProtocol::Unix { path } => {
|
||||||
|
let path = path.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let remote = tunnel.remote.clone();
|
||||||
|
let cfg = client_config.clone();
|
||||||
|
let connect_to_dest = |_| async {
|
||||||
|
tcp::connect(
|
||||||
|
&remote.0,
|
||||||
|
remote.1,
|
||||||
|
cfg.socket_so_mark,
|
||||||
|
cfg.timeout_connect,
|
||||||
|
&cfg.dns_resolver,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
};
|
||||||
|
|
||||||
|
let (host, port) = to_host_port(tunnel.local);
|
||||||
|
let remote = RemoteAddr {
|
||||||
|
protocol: LocalProtocol::ReverseUnix { path: path.clone() },
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
};
|
||||||
|
if let Err(err) =
|
||||||
|
tunnel::client::run_reverse_tunnel(client_config, remote, connect_to_dest).await
|
||||||
|
{
|
||||||
|
error!("{:?}", err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
LocalProtocol::Stdio
|
||||||
|
| LocalProtocol::TProxyTcp
|
||||||
|
| LocalProtocol::TProxyUdp { .. }
|
||||||
|
| LocalProtocol::ReverseTcp
|
||||||
|
| LocalProtocol::ReverseUdp { .. }
|
||||||
|
| LocalProtocol::ReverseSocks5
|
||||||
|
| LocalProtocol::ReverseUnix { .. } => {
|
||||||
|
panic!("Invalid protocol for reverse tunnel");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -853,6 +910,35 @@ async fn main() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
#[cfg(unix)]
|
||||||
|
LocalProtocol::Unix { path } => {
|
||||||
|
let remote = tunnel.remote.clone();
|
||||||
|
let server = unix_socket::run_server(path)
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
panic!("Cannot start Unix domain server on {}: {}", tunnel.local, err)
|
||||||
|
})
|
||||||
|
.map_err(anyhow::Error::new)
|
||||||
|
.map_ok(move |stream| {
|
||||||
|
let remote = RemoteAddr {
|
||||||
|
protocol: LocalProtocol::Tcp { proxy_protocol: false },
|
||||||
|
host: remote.0.clone(),
|
||||||
|
port: remote.1,
|
||||||
|
};
|
||||||
|
(stream.into_split(), remote)
|
||||||
|
});
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(err) = tunnel::client::run_tunnel(client_config, server).await {
|
||||||
|
error!("{:?}", err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
LocalProtocol::Unix => {
|
||||||
|
panic!("Unix socket is not available for non Unix platform")
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
LocalProtocol::TProxyUdp { timeout } => {
|
LocalProtocol::TProxyUdp { timeout } => {
|
||||||
let timeout = *timeout;
|
let timeout = *timeout;
|
||||||
|
@ -951,6 +1037,7 @@ async fn main() {
|
||||||
LocalProtocol::ReverseTcp => {}
|
LocalProtocol::ReverseTcp => {}
|
||||||
LocalProtocol::ReverseUdp { .. } => {}
|
LocalProtocol::ReverseUdp { .. } => {}
|
||||||
LocalProtocol::ReverseSocks5 => {}
|
LocalProtocol::ReverseSocks5 => {}
|
||||||
|
LocalProtocol::ReverseUnix { .. } => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,15 +35,17 @@ impl JwtTunnelConfig {
|
||||||
Self {
|
Self {
|
||||||
id: request_id.to_string(),
|
id: request_id.to_string(),
|
||||||
p: match dest.protocol {
|
p: match dest.protocol {
|
||||||
LocalProtocol::Tcp { .. } => dest.protocol,
|
LocalProtocol::Tcp { .. } => dest.protocol.clone(),
|
||||||
LocalProtocol::Udp { .. } => dest.protocol,
|
LocalProtocol::Udp { .. } => dest.protocol.clone(),
|
||||||
LocalProtocol::Stdio => LocalProtocol::Tcp { proxy_protocol: false },
|
LocalProtocol::Stdio => LocalProtocol::Tcp { proxy_protocol: false },
|
||||||
LocalProtocol::Socks5 { .. } => LocalProtocol::Tcp { proxy_protocol: false },
|
LocalProtocol::Socks5 { .. } => LocalProtocol::Tcp { proxy_protocol: false },
|
||||||
LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp,
|
LocalProtocol::ReverseTcp => LocalProtocol::ReverseTcp,
|
||||||
LocalProtocol::ReverseUdp { .. } => dest.protocol,
|
LocalProtocol::ReverseUdp { .. } => dest.protocol.clone(),
|
||||||
LocalProtocol::ReverseSocks5 => LocalProtocol::ReverseSocks5,
|
LocalProtocol::ReverseSocks5 => LocalProtocol::ReverseSocks5,
|
||||||
LocalProtocol::TProxyTcp => LocalProtocol::Tcp { proxy_protocol: false },
|
LocalProtocol::TProxyTcp => LocalProtocol::Tcp { proxy_protocol: false },
|
||||||
LocalProtocol::TProxyUdp { timeout } => LocalProtocol::Udp { timeout },
|
LocalProtocol::TProxyUdp { timeout } => LocalProtocol::Udp { timeout },
|
||||||
|
LocalProtocol::Unix { .. } => LocalProtocol::Tcp { proxy_protocol: false },
|
||||||
|
LocalProtocol::ReverseUnix { .. } => dest.protocol.clone(),
|
||||||
},
|
},
|
||||||
r: dest.host.to_string(),
|
r: dest.host.to_string(),
|
||||||
rp: dest.port,
|
rp: dest.port,
|
||||||
|
|
|
@ -11,7 +11,7 @@ use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use super::{tunnel_to_jwt_token, JwtTunnelConfig, RemoteAddr, JWT_DECODE, JWT_HEADER_PREFIX};
|
use super::{tunnel_to_jwt_token, JwtTunnelConfig, RemoteAddr, JWT_DECODE, JWT_HEADER_PREFIX};
|
||||||
use crate::{socks5, tcp, tls, udp, LocalProtocol, TlsServerConfig, WsServerConfig};
|
use crate::{socks5, tcp, tls, udp, unix_socket, LocalProtocol, TlsServerConfig, WsServerConfig};
|
||||||
use hyper::body::Incoming;
|
use hyper::body::Incoming;
|
||||||
use hyper::header::{COOKIE, SEC_WEBSOCKET_PROTOCOL};
|
use hyper::header::{COOKIE, SEC_WEBSOCKET_PROTOCOL};
|
||||||
use hyper::http::HeaderValue;
|
use hyper::http::HeaderValue;
|
||||||
|
@ -26,7 +26,7 @@ use crate::socks5::Socks5Stream;
|
||||||
use crate::tunnel::tls_reloader::TlsReloader;
|
use crate::tunnel::tls_reloader::TlsReloader;
|
||||||
use crate::udp::UdpStream;
|
use crate::udp::UdpStream;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream, UnixStream};
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tokio_rustls::TlsAcceptor;
|
use tokio_rustls::TlsAcceptor;
|
||||||
|
@ -133,7 +133,37 @@ async fn run_tunnel(
|
||||||
};
|
};
|
||||||
Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
|
Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
|
||||||
}
|
}
|
||||||
_ => Err(anyhow::anyhow!("Invalid upgrade request")),
|
#[cfg(unix)]
|
||||||
|
LocalProtocol::ReverseUnix { ref path } => {
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
|
static SERVERS: Lazy<Mutex<HashMap<(Host<String>, u16), mpsc::Receiver<UnixStream>>>> =
|
||||||
|
Lazy::new(|| Mutex::new(HashMap::with_capacity(0)));
|
||||||
|
|
||||||
|
let local_srv = (Host::parse(&jwt.claims.r)?, jwt.claims.rp);
|
||||||
|
let listening_server = unix_socket::run_server(path);
|
||||||
|
let stream = run_listening_server(&local_srv, SERVERS.deref(), listening_server).await?;
|
||||||
|
let (local_rx, local_tx) = stream.into_split();
|
||||||
|
|
||||||
|
let remote = RemoteAddr {
|
||||||
|
protocol: jwt.claims.p.clone(),
|
||||||
|
host: local_srv.0,
|
||||||
|
port: local_srv.1,
|
||||||
|
};
|
||||||
|
Ok((remote, Box::pin(local_rx), Box::pin(local_tx)))
|
||||||
|
}
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
LocalProtocol::ReverseUnix { ref path } => {
|
||||||
|
error!("Received an unsupported target protocol {:?}", jwt.claims);
|
||||||
|
Err(anyhow::anyhow!("Invalid upgrade request"))
|
||||||
|
}
|
||||||
|
LocalProtocol::Stdio
|
||||||
|
| LocalProtocol::Socks5 { .. }
|
||||||
|
| LocalProtocol::TProxyTcp
|
||||||
|
| LocalProtocol::TProxyUdp { .. }
|
||||||
|
| LocalProtocol::Unix { .. } => {
|
||||||
|
error!("Received an unsupported target protocol {:?}", jwt.claims);
|
||||||
|
Err(anyhow::anyhow!("Invalid upgrade request"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,7 +363,7 @@ async fn server_upgrade(
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
let req_protocol = jwt.claims.p;
|
let req_protocol = jwt.claims.p.clone();
|
||||||
let tunnel = match run_tunnel(&server_config, jwt, client_addr).await {
|
let tunnel = match run_tunnel(&server_config, jwt, client_addr).await {
|
||||||
Ok(ret) => ret,
|
Ok(ret) => ret,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|
58
src/unix_socket.rs
Normal file
58
src/unix_socket.rs
Normal file
|
@ -0,0 +1,58 @@
|
||||||
|
use anyhow::Context;
|
||||||
|
use futures_util::Stream;
|
||||||
|
use std::io;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::Poll;
|
||||||
|
use tokio::net::{UnixListener, UnixStream};
|
||||||
|
use tracing::log::info;
|
||||||
|
|
||||||
|
pub struct UnixListenerStream {
|
||||||
|
inner: UnixListener,
|
||||||
|
path_to_delete: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UnixListenerStream {
|
||||||
|
pub fn new(listener: UnixListener, path_to_delete: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: listener,
|
||||||
|
path_to_delete,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for UnixListenerStream {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if self.path_to_delete {
|
||||||
|
let Ok(addr) = &self.inner.local_addr() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Some(path) = addr.as_pathname() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for UnixListenerStream {
|
||||||
|
type Item = io::Result<UnixStream>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<io::Result<UnixStream>>> {
|
||||||
|
match self.inner.poll_accept(cx) {
|
||||||
|
Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
|
||||||
|
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run_server(socket_path: &Path) -> Result<UnixListenerStream, anyhow::Error> {
|
||||||
|
info!("Starting Unix socket server listening cnx on {:?}", socket_path);
|
||||||
|
|
||||||
|
let path_to_delete = socket_path.exists();
|
||||||
|
let listener = UnixListener::bind(socket_path)
|
||||||
|
.with_context(|| format!("Cannot create Unix socket server {:?}", socket_path))?;
|
||||||
|
|
||||||
|
Ok(UnixListenerStream::new(listener, path_to_delete))
|
||||||
|
}
|
Loading…
Reference in a new issue