ping fix
This commit is contained in:
@@ -21,3 +21,5 @@ docker build -t client .
|
|||||||
- `TOKEN` is required and is sent as `Authorization: Bearer <token>`
|
- `TOKEN` is required and is sent as `Authorization: Bearer <token>`
|
||||||
- `BASE_URL` is optional and defaults to `ws://127.0.0.1:8000`
|
- `BASE_URL` is optional and defaults to `ws://127.0.0.1:8000`
|
||||||
- `DEVICE_ID` is optional and defaults to `device`
|
- `DEVICE_ID` is optional and defaults to `device`
|
||||||
|
- `PING_INTERVAL_SECS` is optional and defaults to `30`
|
||||||
|
- `PING_TIMEOUT_SECS` is optional and defaults to `10`
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub token: String,
|
pub token: String,
|
||||||
pub device_id: String,
|
pub device_id: String,
|
||||||
pub base_url: String,
|
pub base_url: String,
|
||||||
pub max_output_bytes: u64,
|
pub max_output_bytes: u64,
|
||||||
pub writer_channel_capacity: usize,
|
pub writer_channel_capacity: usize,
|
||||||
|
pub ping_interval: Duration,
|
||||||
|
pub ping_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
@@ -11,6 +15,8 @@ impl Config {
|
|||||||
const DEFAULT_DEVICE_ID: &str = "device";
|
const DEFAULT_DEVICE_ID: &str = "device";
|
||||||
const DEFAULT_MAX_OUTPUT_BYTES: u64 = 10 * 1024 * 1024;
|
const DEFAULT_MAX_OUTPUT_BYTES: u64 = 10 * 1024 * 1024;
|
||||||
const DEFAULT_WRITER_CHANNEL_CAPACITY: usize = 32;
|
const DEFAULT_WRITER_CHANNEL_CAPACITY: usize = 32;
|
||||||
|
const DEFAULT_PING_INTERVAL_SECS: u64 = 30;
|
||||||
|
const DEFAULT_PING_TIMEOUT_SECS: u64 = 10;
|
||||||
|
|
||||||
pub fn from_env() -> Result<Self, String> {
|
pub fn from_env() -> Result<Self, String> {
|
||||||
let token = std::env::var("TOKEN")
|
let token = std::env::var("TOKEN")
|
||||||
@@ -32,6 +38,14 @@ impl Config {
|
|||||||
"WRITER_CHANNEL_CAPACITY",
|
"WRITER_CHANNEL_CAPACITY",
|
||||||
Self::DEFAULT_WRITER_CHANNEL_CAPACITY,
|
Self::DEFAULT_WRITER_CHANNEL_CAPACITY,
|
||||||
),
|
),
|
||||||
|
ping_interval: Duration::from_secs(parse_env(
|
||||||
|
"PING_INTERVAL_SECS",
|
||||||
|
Self::DEFAULT_PING_INTERVAL_SECS,
|
||||||
|
)),
|
||||||
|
ping_timeout: Duration::from_secs(parse_env(
|
||||||
|
"PING_TIMEOUT_SECS",
|
||||||
|
Self::DEFAULT_PING_TIMEOUT_SECS,
|
||||||
|
)),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+53
-5
@@ -29,6 +29,7 @@ type Writer = SplitSink<Sock, Message>;
|
|||||||
|
|
||||||
enum OutgoingMessage {
|
enum OutgoingMessage {
|
||||||
Application(ClientMsg),
|
Application(ClientMsg),
|
||||||
|
Ping,
|
||||||
Pong(Vec<u8>),
|
Pong(Vec<u8>),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,11 +122,38 @@ async fn process_messages(
|
|||||||
mut writer_closed: oneshot::Receiver<()>,
|
mut writer_closed: oneshot::Receiver<()>,
|
||||||
config: &Arc<Config>,
|
config: &Arc<Config>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
|
let mut ping_interval = tokio::time::interval_at(
|
||||||
|
tokio::time::Instant::now() + config.ping_interval,
|
||||||
|
config.ping_interval,
|
||||||
|
);
|
||||||
|
|
||||||
|
let pong_timer = tokio::time::sleep(Duration::from_secs(0));
|
||||||
|
tokio::pin!(pong_timer);
|
||||||
|
let mut awaiting_pong = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = &mut writer_closed => return Err("Write half closed".into()),
|
_ = &mut writer_closed => return Err("Write half closed".into()),
|
||||||
|
_ = ping_interval.tick() => {
|
||||||
|
if awaiting_pong {
|
||||||
|
return Err("Server did not respond to ping".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
send_outgoing(&outgoing_tx, OutgoingMessage::Ping).await?;
|
||||||
|
awaiting_pong = true;
|
||||||
|
pong_timer.as_mut().reset(
|
||||||
|
tokio::time::Instant::now() + config.ping_timeout,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ = &mut pong_timer, if awaiting_pong => {
|
||||||
|
return Err("Server did not respond to ping".into());
|
||||||
|
}
|
||||||
next_message = stream.next() => {
|
next_message = stream.next() => {
|
||||||
handle_stream_message(next_message, &outgoing_tx, config.max_output_bytes).await?;
|
let action = handle_stream_message(next_message, &outgoing_tx, config.max_output_bytes).await?;
|
||||||
|
|
||||||
|
if action.clear_pong_deadline {
|
||||||
|
awaiting_pong = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -160,6 +188,10 @@ async fn write_message(sink: &mut Writer, message: OutgoingMessage) -> Result<()
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| e.to_string())
|
.map_err(|e| e.to_string())
|
||||||
}
|
}
|
||||||
|
OutgoingMessage::Ping => sink
|
||||||
|
.send(Message::Ping(Vec::new().into()))
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string()),
|
||||||
OutgoingMessage::Pong(payload) => sink
|
OutgoingMessage::Pong(payload) => sink
|
||||||
.send(Message::Pong(payload.into()))
|
.send(Message::Pong(payload.into()))
|
||||||
.await
|
.await
|
||||||
@@ -167,6 +199,10 @@ async fn write_message(sink: &mut Writer, message: OutgoingMessage) -> Result<()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct StreamAction {
|
||||||
|
clear_pong_deadline: bool,
|
||||||
|
}
|
||||||
|
|
||||||
async fn send_outgoing(
|
async fn send_outgoing(
|
||||||
outgoing_tx: &mpsc::Sender<OutgoingMessage>,
|
outgoing_tx: &mpsc::Sender<OutgoingMessage>,
|
||||||
message: OutgoingMessage,
|
message: OutgoingMessage,
|
||||||
@@ -178,17 +214,29 @@ async fn handle_stream_message(
|
|||||||
message: Option<Result<Message, tokio_tungstenite::tungstenite::Error>>,
|
message: Option<Result<Message, tokio_tungstenite::tungstenite::Error>>,
|
||||||
outgoing_tx: &mpsc::Sender<OutgoingMessage>,
|
outgoing_tx: &mpsc::Sender<OutgoingMessage>,
|
||||||
max_output_bytes: u64,
|
max_output_bytes: u64,
|
||||||
) -> Result<(), String> {
|
) -> Result<StreamAction, String> {
|
||||||
match message {
|
match message {
|
||||||
Some(Ok(Message::Text(text))) => {
|
Some(Ok(Message::Text(text))) => {
|
||||||
handle_server_message(parse_server_message(&text)?, outgoing_tx, max_output_bytes).await
|
handle_server_message(parse_server_message(&text)?, outgoing_tx, max_output_bytes)
|
||||||
|
.await?;
|
||||||
|
Ok(StreamAction {
|
||||||
|
clear_pong_deadline: false,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Ping(payload))) => {
|
Some(Ok(Message::Ping(payload))) => {
|
||||||
send_outgoing(outgoing_tx, OutgoingMessage::Pong(payload.to_vec())).await
|
send_outgoing(outgoing_tx, OutgoingMessage::Pong(payload.to_vec())).await?;
|
||||||
|
Ok(StreamAction {
|
||||||
|
clear_pong_deadline: false,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Close(_))) | None => Err("Connection closed".into()),
|
Some(Ok(Message::Close(_))) | None => Err("Connection closed".into()),
|
||||||
Some(Err(error)) => Err(error.to_string()),
|
Some(Err(error)) => Err(error.to_string()),
|
||||||
Some(Ok(_)) => Ok(()),
|
Some(Ok(Message::Pong(_))) => Ok(StreamAction {
|
||||||
|
clear_pong_deadline: true,
|
||||||
|
}),
|
||||||
|
Some(Ok(_)) => Ok(StreamAction {
|
||||||
|
clear_pong_deadline: false,
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user