wstunnel/src/tunnel/client.rs

187 lines
6.5 KiB
Rust
Raw Normal View History

2024-01-13 22:31:54 +00:00
use super::{JwtTunnelConfig, RemoteAddr, JWT_DECODE};
use crate::{tunnel, WsClientConfig};
2023-10-28 13:55:14 +00:00
use futures_util::pin_mut;
2024-01-13 22:31:54 +00:00
use hyper::header::COOKIE;
2024-01-07 17:37:50 +00:00
use jsonwebtoken::TokenData;
use std::future::Future;
2024-01-13 22:31:54 +00:00
use std::ops::Deref;
2023-10-28 13:55:14 +00:00
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::oneshot;
2023-10-28 13:55:14 +00:00
use tokio_stream::{Stream, StreamExt};
use tracing::{error, span, Instrument, Level, Span};
2024-01-07 17:37:50 +00:00
use url::Host;
use uuid::Uuid;
2024-01-13 17:42:15 +00:00
//async fn connect_http2(
// request_id: Uuid,
// client_cfg: &WsClientConfig,
// dest_addr: &RemoteAddr,
//) -> anyhow::Result<BodyStream<Incoming>> {
// let mut pooled_cnx = match client_cfg.cnx_pool().get().await {
// Ok(cnx) => Ok(cnx),
// Err(err) => Err(anyhow!("failed to get a connection to the server from the pool: {err:?}")),
// }?;
//
// let mut req = Request::builder()
// .method("GET")
// .uri(format!("/{}/events", &client_cfg.http_upgrade_path_prefix))
// .header(HOST, &client_cfg.http_header_host)
// .header(COOKIE, tunnel_to_jwt_token(request_id, dest_addr))
// .version(hyper::Version::HTTP_2);
//
// for (k, v) in &client_cfg.http_headers {
// req = req.header(k, v);
// }
// if let Some(auth) = &client_cfg.http_upgrade_credentials {
// req = req.header(AUTHORIZATION, auth);
// }
//
// let x: Vec<u8> = vec![];
// //let bosy = StreamBody::new(stream::iter(vec![anyhow::Result::Ok(hyper::body::Frame::data(x.as_slice()))]));
// let req = req.body(Empty::<Bytes>::new()).with_context(|| {
// format!(
// "failed to build HTTP request to contact the server {:?}",
// client_cfg.remote_addr
// )
// })?;
// debug!("with HTTP upgrade request {:?}", req);
// let transport = pooled_cnx.deref_mut().take().unwrap();
// let (mut request_sender, cnx) = hyper::client::conn::http2::Builder::new(TokioExecutor::new()).handshake(TokioIo::new(transport)).await
// .with_context(|| format!("failed to do http2 handshake with the server {:?}", client_cfg.remote_addr))?;
// tokio::spawn(cnx);
//
// let response = request_sender.send_request(req)
// .await
// .with_context(|| format!("failed to send http2 request with the server {:?}", client_cfg.remote_addr))?;
//
// // TODO: verify response is ok
// Ok(BodyStream::new(response.into_body()))
//}
2023-10-28 13:55:14 +00:00
async fn connect_to_server<R, W>(
request_id: Uuid,
client_cfg: &WsClientConfig,
2024-01-07 17:37:50 +00:00
remote_cfg: &RemoteAddr,
duplex_stream: (R, W),
) -> anyhow::Result<()>
where
R: AsyncRead + Send + 'static,
W: AsyncWrite + Send + 'static,
{
2024-01-13 22:31:54 +00:00
let ((ws_rx, ws_tx), _) = tunnel::transport::websocket::connect(request_id, client_cfg, remote_cfg).await?;
let (local_rx, local_tx) = duplex_stream;
let (close_tx, close_rx) = oneshot::channel::<()>();
// Forward local tx to websocket tx
let ping_frequency = client_cfg.websocket_ping_frequency;
tokio::spawn(
2024-01-13 20:06:57 +00:00
super::transport::io::propagate_local_to_remote(local_rx, ws_tx, close_tx, Some(ping_frequency))
.instrument(Span::current()),
);
// Forward websocket rx to local rx
2024-01-13 20:06:57 +00:00
let _ = super::transport::io::propagate_remote_to_local(local_tx, ws_rx, close_rx).await;
Ok(())
}
2023-10-28 13:55:14 +00:00
2024-01-07 17:37:50 +00:00
pub async fn run_tunnel<T, R, W>(client_config: Arc<WsClientConfig>, incoming_cnx: T) -> anyhow::Result<()>
2023-10-28 13:55:14 +00:00
where
2024-01-07 17:37:50 +00:00
T: Stream<Item = anyhow::Result<((R, W), RemoteAddr)>>,
2023-10-28 13:55:14 +00:00
R: AsyncRead + Send + 'static,
W: AsyncWrite + Send + 'static,
{
pin_mut!(incoming_cnx);
2024-01-07 17:37:50 +00:00
while let Some(Ok((cnx_stream, remote_addr))) = incoming_cnx.next().await {
2023-10-28 13:55:14 +00:00
let request_id = Uuid::now_v7();
let span = span!(
Level::INFO,
"tunnel",
id = request_id.to_string(),
2024-01-07 17:37:50 +00:00
remote = format!("{}:{}", remote_addr.host, remote_addr.port)
2023-10-28 13:55:14 +00:00
);
let client_config = client_config.clone();
let tunnel = async move {
2024-01-07 17:37:50 +00:00
let _ = connect_to_server(request_id, &client_config, &remote_addr, cnx_stream)
2023-10-28 13:55:14 +00:00
.await
.map_err(|err| error!("{:?}", err));
}
.instrument(span);
tokio::spawn(tunnel);
}
Ok(())
}
2023-11-26 14:47:49 +00:00
2023-11-26 17:22:28 +00:00
pub async fn run_reverse_tunnel<F, Fut, T>(
2023-11-26 14:47:49 +00:00
client_config: Arc<WsClientConfig>,
2024-01-07 17:37:50 +00:00
remote_addr: RemoteAddr,
2023-11-26 17:22:28 +00:00
connect_to_dest: F,
) -> anyhow::Result<()>
where
2024-01-07 17:37:50 +00:00
F: Fn(Option<RemoteAddr>) -> Fut,
2023-11-26 17:22:28 +00:00
Fut: Future<Output = anyhow::Result<T>>,
T: AsyncRead + AsyncWrite + Send + 'static,
{
2023-11-26 14:47:49 +00:00
loop {
let client_config = client_config.clone();
let request_id = Uuid::now_v7();
let span = span!(
Level::INFO,
"tunnel",
id = request_id.to_string(),
2024-01-07 17:37:50 +00:00
remote = format!("{}:{}", remote_addr.host, remote_addr.port)
2023-11-26 14:47:49 +00:00
);
let _span = span.enter();
// Correctly configure tunnel cfg
2024-01-13 22:31:54 +00:00
let ((ws_rx, ws_tx), response) =
tunnel::transport::websocket::connect(request_id, &client_config, &remote_addr)
.instrument(span.clone())
.await?;
2023-11-26 14:47:49 +00:00
// Connect to endpoint
let remote = response
2023-12-01 21:25:01 +00:00
.headers()
.get(COOKIE)
.and_then(|h| h.to_str().ok())
2024-01-07 17:37:50 +00:00
.and_then(|h| {
let (validation, decode_key) = JWT_DECODE.deref();
let jwt: Option<TokenData<JwtTunnelConfig>> = jsonwebtoken::decode(h, decode_key, validation).ok();
jwt
2023-12-01 21:25:01 +00:00
})
2024-01-07 17:37:50 +00:00
.map(|jwt| RemoteAddr {
protocol: jwt.claims.p,
host: Host::parse(&jwt.claims.r).unwrap_or_else(|_| Host::Domain(String::new())),
port: jwt.claims.rp,
});
2023-11-26 14:47:49 +00:00
2024-01-07 17:37:50 +00:00
let stream = match connect_to_dest(remote).instrument(span.clone()).await {
2023-11-26 14:47:49 +00:00
Ok(s) => s,
Err(err) => {
2024-01-07 17:37:50 +00:00
error!("Cannot connect to xxxx: {err:?}");
2023-11-26 14:47:49 +00:00
continue;
}
};
let (local_rx, local_tx) = tokio::io::split(stream);
let (close_tx, close_rx) = oneshot::channel::<()>();
let tunnel = async move {
let ping_frequency = client_config.websocket_ping_frequency;
tokio::spawn(
2024-01-13 20:06:57 +00:00
super::transport::io::propagate_local_to_remote(local_rx, ws_tx, close_tx, Some(ping_frequency))
.instrument(Span::current()),
2023-11-26 14:47:49 +00:00
);
// Forward websocket rx to local rx
2024-01-13 20:06:57 +00:00
let _ = super::transport::io::propagate_remote_to_local(local_tx, ws_rx, close_rx).await;
2023-11-26 14:47:49 +00:00
}
.instrument(span.clone());
tokio::spawn(tunnel);
}
}