[enhance] add ping pong send (ver.2)

This commit is contained in:
YinMo19 2025-04-23 10:58:28 +08:00
parent f31d1ec1b8
commit 5c6b12c438

View file

@ -14,6 +14,7 @@ use std::{
time::Duration, time::Duration,
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::Instant;
use tokio::{net::TcpStream, time::timeout}; use tokio::{net::TcpStream, time::timeout};
use tokio_tungstenite::{ use tokio_tungstenite::{
connect_async, tungstenite::protocol::Message as WsMessage, MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message as WsMessage, MaybeTlsStream, WebSocketStream,
@ -34,10 +35,9 @@ pub struct WsFramedStream {
// read_buf: BytesMut, // read_buf: BytesMut,
} }
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(10);
impl WsFramedStream { impl WsFramedStream {
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15);
pub async fn new<T: AsRef<str>>( pub async fn new<T: AsRef<str>>(
url: T, url: T,
local_addr: Option<SocketAddr>, local_addr: Option<SocketAddr>,
@ -121,15 +121,26 @@ impl WsFramedStream {
fn start_heartbeat(&self) { fn start_heartbeat(&self) {
let writer = Arc::clone(&self.writer); let writer = Arc::clone(&self.writer);
tokio::spawn(async move { tokio::spawn(async move {
let mut interval = tokio::time::interval(Self::HEARTBEAT_INTERVAL); let mut last_pong = Instant::now();
let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL);
loop { loop {
interval.tick().await; tokio::select! {
let mut lock = writer.lock().await; _ = interval.tick() => {
if let Err(e) = lock.send(WsMessage::Ping(Bytes::new())).await { let mut lock = writer.lock().await;
log::error!("Failed to send ping: {}", e); if let Err(e) = lock.send(WsMessage::Ping(Bytes::new())).await {
break; log::error!("Heartbeat failed: {}", e);
break;
}
log::debug!("Sent ping");
}
_ = tokio::time::sleep(HEARTBEAT_TIMEOUT) => {
if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
log::error!("Heartbeat timeout");
break;
}
}
} }
drop(lock); // 及时释放锁
} }
}); });
} }
@ -271,7 +282,7 @@ impl WsFramedStream {
} }
} }
if start.elapsed() > Self::HEARTBEAT_TIMEOUT { if start.elapsed() > HEARTBEAT_TIMEOUT {
log::warn!("No message received within heartbeat timeout"); log::warn!("No message received within heartbeat timeout");
return Some(Err(Error::new(ErrorKind::TimedOut, "Heartbeat timeout"))); return Some(Err(Error::new(ErrorKind::TimedOut, "Heartbeat timeout")));
} }