From 01e72931d1319b2c8bd65a422ea3200d006848a4 Mon Sep 17 00:00:00 2001 From: ZeroZipp Date: Fri, 22 May 2026 08:13:06 +0200 Subject: [PATCH] ping fix --- readme.md | 2 ++ src/app/config.rs | 14 ++++++++++++ src/main.rs | 58 +++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 69 insertions(+), 5 deletions(-) diff --git a/readme.md b/readme.md index a491e27..ea1add1 100644 --- a/readme.md +++ b/readme.md @@ -21,3 +21,5 @@ docker build -t client . - `TOKEN` is required and is sent as `Authorization: Bearer ` - `BASE_URL` is optional and defaults to `ws://127.0.0.1:8000` - `DEVICE_ID` is optional and defaults to `device` +- `PING_INTERVAL_SECS` is optional and defaults to `30` +- `PING_TIMEOUT_SECS` is optional and defaults to `10` diff --git a/src/app/config.rs b/src/app/config.rs index 0ad3fd7..7fd1261 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -1,9 +1,13 @@ +use std::time::Duration; + pub struct Config { pub token: String, pub device_id: String, pub base_url: String, pub max_output_bytes: u64, pub writer_channel_capacity: usize, + pub ping_interval: Duration, + pub ping_timeout: Duration, } impl Config { @@ -11,6 +15,8 @@ impl Config { const DEFAULT_DEVICE_ID: &str = "device"; const DEFAULT_MAX_OUTPUT_BYTES: u64 = 10 * 1024 * 1024; const DEFAULT_WRITER_CHANNEL_CAPACITY: usize = 32; + const DEFAULT_PING_INTERVAL_SECS: u64 = 30; + const DEFAULT_PING_TIMEOUT_SECS: u64 = 10; pub fn from_env() -> Result { let token = std::env::var("TOKEN") @@ -32,6 +38,14 @@ impl Config { "WRITER_CHANNEL_CAPACITY", Self::DEFAULT_WRITER_CHANNEL_CAPACITY, ), + ping_interval: Duration::from_secs(parse_env( + "PING_INTERVAL_SECS", + Self::DEFAULT_PING_INTERVAL_SECS, + )), + ping_timeout: Duration::from_secs(parse_env( + "PING_TIMEOUT_SECS", + Self::DEFAULT_PING_TIMEOUT_SECS, + )), }) } } diff --git a/src/main.rs b/src/main.rs index 3aaef68..e05e357 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,7 @@ type Writer = SplitSink; enum OutgoingMessage { Application(ClientMsg), + Ping, Pong(Vec), } @@ -121,11 +122,38 @@ async fn process_messages( mut writer_closed: oneshot::Receiver<()>, config: &Arc, ) -> Result<(), String> { + let mut ping_interval = tokio::time::interval_at( + tokio::time::Instant::now() + config.ping_interval, + config.ping_interval, + ); + + let pong_timer = tokio::time::sleep(Duration::from_secs(0)); + tokio::pin!(pong_timer); + let mut awaiting_pong = false; + loop { tokio::select! { _ = &mut writer_closed => return Err("Write half closed".into()), + _ = ping_interval.tick() => { + if awaiting_pong { + return Err("Server did not respond to ping".into()); + } + + send_outgoing(&outgoing_tx, OutgoingMessage::Ping).await?; + awaiting_pong = true; + pong_timer.as_mut().reset( + tokio::time::Instant::now() + config.ping_timeout, + ); + } + _ = &mut pong_timer, if awaiting_pong => { + return Err("Server did not respond to ping".into()); + } next_message = stream.next() => { - handle_stream_message(next_message, &outgoing_tx, config.max_output_bytes).await?; + let action = handle_stream_message(next_message, &outgoing_tx, config.max_output_bytes).await?; + + if action.clear_pong_deadline { + awaiting_pong = false; + } } } } @@ -160,6 +188,10 @@ async fn write_message(sink: &mut Writer, message: OutgoingMessage) -> Result<() .await .map_err(|e| e.to_string()) } + OutgoingMessage::Ping => sink + .send(Message::Ping(Vec::new().into())) + .await + .map_err(|e| e.to_string()), OutgoingMessage::Pong(payload) => sink .send(Message::Pong(payload.into())) .await @@ -167,6 +199,10 @@ async fn write_message(sink: &mut Writer, message: OutgoingMessage) -> Result<() } } +struct StreamAction { + clear_pong_deadline: bool, +} + async fn send_outgoing( outgoing_tx: &mpsc::Sender, message: OutgoingMessage, @@ -178,17 +214,29 @@ async fn handle_stream_message( message: Option>, outgoing_tx: &mpsc::Sender, max_output_bytes: u64, -) -> Result<(), String> { +) -> Result { match message { Some(Ok(Message::Text(text))) => { - handle_server_message(parse_server_message(&text)?, outgoing_tx, max_output_bytes).await + handle_server_message(parse_server_message(&text)?, outgoing_tx, max_output_bytes) + .await?; + Ok(StreamAction { + clear_pong_deadline: false, + }) } Some(Ok(Message::Ping(payload))) => { - send_outgoing(outgoing_tx, OutgoingMessage::Pong(payload.to_vec())).await + send_outgoing(outgoing_tx, OutgoingMessage::Pong(payload.to_vec())).await?; + Ok(StreamAction { + clear_pong_deadline: false, + }) } Some(Ok(Message::Close(_))) | None => Err("Connection closed".into()), Some(Err(error)) => Err(error.to_string()), - Some(Ok(_)) => Ok(()), + Some(Ok(Message::Pong(_))) => Ok(StreamAction { + clear_pong_deadline: true, + }), + Some(Ok(_)) => Ok(StreamAction { + clear_pong_deadline: false, + }), } }