chore(http-proxy): improve http proxy
This commit is contained in:
parent
38cb7ed5f8
commit
279896126c
2 changed files with 78 additions and 43 deletions
|
@ -5,6 +5,7 @@ use bytes::Bytes;
|
|||
use log::{debug, error};
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use base64::Engine;
|
||||
use futures_util::{future, stream, Stream};
|
||||
|
@ -17,6 +18,8 @@ use hyper_util::rt::TokioTimer;
|
|||
use parking_lot::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::select;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::log::info;
|
||||
use url::Host;
|
||||
|
||||
|
@ -98,29 +101,61 @@ pub async fn run_server(
|
|||
};
|
||||
let auth_header =
|
||||
credentials.map(|(user, pass)| base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, pass)));
|
||||
let tasks = JoinSet::<Option<(TcpStream, (Host, u16))>>::new();
|
||||
|
||||
let listener = stream::unfold((listener, http1, auth_header), |(listener, http1, auth_header)| async {
|
||||
let proxy_cfg = Arc::new((auth_header, http1));
|
||||
let listener = stream::unfold((listener, tasks, proxy_cfg), |(listener, mut tasks, proxy_cfg)| async {
|
||||
loop {
|
||||
let (mut stream, _) = match listener.accept().await {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
error!("Error while accepting connection {:?}", err);
|
||||
continue;
|
||||
let (mut stream, forward_to) = select! {
|
||||
biased;
|
||||
|
||||
cnx = tasks.join_next(), if !tasks.is_empty() => {
|
||||
match cnx {
|
||||
Some(Ok(Some((stream, f)))) => (stream, Some(f)),
|
||||
None | Some(Ok(None)) => continue,
|
||||
Some(Err(err)) => {
|
||||
error!("Error while joinning tasks {:?}", err);
|
||||
continue
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
stream = listener.accept() => {
|
||||
match stream {
|
||||
Ok((stream, _)) => (stream, None),
|
||||
Err(err) => {
|
||||
error!("Error while accepting connection {:?}", err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let forward_to = Mutex::new((Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)), 0));
|
||||
let conn_fut = http1.serve_connection(
|
||||
hyper_util::rt::TokioIo::new(&mut stream),
|
||||
service_fn(|req| handle_request(&auth_header, &forward_to, req)),
|
||||
);
|
||||
match conn_fut.await {
|
||||
Ok(_) => return Some((Ok((stream, forward_to.into_inner())), (listener, http1, auth_header))),
|
||||
Err(err) => {
|
||||
info!("Error while serving connection: {}", err);
|
||||
continue;
|
||||
}
|
||||
if let Some(forward_to) = forward_to {
|
||||
return Some((Ok((stream, forward_to)), (listener, tasks, proxy_cfg)));
|
||||
}
|
||||
|
||||
let handle_new_cnx = {
|
||||
let proxy_cfg = proxy_cfg.clone();
|
||||
async move {
|
||||
let http1 = &proxy_cfg.1;
|
||||
let auth_header = &proxy_cfg.0;
|
||||
let forward_to = Mutex::new((Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)), 0));
|
||||
let conn_fut = http1.serve_connection(
|
||||
hyper_util::rt::TokioIo::new(&mut stream),
|
||||
service_fn(|req| handle_request(auth_header, &forward_to, req)),
|
||||
);
|
||||
|
||||
match conn_fut.await {
|
||||
Ok(_) => Some((stream, forward_to.into_inner())),
|
||||
Err(err) => {
|
||||
info!("Error while serving connection: {}", err);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
tasks.spawn(handle_new_cnx);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue