This commit is contained in:
Σrebe - Romain GERARD 2023-11-03 09:17:56 +01:00
parent 0a9cb00342
commit 297176293c
No known key found for this signature in database
GPG key ID: 7A42B4B97E0332F4
3 changed files with 7 additions and 9 deletions

View file

@ -102,7 +102,7 @@ where
// Forward local tx to websocket tx // Forward local tx to websocket tx
let ping_frequency = client_cfg.websocket_ping_frequency; let ping_frequency = client_cfg.websocket_ping_frequency;
tokio::spawn(super::io::propagate_read(local_rx, ws_tx, close_tx, ping_frequency).instrument(Span::current())); tokio::spawn(super::io::propagate_read(local_rx, ws_tx, close_tx, Some(ping_frequency)).instrument(Span::current()));
// Forward websocket rx to local rx // Forward websocket rx to local rx
let _ = super::io::propagate_write(local_tx, ws_rx, close_rx).await; let _ = super::io::propagate_write(local_tx, ws_rx, close_rx).await;

View file

@ -14,7 +14,7 @@ pub(super) async fn propagate_read(
local_rx: impl AsyncRead, local_rx: impl AsyncRead,
mut ws_tx: WebSocketWrite<WriteHalf<Upgraded>>, mut ws_tx: WebSocketWrite<WriteHalf<Upgraded>>,
mut close_tx: oneshot::Sender<()>, mut close_tx: oneshot::Sender<()>,
ping_frequency: Duration, ping_frequency: Option<Duration>,
) -> Result<(), WebSocketError> { ) -> Result<(), WebSocketError> {
let _guard = scopeguard::guard((), |_| { let _guard = scopeguard::guard((), |_| {
info!("Closing local tx ==> websocket tx tunnel"); info!("Closing local tx ==> websocket tx tunnel");
@ -25,10 +25,9 @@ pub(super) async fn propagate_read(
// We do our own pin_mut! to avoid shadowing timeout and be able to reset it, on next loop iteration // We do our own pin_mut! to avoid shadowing timeout and be able to reset it, on next loop iteration
// We reuse the future to avoid creating a timer in the tight loop // We reuse the future to avoid creating a timer in the tight loop
let start_at = Instant::now() let frequency = ping_frequency.unwrap_or(Duration::from_secs(u64::MAX));
.checked_add(ping_frequency) let start_at = Instant::now().checked_add(frequency).unwrap_or(Instant::now());
.unwrap_or(Instant::now() + Duration::from_secs(3600 * 24)); let timeout = tokio::time::interval_at(start_at, frequency);
let timeout = tokio::time::interval_at(start_at, ping_frequency);
pin_mut!(timeout); pin_mut!(timeout);
pin_mut!(local_rx); pin_mut!(local_rx);
@ -40,7 +39,7 @@ pub(super) async fn propagate_read(
_ = close_tx.closed() => break, _ = close_tx.closed() => break,
_ = timeout.tick() => { _ = timeout.tick(), if ping_frequency.is_some() => {
debug!("sending ping to keep websocket connection alive"); debug!("sending ping to keep websocket connection alive");
ws_tx.write_frame(Frame::new(true, OpCode::Ping, None, Payload::BorrowedMut(&mut []))).await?; ws_tx.write_frame(Frame::new(true, OpCode::Ping, None, Payload::BorrowedMut(&mut []))).await?;

View file

@ -143,12 +143,11 @@ async fn server_upgrade(
} }
}; };
let (close_tx, close_rx) = oneshot::channel::<()>(); let (close_tx, close_rx) = oneshot::channel::<()>();
let ping_frequency = server_config.websocket_ping_frequency.unwrap_or(Duration::MAX);
ws_tx.set_auto_apply_mask(server_config.websocket_mask_frame); ws_tx.set_auto_apply_mask(server_config.websocket_mask_frame);
tokio::task::spawn(super::io::propagate_write(local_tx, ws_rx, close_rx).instrument(Span::current())); tokio::task::spawn(super::io::propagate_write(local_tx, ws_rx, close_rx).instrument(Span::current()));
let _ = super::io::propagate_read(local_rx, ws_tx, close_tx, ping_frequency).await; let _ = super::io::propagate_read(local_rx, ws_tx, close_tx, None).await;
} }
.instrument(Span::current()), .instrument(Span::current()),
); );