feat(fs): delegate win --server file reading to CM (#13736)

- Route Windows server-to-client file reads through CM instead of the connection layer
- Add FS IPC commands (ReadFile, CancelRead, SendConfirmForRead, ReadAllFiles) and CM data messages
  (ReadJobInitResult, FileBlockFromCM, FileReadDone, FileReadError, FileDigestFromCM, AllFilesResult)
- Track pending read validations and read jobs to coordinate CM-driven file transfers and clean them up
  on completion, cancellation, and errors
- Enforce a configurable file-transfer-max-files limit for ReadAllFiles and add stronger file name/path
  validation on the CM side
- Improve Flutter file transfer UX and robustness:
  - Use explicit percent/percentText progress fields
  - Derive speed and cancel actions from the active job
  - Handle job errors via FileModel.handleJobError and complete pending recursive tasks on failure
  - Wrap recursive directory operations in try/catch and await sendRemoveEmptyDir when removing empty directories

Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
fufesou
2025-12-28 15:39:35 +08:00
committed by GitHub
parent 5b2101e17d
commit 969ea28d06
11 changed files with 1349 additions and 97 deletions

View File

@@ -50,6 +50,7 @@ use serde_json::{json, value::Value};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use std::sync::atomic::Ordering;
use std::{
collections::HashSet,
net::Ipv6Addr,
num::NonZeroI64,
path::PathBuf,
@@ -63,8 +64,6 @@ use windows::Win32::Foundation::{CloseHandle, HANDLE};
#[cfg(windows)]
use crate::virtual_display_manager;
#[cfg(not(any(target_os = "ios")))]
use std::collections::HashSet;
pub type Sender = mpsc::UnboundedSender<(Instant, Arc<Message>)>;
lazy_static::lazy_static! {
@@ -287,6 +286,11 @@ pub struct Connection {
// For post requests that need to be sent sequentially.
// eg. post_conn_audit
tx_post_seq: mpsc::UnboundedSender<(String, Value)>,
// Tracks read job IDs delegated to CM process.
// When a read job is delegated to CM (via FS::ReadFile), the job id is added here.
// Used to filter stale responses (FileBlockFromCM, FileReadDone, etc.) for
// cancelled or unknown jobs.
cm_read_job_ids: HashSet<i32>,
terminal_service_id: String,
terminal_persistent: bool,
// The user token must be set when terminal is enabled.
@@ -459,6 +463,7 @@ impl Connection {
tx_from_authed,
printer_data: Vec::new(),
tx_post_seq,
cm_read_job_ids: HashSet::new(),
terminal_service_id: "".to_owned(),
terminal_persistent: false,
#[cfg(not(any(target_os = "android", target_os = "ios")))]
@@ -717,6 +722,36 @@ impl Connection {
let msg = new_voice_call_request(false);
conn.send(msg).await;
}
ipc::Data::ReadJobInitResult { id, file_num, include_hidden, conn_id, result } => {
if conn_id == conn.inner.id() {
conn.handle_read_job_init_result(id, file_num, include_hidden, result).await;
}
}
ipc::Data::FileBlockFromCM { id, file_num, data, compressed, conn_id } => {
if conn_id == conn.inner.id() {
conn.handle_file_block_from_cm(id, file_num, data, compressed).await;
}
}
ipc::Data::FileReadDone { id, file_num, conn_id } => {
if conn_id == conn.inner.id() {
conn.handle_file_read_done(id, file_num).await;
}
}
ipc::Data::FileReadError { id, file_num, err, conn_id } => {
if conn_id == conn.inner.id() {
conn.handle_file_read_error(id, file_num, err).await;
}
}
ipc::Data::FileDigestFromCM { id, file_num, last_modified, file_size, is_resume, conn_id } => {
if conn_id == conn.inner.id() {
conn.handle_file_digest_from_cm(id, file_num, last_modified, file_size, is_resume).await;
}
}
ipc::Data::AllFilesResult { id, conn_id, path, result } => {
if conn_id == conn.inner.id() {
conn.handle_all_files_result(id, path, result).await;
}
}
_ => {}
}
},
@@ -2666,28 +2701,74 @@ impl Connection {
self.read_dir(&rd.path, rd.include_hidden);
}
Some(file_action::Union::AllFiles(f)) => {
match fs::get_recursive_files(&f.path, f.include_hidden) {
Err(err) => {
self.send(fs::new_error(f.id, err, -1)).await;
}
Ok(files) => {
self.send(fs::new_dir(f.id, f.path, files)).await;
if crate::common::need_fs_cm_send_files() {
self.send_fs(ipc::FS::ReadAllFiles {
path: f.path,
id: f.id,
include_hidden: f.include_hidden,
conn_id: self.inner.id(),
});
} else {
match fs::get_recursive_files(&f.path, f.include_hidden) {
Err(err) => {
log::error!(
"Failed to get recursive files for {}: {}",
f.path,
err
);
self.send(fs::new_error(f.id, err, -1)).await;
}
Ok(files) => {
if let Err(msg) =
crate::ui_cm_interface::check_file_count_limit(
files.len(),
)
{
self.send(fs::new_error(f.id, msg, -1)).await;
} else {
self.send(fs::new_dir(f.id, f.path, files)).await;
}
}
}
}
}
Some(file_action::Union::Send(s)) => {
// server to client
let id = s.id;
let od = can_enable_overwrite_detection(get_version_number(
&self.lr.version,
));
let path = s.path.clone();
let r#type = JobType::from_proto(s.file_type);
let data_source;
match r#type {
let job_type = JobType::from_proto(s.file_type);
match job_type {
JobType::Generic => {
data_source =
fs::DataSource::FilePath(PathBuf::from(&path));
let od = can_enable_overwrite_detection(
get_version_number(&self.lr.version),
);
if crate::common::need_fs_cm_send_files() {
// Delegate file reading to CM on Windows
self.cm_read_job_ids.insert(id);
self.send_fs(ipc::FS::ReadFile {
path,
id,
file_num: s.file_num,
include_hidden: s.include_hidden,
conn_id: self.inner.id(),
overwrite_detection: od,
});
} else {
// Handle file reading in Connection on non-Windows
let data_source =
fs::DataSource::FilePath(PathBuf::from(&path));
self.create_and_start_read_job(
id,
job_type,
data_source,
s.file_num,
s.include_hidden,
od,
path,
true, // check file count limit
)
.await;
}
}
JobType::Printer => {
if let Some((_, _, data)) = self
@@ -2696,49 +2777,26 @@ impl Connection {
.position(|(_, p, _)| *p == path)
.map(|index| self.printer_data.remove(index))
{
data_source = fs::DataSource::MemoryCursor(
let data_source = fs::DataSource::MemoryCursor(
std::io::Cursor::new(data),
);
// Printer jobs don't need file count limit check
self.create_and_start_read_job(
id,
job_type,
data_source,
s.file_num,
s.include_hidden,
true, // always enable overwrite detection for printer
path,
false, // no file count limit for printer
)
.await;
} else {
// Ignore this message if the printer data is not found
return true;
}
}
};
match fs::TransferJob::new_read(
id,
r#type,
"".to_string(),
data_source,
s.file_num,
s.include_hidden,
false,
od,
) {
Err(err) => {
self.send(fs::new_error(id, err, 0)).await;
}
Ok(mut job) => {
self.send(fs::new_dir(id, path, job.files().to_vec()))
.await;
let files = job.files().to_owned();
job.is_remote = true;
job.conn_id = self.inner.id();
let job_type = job.r#type;
self.read_jobs.push(job);
self.file_timer =
crate::rustdesk_interval(time::interval(MILLI1));
self.post_file_audit(
FileAuditType::RemoteSend,
if job_type == fs::JobType::Printer {
"Remote print"
} else {
&s.path
},
Self::get_files_for_audit(job_type, files),
json!({}),
);
}
}
self.file_transferred = true;
}
@@ -2805,6 +2863,11 @@ impl Connection {
}
Some(file_action::Union::Cancel(c)) => {
self.send_fs(ipc::FS::CancelWrite { id: c.id });
let _ = self.cm_read_job_ids.remove(&c.id);
self.send_fs(ipc::FS::CancelRead {
id: c.id,
conn_id: self.inner.id(),
});
if let Some(job) = fs::remove_job(c.id, &mut self.read_jobs) {
self.send_to_cm(ipc::Data::FileTransferLog((
"transfer".to_string(),
@@ -2815,6 +2878,15 @@ impl Connection {
Some(file_action::Union::SendConfirm(r)) => {
if let Some(job) = fs::get_job(r.id, &mut self.read_jobs) {
job.confirm(&r).await;
} else if self.cm_read_job_ids.contains(&r.id) {
// Forward to CM for CM-read jobs
self.send_fs(ipc::FS::SendConfirmForRead {
id: r.id,
file_num: r.file_num,
skip: r.skip(),
offset_blk: r.offset_blk(),
conn_id: self.inner.id(),
});
} else {
if let Ok(sc) = r.write_to_bytes() {
self.send_fs(ipc::FS::SendConfirm(sc));
@@ -4013,6 +4085,219 @@ impl Connection {
raii::AuthedConnID::check_remove_session(self.inner.id(), self.session_key());
}
async fn handle_read_job_init_result(
&mut self,
id: i32,
_file_num: i32,
_include_hidden: bool,
result: Result<Vec<u8>, String>,
) {
// Check if this response is still expected (not stale/cancelled)
if !self.cm_read_job_ids.contains(&id) {
log::warn!(
"Received ReadJobInitResult for unknown or stale job id={}, ignoring",
id
);
return;
}
match result {
Err(error) => {
self.cm_read_job_ids.remove(&id);
self.send(fs::new_error(id, error, 0)).await;
}
Ok(dir_bytes) => {
// Deserialize FileDirectory from protobuf bytes
let dir = match FileDirectory::parse_from_bytes(&dir_bytes) {
Ok(d) => d,
Err(e) => {
log::error!("Failed to parse FileDirectory: {}", e);
self.cm_read_job_ids.remove(&id);
self.send(fs::new_error(id, "internal error".to_string(), 0))
.await;
return;
}
};
let path_str = dir.path.clone();
let file_entries: Vec<FileEntry> = dir.entries.into();
// Send file directory to client
self.send(fs::new_dir(id, path_str.clone(), file_entries.clone()))
.await;
// Post audit for file transfer
self.post_file_audit(
FileAuditType::RemoteSend,
&path_str,
Self::get_files_for_audit(fs::JobType::Generic, file_entries),
json!({}),
);
// CM will handle the actual file reading and send blocks via IPC
self.file_transferred = true;
}
}
}
async fn handle_file_block_from_cm(
&mut self,
id: i32,
file_num: i32,
data: bytes::Bytes,
compressed: bool,
) {
// Check if the job is still valid (not cancelled)
if !self.cm_read_job_ids.contains(&id) {
log::debug!(
"Dropping file block for cancelled/unknown job id={}, file_num={}",
id,
file_num
);
return;
}
// Forward file block to client
let mut block = FileTransferBlock::new();
block.id = id;
block.file_num = file_num;
block.data = data.to_vec().into();
block.compressed = compressed;
let mut msg = Message::new();
let mut fr = FileResponse::new();
fr.set_block(block);
msg.set_file_response(fr);
self.send(msg).await;
}
async fn handle_file_read_done(&mut self, id: i32, file_num: i32) {
// Drop stale completions for cancelled/unknown jobs
if !self.cm_read_job_ids.remove(&id) {
log::debug!(
"Dropping FileReadDone for cancelled/unknown job id={}, file_num={}",
id,
file_num
);
return;
}
// Forward done message to client
let mut done = FileTransferDone::new();
done.id = id;
done.file_num = file_num;
let mut msg = Message::new();
let mut fr = FileResponse::new();
fr.set_done(done);
msg.set_file_response(fr);
self.send(msg).await;
}
async fn handle_file_read_error(&mut self, id: i32, file_num: i32, err: String) {
// Drop stale errors for cancelled/unknown jobs
if !self.cm_read_job_ids.remove(&id) {
log::debug!(
"Dropping FileReadError for cancelled/unknown job id={}, file_num={}",
id,
file_num
);
return;
}
// Forward error to client
self.send(fs::new_error(id, err, file_num)).await;
}
async fn handle_file_digest_from_cm(
&mut self,
id: i32,
file_num: i32,
last_modified: u64,
file_size: u64,
is_resume: bool,
) {
// Check if the job is still valid (not cancelled)
if !self.cm_read_job_ids.contains(&id) {
log::debug!(
"Dropping digest for cancelled/unknown job id={}, file_num={}",
id,
file_num
);
return;
}
// Forward digest to client for overwrite detection
let mut digest = FileTransferDigest::new();
digest.id = id;
digest.file_num = file_num;
digest.last_modified = last_modified;
digest.file_size = file_size;
digest.is_upload = false; // Server sending to client
digest.is_resume = is_resume;
let mut msg = Message::new();
let mut fr = FileResponse::new();
fr.set_digest(digest);
msg.set_file_response(fr);
self.send(msg).await;
}
async fn process_new_read_job(&mut self, mut job: fs::TransferJob, path: String) {
let files = job.files().to_owned();
let job_type = job.r#type;
self.send(fs::new_dir(job.id, path.clone(), files.clone()))
.await;
job.is_remote = true;
job.conn_id = self.inner.id();
self.read_jobs.push(job);
self.file_timer = crate::rustdesk_interval(time::interval(MILLI1));
let audit_path = if job_type == fs::JobType::Printer {
"Remote print".to_owned()
} else {
path
};
self.post_file_audit(
FileAuditType::RemoteSend,
&audit_path,
Self::get_files_for_audit(job_type, files),
json!({}),
);
}
async fn handle_all_files_result(
&mut self,
id: i32,
path: String,
result: Result<Vec<u8>, String>,
) {
match result {
Err(err) => {
self.send(fs::new_error(id, err, -1)).await;
}
Ok(bytes) => {
// Deserialize FileDirectory from protobuf bytes and send as FileResponse
match FileDirectory::parse_from_bytes(&bytes) {
Ok(fd) => {
let mut msg = Message::new();
let mut fr = FileResponse::new();
fr.set_dir(fd);
msg.set_file_response(fr);
self.send(msg).await;
}
Err(e) => {
self.send(fs::new_error(
id,
format!("deserialize failed for {}: {}", path, e),
-1,
))
.await;
}
}
}
}
}
fn read_empty_dirs(&mut self, dir: &str, include_hidden: bool) {
let dir = dir.to_string();
self.send_fs(ipc::FS::ReadEmptyDirs {
@@ -4029,6 +4314,57 @@ impl Connection {
});
}
/// Create a new read job and start processing it (Connection-side).
///
/// This is a generic Connection-side read job creation helper used for:
/// - Generic file transfers on non-Windows platforms
/// - Printer jobs on all platforms (including Windows)
///
/// On Windows, generic file reads are delegated to CM via `start_read_job()` in
/// `src/ui_cm_interface.rs` for elevated access. Printer jobs bypass this delegation
/// since they read from in-memory data (`MemoryCursor`), not the filesystem.
///
/// Both Connection-side and CM-side implementations use `TransferJob::new_read()`
/// with similar parameters. When modifying job creation logic, ensure both paths
/// stay in sync.
async fn create_and_start_read_job(
&mut self,
id: i32,
job_type: fs::JobType,
data_source: fs::DataSource,
file_num: i32,
include_hidden: bool,
overwrite_detection: bool,
path: String,
check_file_limit: bool,
) {
match fs::TransferJob::new_read(
id,
job_type,
"".to_string(),
data_source,
file_num,
include_hidden,
false,
overwrite_detection,
) {
Err(err) => {
self.send(fs::new_error(id, err, 0)).await;
}
Ok(job) => {
if check_file_limit {
if let Err(msg) =
crate::ui_cm_interface::check_file_count_limit(job.files().len())
{
self.send(fs::new_error(id, msg, -1)).await;
return;
}
}
self.process_new_read_job(job, path).await;
}
}
}
#[inline]
async fn send(&mut self, msg: Message) {
allow_err!(self.stream.send(&msg).await);
@@ -4436,6 +4772,23 @@ async fn start_ipc(
let data = ipc::Data::ClickTime(ct);
stream.send(&data).await?;
}
// FileBlockFromCM: data is always sent separately via send_raw.
// The data field has #[serde(skip)], so it's empty after deserialization.
// Read the raw data bytes following this message.
//
// Note: Empty data (for empty files) is correctly handled. BytesCodec with
// raw=false adds a length prefix, so next_raw() returns empty BytesMut for
// zero-length frames. This mirrors the WriteBlock pattern below.
ipc::Data::FileBlockFromCM { id, file_num, data: _, compressed, conn_id } => {
let raw_data = stream.next_raw().await?;
tx_from_cm.send(ipc::Data::FileBlockFromCM {
id,
file_num,
data: raw_data.into(),
compressed,
conn_id,
})?;
}
_ => {
tx_from_cm.send(data)?;
}

View File

@@ -17,13 +17,12 @@ use rdev::{self, EventType, Key as RdevKey, KeyCode, RawKey};
use rdev::{CGEventSourceStateID, CGEventTapLocation, VirtualInput};
#[cfg(target_os = "linux")]
use scrap::wayland::pipewire::RDP_SESSION_INFO;
#[cfg(target_os = "linux")]
use std::sync::mpsc;
use std::{
convert::TryFrom,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, Ordering},
mpsc,
},
sync::atomic::{AtomicBool, Ordering},
thread,
time::{self, Duration, Instant},
};