From b69b097c6f9b7fee2420a43e4741de1450ec35f3 Mon Sep 17 00:00:00 2001 From: RustDesk <71636191+rustdesk@users.noreply.github.com> Date: Tue, 3 Jun 2025 19:41:30 +0800 Subject: [PATCH] kcp stream --- Cargo.toml | 3 +- protos/rendezvous.proto | 2 + src/kcp_stream.rs | 128 ++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 4 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 src/kcp_stream.rs diff --git a/Cargo.toml b/Cargo.toml index 0689f62..865c553 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] # new flexi_logger failed on rustc 1.75 -flexi_logger = { version = "0.27", features = ["async"] } +flexi_logger = { version = "0.30", features = ["async"] } protobuf = { version = "3.7", features = ["with-bytes"] } tokio = { version = "1.44", features = ["full"] } tokio-util = { version = "0.7", features = ["full"] } @@ -47,6 +47,7 @@ base64 = "0.22" url = "2.5" sha2 = "0.10" whoami = "1.5" +kcp-sys="0.1" [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] mac_address = "1.1" diff --git a/protos/rendezvous.proto b/protos/rendezvous.proto index 2ed923c..7bd02c6 100644 --- a/protos/rendezvous.proto +++ b/protos/rendezvous.proto @@ -53,6 +53,7 @@ message PunchHoleSent { string relay_server = 3; NatType nat_type = 4; string version = 5; + bool is_udp = 6; } message RegisterPk { @@ -94,6 +95,7 @@ message PunchHoleResponse { } string other_failure = 7; int32 feedback = 8; + bool is_udp = 9; } message ConfigUpdate { diff --git a/src/kcp_stream.rs b/src/kcp_stream.rs new file mode 100644 index 0000000..6f1d27e --- /dev/null +++ b/src/kcp_stream.rs @@ -0,0 +1,128 @@ +use crate::tcp::{DynTcpStream, FramedStream}; +use kcp_sys::{ + endpoint::*, + packet_def::{Bytes, BytesMut, KcpPacket}, + stream, +}; +use std::{net::SocketAddr, sync::Arc}; +use tokio::{net::UdpSocket, sync::mpsc}; + +pub struct KcpStream { + pub endpoint: Arc, + pub stream: FramedStream, +} + +impl KcpStream { + fn create_framed(stream: stream::KcpStream, local_addr: Option) -> FramedStream { + FramedStream( + tokio_util::codec::Framed::new( + DynTcpStream(Box::new(stream)), + crate::bytes_codec::BytesCodec::new(), + ), + local_addr.unwrap_or(crate::config::Config::get_any_listen_addr(true)), + None, + 0, + ) + } + + pub async fn accept( + udp_socket: Arc, + from_addr: SocketAddr, + ) -> crate::ResultType { + let mut endpoint = KcpEndpoint::new(); + endpoint.run().await; + + let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap()); + udp_socket.connect(&[from_addr][..]).await?; + Self::kcp_io(udp_socket.clone(), input, output).await; + + let conn_id = endpoint.accept().await?; + if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) { + Ok(Self { + endpoint: Arc::new(endpoint), + stream: Self::create_framed(stream, udp_socket.local_addr().ok()), + }) + } else { + Err(anyhow::anyhow!("Failed to create KcpStream")) + } + } + + pub async fn connect( + udp_socket: Arc, + to_addr: SocketAddr, + timeout: std::time::Duration, + ) -> crate::ResultType { + let mut endpoint = KcpEndpoint::new(); + endpoint.run().await; + + let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap()); + udp_socket.connect(&[to_addr][..]).await?; + Self::kcp_io(udp_socket.clone(), input, output).await; + + let conn_id = endpoint.connect(timeout, 0, 0, Bytes::new()).await.unwrap(); + if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) { + Ok(Self { + endpoint: Arc::new(endpoint), + stream: Self::create_framed(stream, udp_socket.local_addr().ok()), + }) + } else { + Err(anyhow::anyhow!("Failed to create KcpStream")) + } + } + + async fn kcp_io( + udp_socket: Arc, + input: mpsc::Sender, + mut output: mpsc::Receiver, + ) { + let udp = udp_socket.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + Some(data) = output.recv() => { + if let Err(e) = udp.send(&data.inner()).await { + // Break on fatal errors, but ignore WouldBlock or Interrupted + if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted { + log::error!("kcp send error: {:?}", e); + break; + } + } + } + else => { + log::debug!("kcp endpoint output closed"); + break; + } + } + } + }); + + let udp = udp_socket.clone(); + tokio::spawn(async move { + let mut buf = vec![0; 10240]; + loop { + tokio::select! { + result = udp.recv_from(&mut buf) => { + match result { + Ok((size, _)) => { + input + .send(BytesMut::from(&buf[..size]).into()) + .await.ok(); + } + Err(e) => { + // Break on fatal errors, but ignore WouldBlock or Interrupted + if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted { + log::error!("kcp recv_from error: {:?}", e); + break; + } + } + } + } + else => { + log::debug!("kcp endpoint input closed"); + break; + } + } + } + }); + } +} diff --git a/src/lib.rs b/src/lib.rs index 3f24fdf..5f8027c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,7 @@ pub mod websocket; pub mod stream; pub use stream::Stream; pub use whoami; +pub mod kcp_stream; pub type SessionID = uuid::Uuid;