diff --git a/src/transport.rs b/src/transport.rs index a4c4cb0..725eee0 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -173,7 +173,7 @@ where ); // Forward websocket rx to local rx - let _ = propagate_write(local_tx, ws_rx, close_rx, server_config.timeout_connect).await; + let _ = propagate_write(local_tx, ws_rx, close_rx).await; Ok(()) } @@ -312,15 +312,13 @@ async fn server_upgrade( } }; let (close_tx, close_rx) = oneshot::channel::<()>(); - let connect_timeout = server_config.timeout_connect; let ping_frequency = server_config .websocket_ping_frequency .unwrap_or(Duration::MAX); ws_tx.set_auto_apply_mask(server_config.websocket_mask_frame); tokio::task::spawn( - propagate_write(local_tx, ws_rx, close_rx, connect_timeout) - .instrument(Span::current()), + propagate_write(local_tx, ws_rx, close_rx).instrument(Span::current()), ); let _ = propagate_read(local_rx, ws_tx, close_tx, ping_frequency).await; @@ -471,7 +469,6 @@ async fn propagate_write( local_tx: impl AsyncWrite, mut ws_rx: WebSocketRead>, mut close_rx: oneshot::Receiver<()>, - timeout_connect: Duration, ) -> Result<(), WebSocketError> { let _guard = scopeguard::guard((), |_| { info!("Closing local rx <== websocket rx tunnel"); @@ -485,24 +482,20 @@ async fn propagate_write( loop { let ret = select! { biased; - ret = timeout(timeout_connect, ws_rx.read_frame(&mut x)) => ret, + ret = ws_rx.read_frame(&mut x) => ret, _ = &mut close_rx => break, }; let msg = match ret { - Ok(Ok(msg)) => msg, - Ok(Err(err)) => { + Ok(msg) => msg, + Err(err) => { error!("error while reading from websocket rx {}", err); break; } - Err(_) => { - // TODO: Check that the connection is not closed (no easy method to know if a tx is closed ...) - continue; - } }; - trace!("frame {:?} {:?}", msg.opcode, msg.payload); + trace!("receive ws frame {:?} {:?}", msg.opcode, msg.payload); let ret = match msg.opcode { OpCode::Continuation | OpCode::Text | OpCode::Binary => { local_tx.write_all(msg.payload.as_ref()).await