Remove useless timeout
Former-commit-id: 191840bb8848691830762fdd1eeb67e0a3b036ab [formerly 8945a115947915b46a98028b4da732408ae87ea2] [formerly c55ece9b56df88c695b3cd9c7eca4c76462e779c [formerly 59db895e9138736928e346a12b2802dd36173a3c]] Former-commit-id: 62404ae9125e40bdb8d7c753283f913e8c527c2e [formerly 2d87ea4729096dff990afb94f91b62e805669608] Former-commit-id: 0b618ec5c5546780096d0a4418e96a45d47b9e63 Former-commit-id: 3ee30f721128eec0d10c8297bd0b13a1c4a8e36e Former-commit-id: a6866378f3dce77943b5c8901a73e3b9a82ff37f Former-commit-id: 7d267f5ccd2769061c7883c8b3087e523c719445 [formerly b9b9e2ebe518e1d20e4b9b15e40518d35952e802] Former-commit-id: b2cbd84fce1fe7e52ae2b3ec4bf195913bcfc7a8
This commit is contained in:
parent
a7263ba3dd
commit
11e12d1cc0
1 changed files with 6 additions and 13 deletions
|
@ -173,7 +173,7 @@ where
|
||||||
);
|
);
|
||||||
|
|
||||||
// Forward websocket rx to local rx
|
// Forward websocket rx to local rx
|
||||||
let _ = propagate_write(local_tx, ws_rx, close_rx, server_config.timeout_connect).await;
|
let _ = propagate_write(local_tx, ws_rx, close_rx).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -312,15 +312,13 @@ async fn server_upgrade(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let (close_tx, close_rx) = oneshot::channel::<()>();
|
let (close_tx, close_rx) = oneshot::channel::<()>();
|
||||||
let connect_timeout = server_config.timeout_connect;
|
|
||||||
let ping_frequency = server_config
|
let ping_frequency = server_config
|
||||||
.websocket_ping_frequency
|
.websocket_ping_frequency
|
||||||
.unwrap_or(Duration::MAX);
|
.unwrap_or(Duration::MAX);
|
||||||
ws_tx.set_auto_apply_mask(server_config.websocket_mask_frame);
|
ws_tx.set_auto_apply_mask(server_config.websocket_mask_frame);
|
||||||
|
|
||||||
tokio::task::spawn(
|
tokio::task::spawn(
|
||||||
propagate_write(local_tx, ws_rx, close_rx, connect_timeout)
|
propagate_write(local_tx, ws_rx, close_rx).instrument(Span::current()),
|
||||||
.instrument(Span::current()),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let _ = propagate_read(local_rx, ws_tx, close_tx, ping_frequency).await;
|
let _ = propagate_read(local_rx, ws_tx, close_tx, ping_frequency).await;
|
||||||
|
@ -471,7 +469,6 @@ async fn propagate_write(
|
||||||
local_tx: impl AsyncWrite,
|
local_tx: impl AsyncWrite,
|
||||||
mut ws_rx: WebSocketRead<ReadHalf<Upgraded>>,
|
mut ws_rx: WebSocketRead<ReadHalf<Upgraded>>,
|
||||||
mut close_rx: oneshot::Receiver<()>,
|
mut close_rx: oneshot::Receiver<()>,
|
||||||
timeout_connect: Duration,
|
|
||||||
) -> Result<(), WebSocketError> {
|
) -> Result<(), WebSocketError> {
|
||||||
let _guard = scopeguard::guard((), |_| {
|
let _guard = scopeguard::guard((), |_| {
|
||||||
info!("Closing local rx <== websocket rx tunnel");
|
info!("Closing local rx <== websocket rx tunnel");
|
||||||
|
@ -485,24 +482,20 @@ async fn propagate_write(
|
||||||
loop {
|
loop {
|
||||||
let ret = select! {
|
let ret = select! {
|
||||||
biased;
|
biased;
|
||||||
ret = timeout(timeout_connect, ws_rx.read_frame(&mut x)) => ret,
|
ret = ws_rx.read_frame(&mut x) => ret,
|
||||||
|
|
||||||
_ = &mut close_rx => break,
|
_ = &mut close_rx => break,
|
||||||
};
|
};
|
||||||
|
|
||||||
let msg = match ret {
|
let msg = match ret {
|
||||||
Ok(Ok(msg)) => msg,
|
Ok(msg) => msg,
|
||||||
Ok(Err(err)) => {
|
Err(err) => {
|
||||||
error!("error while reading from websocket rx {}", err);
|
error!("error while reading from websocket rx {}", err);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
|
||||||
// TODO: Check that the connection is not closed (no easy method to know if a tx is closed ...)
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!("frame {:?} {:?}", msg.opcode, msg.payload);
|
trace!("receive ws frame {:?} {:?}", msg.opcode, msg.payload);
|
||||||
let ret = match msg.opcode {
|
let ret = match msg.opcode {
|
||||||
OpCode::Continuation | OpCode::Text | OpCode::Binary => {
|
OpCode::Continuation | OpCode::Text | OpCode::Binary => {
|
||||||
local_tx.write_all(msg.payload.as_ref()).await
|
local_tx.write_all(msg.payload.as_ref()).await
|
||||||
|
|
Loading…
Reference in a new issue