Bump to hyper 1.x
This commit is contained in:
parent
9a16c38909
commit
d1de41646f
5 changed files with 109 additions and 66 deletions
|
@ -3,12 +3,16 @@ use crate::{LocalToRemote, WsClientConfig};
|
|||
use anyhow::{anyhow, Context};
|
||||
|
||||
use base64::Engine;
|
||||
use bytes::Bytes;
|
||||
use fastwebsockets::WebSocket;
|
||||
use futures_util::pin_mut;
|
||||
use http_body_util::Empty;
|
||||
use hyper::body::Incoming;
|
||||
use hyper::header::{AUTHORIZATION, COOKIE, SEC_WEBSOCKET_VERSION, UPGRADE};
|
||||
use hyper::header::{CONNECTION, HOST, SEC_WEBSOCKET_KEY};
|
||||
use hyper::upgrade::Upgraded;
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{Request, Response};
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use std::future::Future;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::Arc;
|
||||
|
@ -20,18 +24,6 @@ use tracing::{error, span, Instrument, Level, Span};
|
|||
use url::{Host, Url};
|
||||
use uuid::Uuid;
|
||||
|
||||
struct SpawnExecutor;
|
||||
|
||||
impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
|
||||
where
|
||||
Fut: Future + Send + 'static,
|
||||
Fut::Output: Send + 'static,
|
||||
{
|
||||
fn execute(&self, fut: Fut) {
|
||||
tokio::task::spawn(fut);
|
||||
}
|
||||
}
|
||||
|
||||
fn tunnel_to_jwt_token(request_id: Uuid, tunnel: &LocalToRemote) -> String {
|
||||
let cfg = JwtTunnelConfig::new(request_id, tunnel);
|
||||
let (alg, secret) = JWT_KEY.deref();
|
||||
|
@ -42,7 +34,7 @@ pub async fn connect(
|
|||
request_id: Uuid,
|
||||
client_cfg: &WsClientConfig,
|
||||
tunnel_cfg: &LocalToRemote,
|
||||
) -> anyhow::Result<(WebSocket<Upgraded>, Response<Body>)> {
|
||||
) -> anyhow::Result<(WebSocket<TokioIo<Upgraded>>, Response<Incoming>)> {
|
||||
let mut pooled_cnx = match client_cfg.cnx_pool().get().await {
|
||||
Ok(tcp_stream) => tcp_stream,
|
||||
Err(err) => Err(anyhow!("failed to get a connection to the server from the pool: {err:?}"))?,
|
||||
|
@ -69,7 +61,7 @@ pub async fn connect(
|
|||
req = req.header(AUTHORIZATION, auth);
|
||||
}
|
||||
|
||||
let req = req.body(Body::empty()).with_context(|| {
|
||||
let req = req.body(Empty::<Bytes>::new()).with_context(|| {
|
||||
format!(
|
||||
"failed to build HTTP request to contact the server {:?}",
|
||||
client_cfg.remote_addr
|
||||
|
@ -77,7 +69,7 @@ pub async fn connect(
|
|||
})?;
|
||||
debug!("with HTTP upgrade request {:?}", req);
|
||||
let transport = pooled_cnx.deref_mut().take().unwrap();
|
||||
let (ws, response) = fastwebsockets::handshake::client(&SpawnExecutor, req, transport)
|
||||
let (ws, response) = fastwebsockets::handshake::client(&TokioExecutor::new(), req, transport)
|
||||
.await
|
||||
.with_context(|| format!("failed to do websocket handshake with the server {:?}", client_cfg.remote_addr))?;
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ use fastwebsockets::{Frame, OpCode, Payload, WebSocketError, WebSocketRead, WebS
|
|||
use futures_util::{pin_mut, FutureExt};
|
||||
use hyper::upgrade::Upgraded;
|
||||
|
||||
use hyper_util::rt::TokioIo;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
|
||||
use tokio::select;
|
||||
|
@ -12,7 +13,7 @@ use tracing::{error, info, trace, warn};
|
|||
|
||||
pub(super) async fn propagate_read(
|
||||
local_rx: impl AsyncRead,
|
||||
mut ws_tx: WebSocketWrite<WriteHalf<Upgraded>>,
|
||||
mut ws_tx: WebSocketWrite<WriteHalf<TokioIo<Upgraded>>>,
|
||||
mut close_tx: oneshot::Sender<()>,
|
||||
ping_frequency: Option<Duration>,
|
||||
) -> Result<(), WebSocketError> {
|
||||
|
@ -84,7 +85,7 @@ pub(super) async fn propagate_read(
|
|||
|
||||
pub(super) async fn propagate_write(
|
||||
local_tx: impl AsyncWrite,
|
||||
mut ws_rx: WebSocketRead<ReadHalf<Upgraded>>,
|
||||
mut ws_rx: WebSocketRead<ReadHalf<TokioIo<Upgraded>>>,
|
||||
mut close_rx: oneshot::Receiver<()>,
|
||||
) -> Result<(), WebSocketError> {
|
||||
let _guard = scopeguard::guard((), |_| {
|
||||
|
|
|
@ -12,11 +12,12 @@ use std::time::Duration;
|
|||
|
||||
use super::{JwtTunnelConfig, JWT_DECODE};
|
||||
use crate::{socks5, tcp, tls, udp, LocalProtocol, WsServerConfig};
|
||||
use hyper::body::Incoming;
|
||||
use hyper::header::COOKIE;
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::server::conn::Http;
|
||||
use hyper::server::conn::http1;
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{http, Body, Request, Response, StatusCode};
|
||||
use hyper::{http, Request, Response, StatusCode};
|
||||
use jsonwebtoken::TokenData;
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
|
@ -188,8 +189,8 @@ where
|
|||
|
||||
async fn server_upgrade(
|
||||
server_config: Arc<WsServerConfig>,
|
||||
mut req: Request<Body>,
|
||||
) -> Result<Response<Body>, anyhow::Error> {
|
||||
mut req: Request<Incoming>,
|
||||
) -> Result<Response<String>, anyhow::Error> {
|
||||
if let Some(x) = req.headers().get("X-Forwarded-For") {
|
||||
info!("Request X-Forwarded-For: {:?}", x);
|
||||
Span::current().record("forwarded_for", x.to_str().unwrap_or_default());
|
||||
|
@ -199,8 +200,8 @@ async fn server_upgrade(
|
|||
warn!("Rejecting connection with bad upgrade request: {}", req.uri());
|
||||
return Ok(http::Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body(Body::from("Invalid upgrade request"))
|
||||
.unwrap_or_default());
|
||||
.body("Invalid upgrade request".into())
|
||||
.unwrap());
|
||||
}
|
||||
|
||||
if let Some(paths_prefix) = &server_config.restrict_http_upgrade_path_prefix {
|
||||
|
@ -217,8 +218,8 @@ async fn server_upgrade(
|
|||
warn!("Rejecting connection with bad path prefix in upgrade request: {}", req.uri());
|
||||
return Ok(http::Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body(Body::from("Invalid upgrade request"))
|
||||
.unwrap_or_default());
|
||||
.body("Invalid upgrade request".to_string())
|
||||
.unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,8 +230,8 @@ async fn server_upgrade(
|
|||
warn!("Rejecting connection with bad upgrade request: {} {}", err, req.uri());
|
||||
return Ok(http::Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body(Body::from(format!("Invalid upgrade request: {:?}", err)))
|
||||
.unwrap_or_default());
|
||||
.body("Invalid upgrade request".to_string())
|
||||
.unwrap());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -241,8 +242,8 @@ async fn server_upgrade(
|
|||
warn!("Rejecting connection with bad upgrade request: {} {}", err, req.uri());
|
||||
return Ok(http::Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body(Body::from(format!("Invalid upgrade request: {:?}", err)))
|
||||
.unwrap_or_default());
|
||||
.body(format!("Invalid upgrade request: {:?}", err))
|
||||
.unwrap());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -273,6 +274,8 @@ async fn server_upgrade(
|
|||
)?,
|
||||
);
|
||||
}
|
||||
|
||||
let response = Response::from_parts(response.into_parts().0, "".to_string());
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
|
@ -280,7 +283,7 @@ pub async fn run_server(server_config: Arc<WsServerConfig>) -> anyhow::Result<()
|
|||
info!("Starting wstunnel server listening on {}", server_config.bind);
|
||||
|
||||
let config = server_config.clone();
|
||||
let upgrade_fn = move |req: Request<Body>| server_upgrade(config.clone(), req);
|
||||
let upgrade_fn = move |req: Request<Incoming>| server_upgrade(config.clone(), req);
|
||||
|
||||
let listener = TcpListener::bind(&server_config.bind).await?;
|
||||
let tls_acceptor = if let Some(tls) = &server_config.tls {
|
||||
|
@ -310,14 +313,14 @@ pub async fn run_server(server_config: Arc<WsServerConfig>) -> anyhow::Result<()
|
|||
let fut = async move {
|
||||
info!("Doing TLS handshake");
|
||||
let tls_stream = match tls_acceptor.accept(stream).await {
|
||||
Ok(tls_stream) => tls_stream,
|
||||
Ok(tls_stream) => hyper_util::rt::TokioIo::new(tls_stream),
|
||||
Err(err) => {
|
||||
error!("error while accepting TLS connection {}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let conn_fut = Http::new()
|
||||
.http1_only(true)
|
||||
|
||||
let conn_fut = http1::Builder::new()
|
||||
.serve_connection(tls_stream, service_fn(upgrade_fn))
|
||||
.with_upgrades();
|
||||
|
||||
|
@ -330,8 +333,8 @@ pub async fn run_server(server_config: Arc<WsServerConfig>) -> anyhow::Result<()
|
|||
tokio::spawn(fut);
|
||||
// Normal
|
||||
} else {
|
||||
let conn_fut = Http::new()
|
||||
.http1_only(true)
|
||||
let stream = hyper_util::rt::TokioIo::new(stream);
|
||||
let conn_fut = http1::Builder::new()
|
||||
.serve_connection(stream, service_fn(upgrade_fn))
|
||||
.with_upgrades();
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue