2024-01-07 17:37:50 +00:00
|
|
|
use super::{tunnel_to_jwt_token, JwtTunnelConfig, RemoteAddr, JWT_DECODE, JWT_HEADER_PREFIX};
|
|
|
|
use crate::WsClientConfig;
|
2023-10-23 17:11:12 +00:00
|
|
|
use anyhow::{anyhow, Context};
|
|
|
|
|
2023-12-15 08:47:06 +00:00
|
|
|
use bytes::Bytes;
|
2023-10-23 17:11:12 +00:00
|
|
|
use fastwebsockets::WebSocket;
|
2023-10-28 13:55:14 +00:00
|
|
|
use futures_util::pin_mut;
|
2023-12-15 08:47:06 +00:00
|
|
|
use http_body_util::Empty;
|
|
|
|
use hyper::body::Incoming;
|
2023-12-26 20:16:34 +00:00
|
|
|
use hyper::header::{AUTHORIZATION, COOKIE, SEC_WEBSOCKET_PROTOCOL, SEC_WEBSOCKET_VERSION, UPGRADE};
|
2023-10-23 17:11:12 +00:00
|
|
|
use hyper::header::{CONNECTION, HOST, SEC_WEBSOCKET_KEY};
|
|
|
|
use hyper::upgrade::Upgraded;
|
2023-12-15 08:47:06 +00:00
|
|
|
use hyper::{Request, Response};
|
|
|
|
use hyper_util::rt::{TokioExecutor, TokioIo};
|
2024-01-07 17:37:50 +00:00
|
|
|
use jsonwebtoken::TokenData;
|
2023-10-23 17:11:12 +00:00
|
|
|
use std::future::Future;
|
|
|
|
use std::ops::{Deref, DerefMut};
|
2023-10-28 13:55:14 +00:00
|
|
|
use std::sync::Arc;
|
2023-10-23 17:11:12 +00:00
|
|
|
use tokio::io::{AsyncRead, AsyncWrite};
|
|
|
|
use tokio::sync::oneshot;
|
2023-10-28 13:55:14 +00:00
|
|
|
use tokio_stream::{Stream, StreamExt};
|
2023-10-23 17:11:12 +00:00
|
|
|
use tracing::log::debug;
|
2023-10-28 13:55:14 +00:00
|
|
|
use tracing::{error, span, Instrument, Level, Span};
|
2024-01-07 17:37:50 +00:00
|
|
|
use url::Host;
|
2023-10-23 17:11:12 +00:00
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
pub async fn connect(
|
|
|
|
request_id: Uuid,
|
|
|
|
client_cfg: &WsClientConfig,
|
2024-01-07 17:37:50 +00:00
|
|
|
dest_addr: &RemoteAddr,
|
2023-12-15 08:47:06 +00:00
|
|
|
) -> anyhow::Result<(WebSocket<TokioIo<Upgraded>>, Response<Incoming>)> {
|
2023-10-28 13:55:14 +00:00
|
|
|
let mut pooled_cnx = match client_cfg.cnx_pool().get().await {
|
2024-01-07 17:37:50 +00:00
|
|
|
Ok(cnx) => Ok(cnx),
|
|
|
|
Err(err) => Err(anyhow!("failed to get a connection to the server from the pool: {err:?}")),
|
|
|
|
}?;
|
2023-10-23 17:11:12 +00:00
|
|
|
|
|
|
|
let mut req = Request::builder()
|
|
|
|
.method("GET")
|
2023-12-26 20:16:34 +00:00
|
|
|
.uri(format!("/{}/events", &client_cfg.http_upgrade_path_prefix,))
|
2023-10-27 07:15:15 +00:00
|
|
|
.header(HOST, &client_cfg.http_header_host)
|
2023-10-23 17:11:12 +00:00
|
|
|
.header(UPGRADE, "websocket")
|
|
|
|
.header(CONNECTION, "upgrade")
|
|
|
|
.header(SEC_WEBSOCKET_KEY, fastwebsockets::handshake::generate_key())
|
|
|
|
.header(SEC_WEBSOCKET_VERSION, "13")
|
2023-12-26 20:16:34 +00:00
|
|
|
.header(
|
|
|
|
SEC_WEBSOCKET_PROTOCOL,
|
2024-01-07 17:37:50 +00:00
|
|
|
format!("v1, {}{}", JWT_HEADER_PREFIX, tunnel_to_jwt_token(request_id, dest_addr)),
|
2023-12-26 20:16:34 +00:00
|
|
|
)
|
2023-10-23 17:11:12 +00:00
|
|
|
.version(hyper::Version::HTTP_11);
|
|
|
|
|
|
|
|
for (k, v) in &client_cfg.http_headers {
|
2023-10-26 20:04:41 +00:00
|
|
|
req = req.header(k, v);
|
2023-10-26 16:23:54 +00:00
|
|
|
}
|
2023-10-23 17:11:12 +00:00
|
|
|
if let Some(auth) = &client_cfg.http_upgrade_credentials {
|
2023-10-26 20:04:41 +00:00
|
|
|
req = req.header(AUTHORIZATION, auth);
|
2023-10-23 17:11:12 +00:00
|
|
|
}
|
|
|
|
|
2023-12-15 08:47:06 +00:00
|
|
|
let req = req.body(Empty::<Bytes>::new()).with_context(|| {
|
2023-10-23 17:11:12 +00:00
|
|
|
format!(
|
|
|
|
"failed to build HTTP request to contact the server {:?}",
|
|
|
|
client_cfg.remote_addr
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
debug!("with HTTP upgrade request {:?}", req);
|
2023-10-28 13:55:14 +00:00
|
|
|
let transport = pooled_cnx.deref_mut().take().unwrap();
|
2023-12-15 08:47:06 +00:00
|
|
|
let (ws, response) = fastwebsockets::handshake::client(&TokioExecutor::new(), req, transport)
|
2023-10-28 13:55:14 +00:00
|
|
|
.await
|
2023-10-30 07:13:38 +00:00
|
|
|
.with_context(|| format!("failed to do websocket handshake with the server {:?}", client_cfg.remote_addr))?;
|
2023-10-23 17:11:12 +00:00
|
|
|
|
2023-12-01 21:25:01 +00:00
|
|
|
Ok((ws, response))
|
2023-10-23 17:11:12 +00:00
|
|
|
}
|
|
|
|
|
2023-10-28 13:55:14 +00:00
|
|
|
async fn connect_to_server<R, W>(
|
2023-10-23 17:11:12 +00:00
|
|
|
request_id: Uuid,
|
|
|
|
client_cfg: &WsClientConfig,
|
2024-01-07 17:37:50 +00:00
|
|
|
remote_cfg: &RemoteAddr,
|
2023-10-23 17:11:12 +00:00
|
|
|
duplex_stream: (R, W),
|
|
|
|
) -> anyhow::Result<()>
|
|
|
|
where
|
|
|
|
R: AsyncRead + Send + 'static,
|
|
|
|
W: AsyncWrite + Send + 'static,
|
|
|
|
{
|
2023-12-01 21:25:01 +00:00
|
|
|
let (mut ws, _) = connect(request_id, client_cfg, remote_cfg).await?;
|
2023-10-23 17:11:12 +00:00
|
|
|
ws.set_auto_apply_mask(client_cfg.websocket_mask_frame);
|
|
|
|
|
|
|
|
let (ws_rx, ws_tx) = ws.split(tokio::io::split);
|
|
|
|
let (local_rx, local_tx) = duplex_stream;
|
|
|
|
let (close_tx, close_rx) = oneshot::channel::<()>();
|
|
|
|
|
|
|
|
// Forward local tx to websocket tx
|
|
|
|
let ping_frequency = client_cfg.websocket_ping_frequency;
|
2023-11-09 16:18:38 +00:00
|
|
|
tokio::spawn(
|
|
|
|
super::io::propagate_read(local_rx, ws_tx, close_tx, Some(ping_frequency)).instrument(Span::current()),
|
|
|
|
);
|
2023-10-23 17:11:12 +00:00
|
|
|
|
|
|
|
// Forward websocket rx to local rx
|
|
|
|
let _ = super::io::propagate_write(local_tx, ws_rx, close_rx).await;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2023-10-28 13:55:14 +00:00
|
|
|
|
2024-01-07 17:37:50 +00:00
|
|
|
pub async fn run_tunnel<T, R, W>(client_config: Arc<WsClientConfig>, incoming_cnx: T) -> anyhow::Result<()>
|
2023-10-28 13:55:14 +00:00
|
|
|
where
|
2024-01-07 17:37:50 +00:00
|
|
|
T: Stream<Item = anyhow::Result<((R, W), RemoteAddr)>>,
|
2023-10-28 13:55:14 +00:00
|
|
|
R: AsyncRead + Send + 'static,
|
|
|
|
W: AsyncWrite + Send + 'static,
|
|
|
|
{
|
|
|
|
pin_mut!(incoming_cnx);
|
2024-01-07 17:37:50 +00:00
|
|
|
while let Some(Ok((cnx_stream, remote_addr))) = incoming_cnx.next().await {
|
2023-10-28 13:55:14 +00:00
|
|
|
let request_id = Uuid::now_v7();
|
|
|
|
let span = span!(
|
|
|
|
Level::INFO,
|
|
|
|
"tunnel",
|
|
|
|
id = request_id.to_string(),
|
2024-01-07 17:37:50 +00:00
|
|
|
remote = format!("{}:{}", remote_addr.host, remote_addr.port)
|
2023-10-28 13:55:14 +00:00
|
|
|
);
|
|
|
|
let client_config = client_config.clone();
|
|
|
|
|
|
|
|
let tunnel = async move {
|
2024-01-07 17:37:50 +00:00
|
|
|
let _ = connect_to_server(request_id, &client_config, &remote_addr, cnx_stream)
|
2023-10-28 13:55:14 +00:00
|
|
|
.await
|
|
|
|
.map_err(|err| error!("{:?}", err));
|
|
|
|
}
|
|
|
|
.instrument(span);
|
|
|
|
|
|
|
|
tokio::spawn(tunnel);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2023-11-26 14:47:49 +00:00
|
|
|
|
2023-11-26 17:22:28 +00:00
|
|
|
pub async fn run_reverse_tunnel<F, Fut, T>(
|
2023-11-26 14:47:49 +00:00
|
|
|
client_config: Arc<WsClientConfig>,
|
2024-01-07 17:37:50 +00:00
|
|
|
remote_addr: RemoteAddr,
|
2023-11-26 17:22:28 +00:00
|
|
|
connect_to_dest: F,
|
|
|
|
) -> anyhow::Result<()>
|
|
|
|
where
|
2024-01-07 17:37:50 +00:00
|
|
|
F: Fn(Option<RemoteAddr>) -> Fut,
|
2023-11-26 17:22:28 +00:00
|
|
|
Fut: Future<Output = anyhow::Result<T>>,
|
|
|
|
T: AsyncRead + AsyncWrite + Send + 'static,
|
|
|
|
{
|
2023-11-26 14:47:49 +00:00
|
|
|
loop {
|
|
|
|
let client_config = client_config.clone();
|
|
|
|
let request_id = Uuid::now_v7();
|
|
|
|
let span = span!(
|
|
|
|
Level::INFO,
|
|
|
|
"tunnel",
|
|
|
|
id = request_id.to_string(),
|
2024-01-07 17:37:50 +00:00
|
|
|
remote = format!("{}:{}", remote_addr.host, remote_addr.port)
|
2023-11-26 14:47:49 +00:00
|
|
|
);
|
|
|
|
let _span = span.enter();
|
|
|
|
|
|
|
|
// Correctly configure tunnel cfg
|
2024-01-07 17:37:50 +00:00
|
|
|
let (mut ws, response) = connect(request_id, &client_config, &remote_addr)
|
2023-11-26 14:47:49 +00:00
|
|
|
.instrument(span.clone())
|
|
|
|
.await?;
|
|
|
|
ws.set_auto_apply_mask(client_config.websocket_mask_frame);
|
|
|
|
|
|
|
|
// Connect to endpoint
|
2023-12-04 17:21:55 +00:00
|
|
|
let remote = response
|
2023-12-01 21:25:01 +00:00
|
|
|
.headers()
|
|
|
|
.get(COOKIE)
|
2023-12-04 17:21:55 +00:00
|
|
|
.and_then(|h| h.to_str().ok())
|
2024-01-07 17:37:50 +00:00
|
|
|
.and_then(|h| {
|
|
|
|
let (validation, decode_key) = JWT_DECODE.deref();
|
|
|
|
let jwt: Option<TokenData<JwtTunnelConfig>> = jsonwebtoken::decode(h, decode_key, validation).ok();
|
|
|
|
jwt
|
2023-12-01 21:25:01 +00:00
|
|
|
})
|
2024-01-07 17:37:50 +00:00
|
|
|
.map(|jwt| RemoteAddr {
|
|
|
|
protocol: jwt.claims.p,
|
|
|
|
host: Host::parse(&jwt.claims.r).unwrap_or_else(|_| Host::Domain(String::new())),
|
|
|
|
port: jwt.claims.rp,
|
|
|
|
});
|
2023-11-26 14:47:49 +00:00
|
|
|
|
2024-01-07 17:37:50 +00:00
|
|
|
let stream = match connect_to_dest(remote).instrument(span.clone()).await {
|
2023-11-26 14:47:49 +00:00
|
|
|
Ok(s) => s,
|
|
|
|
Err(err) => {
|
2024-01-07 17:37:50 +00:00
|
|
|
error!("Cannot connect to xxxx: {err:?}");
|
2023-11-26 14:47:49 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let (local_rx, local_tx) = tokio::io::split(stream);
|
|
|
|
let (ws_rx, ws_tx) = ws.split(tokio::io::split);
|
|
|
|
let (close_tx, close_rx) = oneshot::channel::<()>();
|
|
|
|
|
|
|
|
let tunnel = async move {
|
|
|
|
let ping_frequency = client_config.websocket_ping_frequency;
|
|
|
|
tokio::spawn(
|
|
|
|
super::io::propagate_read(local_rx, ws_tx, close_tx, Some(ping_frequency)).instrument(Span::current()),
|
|
|
|
);
|
|
|
|
|
|
|
|
// Forward websocket rx to local rx
|
|
|
|
let _ = super::io::propagate_write(local_tx, ws_rx, close_rx).await;
|
|
|
|
}
|
|
|
|
.instrument(span.clone());
|
|
|
|
tokio::spawn(tunnel);
|
|
|
|
}
|
|
|
|
}
|