feat: error handling
This commit is contained in:
parent
3c4f3b7533
commit
17c802bab8
6 changed files with 145 additions and 59 deletions
|
@ -65,7 +65,7 @@ impl Endpoint {
|
||||||
&mut self,
|
&mut self,
|
||||||
db: &mut BoxyDatabase,
|
db: &mut BoxyDatabase,
|
||||||
hostname: String,
|
hostname: String,
|
||||||
) -> Result<(), Box<dyn Error>> {
|
) -> Result<(), tokio_postgres::Error> {
|
||||||
let tx = db.client.transaction().await?;
|
let tx = db.client.transaction().await?;
|
||||||
|
|
||||||
let endpoint_id: i32 = tx
|
let endpoint_id: i32 = tx
|
||||||
|
|
|
@ -68,11 +68,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
|
||||||
let database_shared = Arc::new(Mutex::new(Box::leak(database)));
|
let database_shared = Arc::new(Mutex::new(Box::leak(database)));
|
||||||
|
|
||||||
let api_svc = ApiService {
|
let api_svc = ApiService::new(database_shared.clone(), config.clone()).await;
|
||||||
database: database_shared.clone(),
|
|
||||||
config: config.clone(),
|
|
||||||
_address: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let svc = ControllerService {
|
let svc = ControllerService {
|
||||||
database: database_shared,
|
database: database_shared,
|
||||||
|
|
|
@ -2,10 +2,7 @@ use std::{any::type_name_of_val, error::Error};
|
||||||
|
|
||||||
use http_body_util::{Either, Full};
|
use http_body_util::{Either, Full};
|
||||||
use hyper::{
|
use hyper::{
|
||||||
Request, Response,
|
body::{Body, Bytes, Incoming}, server::conn::http1, service::{HttpService, Service}, Request, Response, StatusCode
|
||||||
body::{Body, Bytes, Incoming},
|
|
||||||
server::conn::http1,
|
|
||||||
service::{HttpService, Service},
|
|
||||||
};
|
};
|
||||||
use hyper_util::rt::TokioIo;
|
use hyper_util::rt::TokioIo;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
|
@ -28,6 +25,22 @@ pub trait TcpIntercept {
|
||||||
fn stream(&mut self, stream: &TcpStream);
|
fn stream(&mut self, stream: &TcpStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn default_response() -> GeneralResponse {
|
||||||
|
Response::builder()
|
||||||
|
.status(404)
|
||||||
|
.body(GeneralBody::Right(Full::from(Bytes::from(
|
||||||
|
"That route doesn't exist.",
|
||||||
|
))))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn custom_resp(e: StatusCode, m: &'static str) -> GeneralResponse {
|
||||||
|
Response::builder()
|
||||||
|
.status(e)
|
||||||
|
.body(GeneralBody::Right(Full::from(Bytes::from(m))))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
impl<S> Server<S>
|
impl<S> Server<S>
|
||||||
where
|
where
|
||||||
S: TcpIntercept,
|
S: TcpIntercept,
|
||||||
|
|
|
@ -7,13 +7,13 @@ use hyper::{
|
||||||
body::{Bytes, Incoming},
|
body::{Bytes, Incoming},
|
||||||
service::Service,
|
service::Service,
|
||||||
};
|
};
|
||||||
use log::{debug, warn};
|
use log::{debug, error, warn};
|
||||||
use tokio::{net::TcpStream, sync::Mutex};
|
use tokio::{net::TcpStream, sync::Mutex};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::{Client, Config},
|
config::{Client, Config},
|
||||||
db::{BoxyDatabase, Endpoint},
|
db::{BoxyDatabase, Endpoint},
|
||||||
server::{GeneralBody, GeneralResponse, TcpIntercept},
|
server::{custom_resp, default_response, GeneralBody, GeneralResponse, TcpIntercept},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -23,21 +23,6 @@ pub struct ApiService {
|
||||||
pub _address: Option<IpAddr>,
|
pub _address: Option<IpAddr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn default_response() -> GeneralResponse {
|
|
||||||
Response::builder()
|
|
||||||
.status(404)
|
|
||||||
.body(GeneralBody::Right(Full::from(Bytes::from(
|
|
||||||
"That route doesn't exist.",
|
|
||||||
))))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn custom_resp(e: StatusCode, m: &'static str) -> GeneralResponse {
|
|
||||||
Response::builder()
|
|
||||||
.status(e)
|
|
||||||
.body(GeneralBody::Right(Full::from(Bytes::from(m))))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TcpIntercept for ApiService {
|
impl TcpIntercept for ApiService {
|
||||||
fn stream(&mut self, stream: &TcpStream) {
|
fn stream(&mut self, stream: &TcpStream) {
|
||||||
|
@ -45,6 +30,16 @@ impl TcpIntercept for ApiService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ApiService {
|
||||||
|
pub async fn new(database: Arc<Mutex<&'static mut BoxyDatabase>>, config: Config) -> Self {
|
||||||
|
Self {
|
||||||
|
database,
|
||||||
|
config,
|
||||||
|
_address: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Service<Request<Incoming>> for ApiService {
|
impl Service<Request<Incoming>> for ApiService {
|
||||||
type Response = GeneralResponse;
|
type Response = GeneralResponse;
|
||||||
type Error = hyper::Error;
|
type Error = hyper::Error;
|
||||||
|
@ -53,7 +48,7 @@ impl Service<Request<Incoming>> for ApiService {
|
||||||
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
||||||
let database = self.database.clone();
|
let database = self.database.clone();
|
||||||
let config = self.config.clone();
|
let config = self.config.clone();
|
||||||
let address = self._address.clone().unwrap();
|
let address = self._address.unwrap();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
match *req.method() {
|
match *req.method() {
|
||||||
|
@ -61,19 +56,55 @@ impl Service<Request<Incoming>> for ApiService {
|
||||||
"/register" => {
|
"/register" => {
|
||||||
debug!("new api register request from {}", address);
|
debug!("new api register request from {}", address);
|
||||||
|
|
||||||
let encoded_header = req
|
let encoded_header =
|
||||||
.headers()
|
match req.headers().get(hyper::header::AUTHORIZATION) {
|
||||||
.get(hyper::header::AUTHORIZATION)
|
None => {
|
||||||
.unwrap()
|
error!(
|
||||||
|
"Authorization header not given for request from {address}",
|
||||||
|
);
|
||||||
|
|
||||||
|
return Ok(custom_resp(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"Invalid credentials.",
|
||||||
|
)
|
||||||
|
.await);
|
||||||
|
}
|
||||||
|
Some(x) => x,
|
||||||
|
}
|
||||||
.to_str()
|
.to_str()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
debug!("authorization header: {}", encoded_header);
|
debug!("authorization header: {}", encoded_header);
|
||||||
|
|
||||||
let auth_string = String::from_utf8(
|
let auth_bytes = match BASE64_STANDARD.decode(&encoded_header[6..]) {
|
||||||
BASE64_STANDARD.decode(&encoded_header[6..]).unwrap(),
|
Ok(x) => x,
|
||||||
)
|
Err(e) => {
|
||||||
.unwrap();
|
error!(
|
||||||
|
"Error while decoding authorization header from {address}: {e}",
|
||||||
|
);
|
||||||
|
|
||||||
|
return Ok(custom_resp(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"Invalid base64 string given.",
|
||||||
|
)
|
||||||
|
.await);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let auth_string = match String::from_utf8(auth_bytes) {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
"Error while decoding authorization header from {address}: {e}",
|
||||||
|
);
|
||||||
|
|
||||||
|
return Ok(custom_resp(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"Invalid UTF-8 in authentication string.",
|
||||||
|
)
|
||||||
|
.await);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
debug!("decoded auth string: {}", auth_string);
|
debug!("decoded auth string: {}", auth_string);
|
||||||
|
|
||||||
|
@ -90,7 +121,7 @@ impl Service<Request<Incoming>> for ApiService {
|
||||||
.await);
|
.await);
|
||||||
}
|
}
|
||||||
|
|
||||||
let body = String::from_utf8(
|
let body = match String::from_utf8(
|
||||||
req.collect()
|
req.collect()
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -98,16 +129,40 @@ impl Service<Request<Incoming>> for ApiService {
|
||||||
.iter()
|
.iter()
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect::<Vec<u8>>(),
|
.collect::<Vec<u8>>(),
|
||||||
)
|
) {
|
||||||
.unwrap();
|
Ok(x) => x,
|
||||||
let json = json::parse(body.as_str()).unwrap();
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
"Error while inferring UTF-8 string from {address}'s request body: {e}",
|
||||||
|
);
|
||||||
|
|
||||||
|
return Ok(custom_resp(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"Invalid UTF-8 in body.",
|
||||||
|
)
|
||||||
|
.await);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let json = match json::parse(body.as_str()) {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error while parsing JSON body from {address}: {e}",);
|
||||||
|
|
||||||
|
return Ok(custom_resp(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"Invalid JSON in body.",
|
||||||
|
)
|
||||||
|
.await);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
debug!("body: {}", body);
|
debug!("body: {}", body);
|
||||||
|
|
||||||
let mut endpoint = Endpoint::new(
|
let mut endpoint = Endpoint::new(
|
||||||
None,
|
None,
|
||||||
address,
|
address,
|
||||||
json["port"].as_u16().unwrap(),
|
json["port"].as_u16().unwrap_or(8080),
|
||||||
json["callback"].as_str().unwrap_or("/").to_string(),
|
json["callback"].as_str().unwrap_or("/").to_string(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -120,7 +175,7 @@ impl Service<Request<Incoming>> for ApiService {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
Ok(custom_resp(StatusCode::OK, "").await)
|
Ok(custom_resp(StatusCode::OK, "Success").await)
|
||||||
}
|
}
|
||||||
_ => Ok(default_response().await),
|
_ => Ok(default_response().await),
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,15 +1,12 @@
|
||||||
use std::{pin::Pin, sync::Arc};
|
use std::{pin::Pin, sync::Arc};
|
||||||
|
|
||||||
use hyper::{
|
use hyper::{Request, StatusCode, body::Incoming, service::Service};
|
||||||
Request,
|
use log::error;
|
||||||
body::Incoming,
|
|
||||||
service::Service,
|
|
||||||
};
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
db::{BoxyDatabase, Endpoint},
|
db::{BoxyDatabase, Endpoint},
|
||||||
server::{GeneralResponse, TcpIntercept},
|
server::{GeneralResponse, TcpIntercept, custom_resp},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::proxy::ProxyService;
|
use super::proxy::ProxyService;
|
||||||
|
@ -31,19 +28,32 @@ impl Service<Request<Incoming>> for ControllerService {
|
||||||
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
||||||
let database = self.database.clone();
|
let database = self.database.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let hostname = req
|
let hostname = match req.headers().get(hyper::header::HOST) {
|
||||||
.headers()
|
Some(x) => x,
|
||||||
.get(hyper::header::HOST)
|
None => {
|
||||||
.unwrap()
|
error!("No host header given.");
|
||||||
.to_str()
|
|
||||||
.unwrap()
|
return Ok(custom_resp(StatusCode::BAD_REQUEST, "No host header given.").await);
|
||||||
.to_string();
|
}
|
||||||
|
}
|
||||||
|
.to_str()
|
||||||
|
.unwrap()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
let endpoints = Endpoint::get_by_hostname(*database.lock().await, hostname.clone())
|
let endpoints = Endpoint::get_by_hostname(*database.lock().await, hostname.clone())
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let endpoint = endpoints.first().unwrap();
|
let endpoint = match endpoints.first() {
|
||||||
|
Some(x) => x,
|
||||||
|
None => {
|
||||||
|
error!("No endpoint found for request.");
|
||||||
|
|
||||||
|
return Ok(
|
||||||
|
custom_resp(StatusCode::NOT_FOUND, "No endpoint found for host.").await,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let proxy = ProxyService {
|
let proxy = ProxyService {
|
||||||
address: format!("{}:{}", endpoint.address.clone(), endpoint.port),
|
address: format!("{}:{}", endpoint.address.clone(), endpoint.port),
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
use hyper::{Request, body::Incoming, service::Service};
|
use hyper::{Request, StatusCode, body::Incoming, service::Service};
|
||||||
use hyper_util::rt::TokioIo;
|
use hyper_util::rt::TokioIo;
|
||||||
use log::error;
|
use log::error;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
use crate::server::{GeneralResponse, to_general_response};
|
use crate::server::{GeneralResponse, custom_resp, to_general_response};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct ProxyService {
|
pub struct ProxyService {
|
||||||
|
@ -22,7 +22,19 @@ impl Service<Request<Incoming>> for ProxyService {
|
||||||
let address = self.address.clone();
|
let address = self.address.clone();
|
||||||
let hostname = self.hostname.clone();
|
let hostname = self.hostname.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let stream = TcpStream::connect(address).await.unwrap();
|
let stream = match TcpStream::connect(address).await {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Could not open connection to endpoint: {e}");
|
||||||
|
|
||||||
|
return Ok(custom_resp(
|
||||||
|
StatusCode::BAD_GATEWAY,
|
||||||
|
"Unable to open connection to endpoint.",
|
||||||
|
)
|
||||||
|
.await);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let io = TokioIo::new(stream);
|
let io = TokioIo::new(stream);
|
||||||
|
|
||||||
let (mut sender, conn) = hyper::client::conn::http1::Builder::new()
|
let (mut sender, conn) = hyper::client::conn::http1::Builder::new()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue