mirror of
https://github.com/rustdesk/rustdesk.git
synced 2026-03-07 12:20:03 +03:00
feat(terminal): add reconnection buffer support for persistent sessions (#14377)
* feat(terminal): add reconnection buffer support for persistent sessions Fix two related issues: 1. Reconnecting to persistent sessions shows blank screen - server now automatically sends historical buffer on reconnection via SessionState machine with pending_buffer, eliminating the need for client-initiated buffer requests. 2. Terminal output before view ready causes NaN errors - buffer output chunks on client side until terminal view has valid dimensions, then flush in order on first valid resize. Rust side: - Introduce SessionState enum (Closed/Active) replacing bool is_opened - Auto-attach pending buffer on reconnection in handle_open() - Always drain output channel in read_outputs() to prevent overflow - Increase channel buffer from 100 to 500 - Optimize get_recent() to collect whole chunks (avoids ANSI truncation) - Extract create_terminal_data_response() helper (DRY) - Add reconnected flag to TerminalOpened protobuf message Flutter side: - Buffer output chunks until terminal view has valid dimensions - Flush buffered output on first valid resize via _markViewReady() - Clear terminal on reconnection to avoid duplicate output from buffer replay - Fix max_bytes type (u32) to match protobuf definition - Pass reconnected field through FlutterHandler event Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): add two-phase SIGWINCH for TUI app redraw and session remap on reconnection Fix TUI apps (top, htop) not redrawing after reconnection. A single resize-then-restore is too fast for ncurses to detect a size change, so split across two read_outputs() polling cycles (~30ms apart) to force a full redraw. Also fix reconnection failure when client terminal_id doesn't match any surviving server-side session ID by remapping the lowest surviving session to the requested ID. Rust side: - Add two-phase SIGWINCH state machine (SigwinchPhase: TempResize → Restore → Idle) with retry logic (max 3 attempts per phase) - Add do_sigwinch_resize() for cross-platform PTY resize (direct PTY and Windows helper mode) - Add session remap logic for non-contiguous terminal_id reconnection - Extract try_send_output() helper with rate-limited drop logging (DRY) - Add 3-byte limit to UTF-8 continuation byte skipping in get_recent() to prevent runaway on non-UTF-8 binary data - Remove reconnected flag from flutter.rs (unused on client side) Flutter side: - Add reconnection screen clear and deferred flush logic - Filter self from persistent_sessions restore list - Add comments for web-related changes Signed-off-by: fufesou <linlong1266@gmail.com> --------- Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
@@ -3063,6 +3063,11 @@ Future<void> start_service(bool is_start) async {
|
||||
}
|
||||
|
||||
Future<bool> canBeBlocked() async {
|
||||
if (isWeb) {
|
||||
// Web can only act as a controller, never as a controlled side,
|
||||
// so it should never be blocked by a remote session.
|
||||
return false;
|
||||
}
|
||||
// First check control permission
|
||||
final controlPermission = await bind.mainGetCommon(
|
||||
key: "is-remote-modify-enabled-by-control-permissions");
|
||||
|
||||
@@ -36,6 +36,8 @@ class _TerminalTabPageState extends State<TerminalTabPage> {
|
||||
int _nextTerminalId = 1;
|
||||
// Lightweight idempotency guard for async close operations
|
||||
final Set<String> _closingTabs = {};
|
||||
// When true, all session cleanup should persist (window-level close in progress)
|
||||
bool _windowClosing = false;
|
||||
|
||||
_TerminalTabPageState(Map<String, dynamic> params) {
|
||||
Get.put(DesktopTabController(tabType: DesktopTabType.terminal));
|
||||
@@ -139,6 +141,7 @@ class _TerminalTabPageState extends State<TerminalTabPage> {
|
||||
/// UI tabs are removed immediately; session cleanup runs in parallel with a
|
||||
/// bounded timeout so window close is not blocked indefinitely.
|
||||
Future<void> _closeAllTabs() async {
|
||||
_windowClosing = true;
|
||||
final tabKeys = tabController.state.value.tabs.map((t) => t.key).toList();
|
||||
// Remove all UI tabs immediately (same instant behavior as the old tabController.clear())
|
||||
tabController.clear();
|
||||
@@ -171,8 +174,17 @@ class _TerminalTabPageState extends State<TerminalTabPage> {
|
||||
/// - `true` (window close): persist all sessions, don't close any.
|
||||
/// - `false` (tab close): only persist the last session for the peer,
|
||||
/// close others so only the most recent disconnected session survives.
|
||||
///
|
||||
/// Note: if [_windowClosing] is true, persistAll is forced to true so that
|
||||
/// in-flight _closeTab() calls don't accidentally close sessions that the
|
||||
/// window-close flow intends to preserve.
|
||||
Future<void> _closeTerminalSessionIfNeeded(String tabKey,
|
||||
{bool persistAll = false, int? peerTabCount}) async {
|
||||
// If window close is in progress, override to persist all sessions
|
||||
// even if this call originated from an individual tab close.
|
||||
if (_windowClosing) {
|
||||
persistAll = true;
|
||||
}
|
||||
final parsed = _parseTabKey(tabKey);
|
||||
if (parsed == null) return;
|
||||
final (peerId, terminalId) = parsed;
|
||||
|
||||
@@ -83,7 +83,10 @@ class _TerminalPageState extends State<TerminalPage>
|
||||
// Register this terminal model with FFI for event routing
|
||||
_ffi.registerTerminalModel(widget.terminalId, _terminalModel);
|
||||
|
||||
_showTerminalExtraKeys = mainGetLocalBoolOptionSync(kOptionEnableShowTerminalExtraKeys);
|
||||
// Web desktop users have full hardware keyboard access, so the on-screen
|
||||
// terminal extra keys bar is unnecessary and disabled.
|
||||
_showTerminalExtraKeys = !isWebDesktop &&
|
||||
mainGetLocalBoolOptionSync(kOptionEnableShowTerminalExtraKeys);
|
||||
// Initialize terminal connection
|
||||
WidgetsBinding.instance.addPostFrameCallback((_) {
|
||||
_ffi.dialogManager
|
||||
|
||||
@@ -266,8 +266,8 @@ class TerminalModel with ChangeNotifier {
|
||||
|
||||
void _handleTerminalOpened(Map<String, dynamic> evt) {
|
||||
final bool success = getSuccessFromEvt(evt);
|
||||
final String message = evt['message'] ?? '';
|
||||
final String? serviceId = evt['service_id'];
|
||||
final String message = evt['message']?.toString() ?? '';
|
||||
final String? serviceId = evt['service_id']?.toString();
|
||||
|
||||
debugPrint(
|
||||
'[TerminalModel] Terminal opened response: success=$success, message=$message, service_id=$serviceId');
|
||||
@@ -275,7 +275,18 @@ class TerminalModel with ChangeNotifier {
|
||||
if (success) {
|
||||
_terminalOpened = true;
|
||||
|
||||
// Service ID is now saved on the Rust side in handle_terminal_response
|
||||
// On reconnect ("Reconnected to existing terminal"), server may replay recent output.
|
||||
// If this TerminalView instance is reused (not rebuilt), duplicate lines can appear.
|
||||
// We intentionally accept this tradeoff for now to keep logic simple.
|
||||
|
||||
// Fallback: if terminal view is not yet ready but already has valid
|
||||
// dimensions (e.g. layout completed before open response arrived),
|
||||
// mark view ready now to avoid output stuck in buffer indefinitely.
|
||||
if (!_terminalViewReady &&
|
||||
terminal.viewWidth > 0 &&
|
||||
terminal.viewHeight > 0) {
|
||||
_markViewReady();
|
||||
}
|
||||
|
||||
// Process any buffered input
|
||||
_processBufferedInputAsync().then((_) {
|
||||
@@ -358,8 +369,7 @@ class TerminalModel with ChangeNotifier {
|
||||
// because it only affects the pre-layout buffering window and the
|
||||
// terminal will self-correct on subsequent output.
|
||||
if (text.length >= _kMaxOutputBufferChars) {
|
||||
final truncated =
|
||||
text.substring(text.length - _kMaxOutputBufferChars);
|
||||
final truncated = text.substring(text.length - _kMaxOutputBufferChars);
|
||||
_pendingOutputChunks
|
||||
..clear()
|
||||
..add(truncated);
|
||||
|
||||
@@ -30,8 +30,54 @@ const MAX_OUTPUT_BUFFER_SIZE: usize = 1024 * 1024; // 1MB per terminal
|
||||
const MAX_BUFFER_LINES: usize = 10000;
|
||||
const MAX_SERVICES: usize = 100; // Maximum number of persistent terminal services
|
||||
const SERVICE_IDLE_TIMEOUT: Duration = Duration::from_secs(3600); // 1 hour idle timeout
|
||||
const CHANNEL_BUFFER_SIZE: usize = 100; // Number of messages to buffer in channel
|
||||
const CHANNEL_BUFFER_SIZE: usize = 500; // Channel buffer size. Max per-message size ~4KB (reader buffer), so worst case ~500*4KB ≈ 2MB/terminal. Increased from 100 to reduce data loss during disconnects.
|
||||
const COMPRESS_THRESHOLD: usize = 512; // Compress terminal data larger than this
|
||||
// Default max bytes for reconnection buffer replay.
|
||||
const DEFAULT_RECONNECT_BUFFER_BYTES: usize = 8 * 1024;
|
||||
const MAX_SIGWINCH_PHASE_ATTEMPTS: u8 = 3; // Max attempts per SIGWINCH phase before giving up
|
||||
|
||||
/// Two-phase SIGWINCH trigger for TUI app redraw on reconnection.
|
||||
///
|
||||
/// Why two phases? A single resize-then-restore done back-to-back is too fast:
|
||||
/// by the time the TUI app handles the asynchronous SIGWINCH signal and calls
|
||||
/// `ioctl(TIOCGWINSZ)`, the PTY size has already been restored to the original.
|
||||
/// ncurses sees no size change and skips the full redraw.
|
||||
///
|
||||
/// Splitting across two `read_outputs()` calls (~30ms apart) ensures the app
|
||||
/// sees a real size change on each SIGWINCH, forcing a complete redraw.
|
||||
#[derive(Debug, Clone)]
|
||||
enum SigwinchPhase {
|
||||
/// No SIGWINCH needed.
|
||||
Idle,
|
||||
/// Phase 1: Resize PTY to temp dimensions (rows±1). The app handles SIGWINCH
|
||||
/// and redraws at the temporary size.
|
||||
TempResize { retries: u8 },
|
||||
/// Phase 2: Restore PTY to correct dimensions. The app handles SIGWINCH,
|
||||
/// detects the size change, and performs a full redraw at the correct size.
|
||||
Restore { retries: u8 },
|
||||
}
|
||||
|
||||
/// Which resize to perform in the two-phase SIGWINCH sequence.
|
||||
enum SigwinchAction {
|
||||
/// Phase 1: resize to temp dimensions (rows±1) to trigger SIGWINCH with a visible size change.
|
||||
TempResize,
|
||||
/// Phase 2: restore to correct dimensions to trigger SIGWINCH and force full redraw.
|
||||
Restore,
|
||||
}
|
||||
|
||||
/// Session state machine for terminal streaming.
|
||||
#[derive(Debug)]
|
||||
enum SessionState {
|
||||
/// Session is closed, not streaming data to client.
|
||||
Closed,
|
||||
/// Session is active, streaming data to client.
|
||||
/// pending_buffer: historical buffer to send before real-time data (set on reconnection).
|
||||
/// sigwinch: two-phase SIGWINCH trigger state for TUI app redraw.
|
||||
Active {
|
||||
pending_buffer: Option<Vec<u8>>,
|
||||
sigwinch: SigwinchPhase,
|
||||
},
|
||||
}
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
// Global registry of persistent terminal services indexed by service_id
|
||||
@@ -433,22 +479,103 @@ impl OutputBuffer {
|
||||
}
|
||||
|
||||
fn get_recent(&self, max_bytes: usize) -> Vec<u8> {
|
||||
let mut result = Vec::new();
|
||||
if max_bytes == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
let mut chunks: Vec<&[u8]> = Vec::new();
|
||||
let mut size = 0;
|
||||
|
||||
// Get recent lines up to max_bytes
|
||||
// Collect whole chunks from newest to oldest, preserving chronological continuity.
|
||||
// If the newest chunk alone exceeds max_bytes, take its tail (truncation may split
|
||||
// an ANSI escape, but the terminal will self-correct on subsequent output).
|
||||
for line in self.lines.iter().rev() {
|
||||
if size + line.len() > max_bytes {
|
||||
if size == 0 && line.len() > max_bytes {
|
||||
// Single oversized chunk: take the tail to preserve the most recent content.
|
||||
// Align offset forward to a UTF-8 char boundary so that downstream
|
||||
// clients (e.g. Dart) that decode the payload as UTF-8 text don't
|
||||
// encounter split code points. The protobuf bytes field itself allows
|
||||
// arbitrary bytes; this is a best-effort mitigation for client-side decoding.
|
||||
let mut offset = line.len() - max_bytes;
|
||||
// Skip at most 3 continuation bytes (UTF-8 max 4-byte sequence).
|
||||
// Prevents runaway skipping on non-UTF-8 binary data.
|
||||
let mut skipped = 0u8;
|
||||
while skipped < 3
|
||||
&& offset < line.len()
|
||||
&& (line[offset] & 0b1100_0000) == 0b1000_0000
|
||||
{
|
||||
offset += 1;
|
||||
skipped += 1;
|
||||
}
|
||||
// If we skipped past all remaining bytes (degenerate data), drop the
|
||||
// chunk entirely rather than emitting a slice that decodes poorly on the client.
|
||||
if offset < line.len() {
|
||||
chunks.push(&line[offset..]);
|
||||
size = line.len() - offset;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
size += line.len();
|
||||
result.splice(0..0, line.iter().cloned());
|
||||
chunks.push(line);
|
||||
}
|
||||
|
||||
// Reverse to restore chronological order and concatenate
|
||||
chunks.reverse();
|
||||
let mut result = Vec::with_capacity(size);
|
||||
for chunk in chunks {
|
||||
result.extend_from_slice(chunk);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to send data through the output channel with rate-limited drop logging.
|
||||
/// Returns `true` if the caller should break out of the read loop (channel disconnected).
|
||||
fn try_send_output(
|
||||
output_tx: &mpsc::SyncSender<Vec<u8>>,
|
||||
data: Vec<u8>,
|
||||
terminal_id: i32,
|
||||
label: &str,
|
||||
drop_count: &mut u64,
|
||||
last_drop_warn: &mut Instant,
|
||||
) -> bool {
|
||||
match output_tx.try_send(data) {
|
||||
Ok(_) => {
|
||||
if *drop_count > 0 {
|
||||
log::trace!(
|
||||
"Terminal {}{} output channel recovered, dropped {} chunks since last report",
|
||||
terminal_id,
|
||||
label,
|
||||
*drop_count
|
||||
);
|
||||
*drop_count = 0;
|
||||
}
|
||||
false
|
||||
}
|
||||
Err(mpsc::TrySendError::Full(_)) => {
|
||||
*drop_count += 1;
|
||||
if last_drop_warn.elapsed() >= Duration::from_secs(5) {
|
||||
log::trace!(
|
||||
"Terminal {}{} output channel full, dropped {} chunks in last {:?}",
|
||||
terminal_id,
|
||||
label,
|
||||
*drop_count,
|
||||
last_drop_warn.elapsed()
|
||||
);
|
||||
*drop_count = 0;
|
||||
*last_drop_warn = Instant::now();
|
||||
}
|
||||
false
|
||||
}
|
||||
Err(mpsc::TrySendError::Disconnected(_)) => {
|
||||
log::debug!("Terminal {}{} output channel disconnected", terminal_id, label);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TerminalSession {
|
||||
pub created_at: Instant,
|
||||
last_activity: Instant,
|
||||
@@ -469,7 +596,8 @@ pub struct TerminalSession {
|
||||
cols: u16,
|
||||
// Track if we've already sent the closed message
|
||||
closed_message_sent: bool,
|
||||
is_opened: bool,
|
||||
// Session state machine for reconnection handling
|
||||
state: SessionState,
|
||||
// Helper mode: PTY is managed by helper process, communication via message protocol
|
||||
#[cfg(target_os = "windows")]
|
||||
is_helper_mode: bool,
|
||||
@@ -496,7 +624,7 @@ impl TerminalSession {
|
||||
rows,
|
||||
cols,
|
||||
closed_message_sent: false,
|
||||
is_opened: false,
|
||||
state: SessionState::Closed,
|
||||
#[cfg(target_os = "windows")]
|
||||
is_helper_mode: false,
|
||||
#[cfg(target_os = "windows")]
|
||||
@@ -511,7 +639,7 @@ impl TerminalSession {
|
||||
// This helper function is to ensure that the threads are joined before the child process is dropped.
|
||||
// Though this is not strictly necessary on macOS.
|
||||
fn stop(&mut self) {
|
||||
self.is_opened = false;
|
||||
self.state = SessionState::Closed;
|
||||
self.exiting.store(true, Ordering::SeqCst);
|
||||
|
||||
// Drop the input channel to signal writer thread to exit
|
||||
@@ -668,7 +796,9 @@ impl PersistentTerminalService {
|
||||
(
|
||||
session.rows,
|
||||
session.cols,
|
||||
session.output_buffer.get_recent(4096),
|
||||
session
|
||||
.output_buffer
|
||||
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES),
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -683,7 +813,7 @@ impl PersistentTerminalService {
|
||||
self.needs_session_sync = true;
|
||||
for session in self.sessions.values() {
|
||||
let mut session = session.lock().unwrap();
|
||||
session.is_opened = false;
|
||||
session.state = SessionState::Closed;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -807,7 +937,25 @@ impl TerminalServiceProxy {
|
||||
if let Some(session_arc) = service.sessions.get(&open.terminal_id) {
|
||||
// Reconnect to existing terminal
|
||||
let mut session = session_arc.lock().unwrap();
|
||||
session.is_opened = true;
|
||||
// Directly enter Active state with pending buffer for immediate streaming.
|
||||
// Historical buffer is sent first by read_outputs(), then real-time data follows.
|
||||
// No overlap: pending_buffer comes from output_buffer (pre-disconnect history),
|
||||
// while received_data in read_outputs() comes from the channel (post-reconnect).
|
||||
// During disconnect, the run loop (sp.ok()) exits so read_outputs() stops being
|
||||
// called; output_buffer is not updated, and channel data may be lost if it fills up.
|
||||
let buffer = session
|
||||
.output_buffer
|
||||
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
|
||||
let has_pending = !buffer.is_empty();
|
||||
session.state = SessionState::Active {
|
||||
pending_buffer: if has_pending { Some(buffer) } else { None },
|
||||
// Always trigger two-phase SIGWINCH on reconnect to force TUI app redraw,
|
||||
// regardless of whether there's pending buffer data. This avoids edge cases
|
||||
// where buffer is empty but a TUI app (top/htop) still needs a full redraw.
|
||||
sigwinch: SigwinchPhase::TempResize {
|
||||
retries: MAX_SIGWINCH_PHASE_ATTEMPTS,
|
||||
},
|
||||
};
|
||||
let mut opened = TerminalOpened::new();
|
||||
opened.terminal_id = open.terminal_id;
|
||||
opened.success = true;
|
||||
@@ -829,13 +977,6 @@ impl TerminalServiceProxy {
|
||||
}
|
||||
response.set_opened(opened);
|
||||
|
||||
// Send buffered output
|
||||
let buffer = session.output_buffer.get_recent(4096);
|
||||
if !buffer.is_empty() {
|
||||
// We'll need to send this separately or extend the protocol
|
||||
// For now, just acknowledge the reconnection
|
||||
}
|
||||
|
||||
return Ok(Some(response));
|
||||
}
|
||||
|
||||
@@ -945,6 +1086,9 @@ impl TerminalServiceProxy {
|
||||
let reader_thread = thread::spawn(move || {
|
||||
let mut reader = reader;
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let mut drop_count: u64 = 0;
|
||||
// Initialize to > 5s ago so the first drop triggers a warning immediately.
|
||||
let mut last_drop_warn = Instant::now() - Duration::from_secs(6);
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => {
|
||||
@@ -958,21 +1102,24 @@ impl TerminalServiceProxy {
|
||||
break;
|
||||
}
|
||||
let data = buf[..n].to_vec();
|
||||
// Try to send, if channel is full, drop the data
|
||||
match output_tx.try_send(data) {
|
||||
Ok(_) => {}
|
||||
Err(mpsc::TrySendError::Full(_)) => {
|
||||
log::debug!(
|
||||
"Terminal {} output channel full, dropping data",
|
||||
terminal_id
|
||||
);
|
||||
}
|
||||
Err(mpsc::TrySendError::Disconnected(_)) => {
|
||||
log::debug!("Terminal {} output channel disconnected", terminal_id);
|
||||
// Use try_send to avoid blocking the reader thread when channel is full.
|
||||
// During disconnect, the run loop (sp.ok()) stops and read_outputs() is
|
||||
// no longer called, so the channel won't be drained. Blocking send would
|
||||
// deadlock the reader thread in that case.
|
||||
// Note: data produced during disconnect may be lost if channel fills up,
|
||||
// since output_buffer is only updated in read_outputs(). The buffer will
|
||||
// contain history from before the disconnect, not data produced after it.
|
||||
if try_send_output(
|
||||
&output_tx,
|
||||
data,
|
||||
terminal_id,
|
||||
"",
|
||||
&mut drop_count,
|
||||
&mut last_drop_warn,
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
// This branch is not reached in my tests, but we still add `exiting` check to ensure we can exit.
|
||||
if exiting.load(Ordering::SeqCst) {
|
||||
@@ -996,7 +1143,10 @@ impl TerminalServiceProxy {
|
||||
session.output_rx = Some(output_rx);
|
||||
session.reader_thread = Some(reader_thread);
|
||||
session.writer_thread = Some(writer_thread);
|
||||
session.is_opened = true;
|
||||
session.state = SessionState::Active {
|
||||
pending_buffer: None,
|
||||
sigwinch: SigwinchPhase::Idle,
|
||||
};
|
||||
|
||||
let mut opened = TerminalOpened::new();
|
||||
opened.terminal_id = open.terminal_id;
|
||||
@@ -1158,6 +1308,9 @@ impl TerminalServiceProxy {
|
||||
let terminal_id = open.terminal_id;
|
||||
let reader_thread = thread::spawn(move || {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let mut drop_count: u64 = 0;
|
||||
// Initialize to > 5s ago so the first drop triggers a warning immediately.
|
||||
let mut last_drop_warn = Instant::now() - Duration::from_secs(6);
|
||||
loop {
|
||||
match output_pipe.read(&mut buf) {
|
||||
Ok(0) => {
|
||||
@@ -1170,20 +1323,18 @@ impl TerminalServiceProxy {
|
||||
break;
|
||||
}
|
||||
let data = buf[..n].to_vec();
|
||||
match output_tx.try_send(data) {
|
||||
Ok(_) => {}
|
||||
Err(mpsc::TrySendError::Full(_)) => {
|
||||
log::debug!(
|
||||
"Terminal {} output channel full, dropping data",
|
||||
terminal_id
|
||||
);
|
||||
}
|
||||
Err(mpsc::TrySendError::Disconnected(_)) => {
|
||||
log::debug!("Terminal {} output channel disconnected", terminal_id);
|
||||
// Use try_send to avoid blocking the reader thread (same as direct PTY mode)
|
||||
if try_send_output(
|
||||
&output_tx,
|
||||
data,
|
||||
terminal_id,
|
||||
" (helper)",
|
||||
&mut drop_count,
|
||||
&mut last_drop_warn,
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
// Defensive: WouldBlock is unlikely with synchronous File::read(),
|
||||
// but handle it gracefully just in case.
|
||||
@@ -1211,7 +1362,10 @@ impl TerminalServiceProxy {
|
||||
session.output_rx = Some(output_rx);
|
||||
session.reader_thread = Some(reader_thread);
|
||||
session.writer_thread = Some(writer_thread);
|
||||
session.is_opened = true;
|
||||
session.state = SessionState::Active {
|
||||
pending_buffer: None,
|
||||
sigwinch: SigwinchPhase::Idle,
|
||||
};
|
||||
session.is_helper_mode = true;
|
||||
session.helper_process_handle = Some(SendableHandle::new(helper_raw_handle));
|
||||
|
||||
@@ -1253,6 +1407,11 @@ impl TerminalServiceProxy {
|
||||
session.rows = resize.rows as u16;
|
||||
session.cols = resize.cols as u16;
|
||||
|
||||
// Note: we do NOT clear the sigwinch phase here. The server-side two-phase
|
||||
// SIGWINCH mechanism in read_outputs() is self-contained (temp resize → restore
|
||||
// across two polling cycles), so client resize is purely a dimension sync and
|
||||
// doesn't affect it.
|
||||
|
||||
// Windows: handle helper mode vs direct PTY mode
|
||||
#[cfg(target_os = "windows")]
|
||||
{
|
||||
@@ -1358,6 +1517,116 @@ impl TerminalServiceProxy {
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform a single PTY resize as part of the two-phase SIGWINCH sequence.
|
||||
/// Returns true if the resize succeeded.
|
||||
///
|
||||
/// Takes individual field references to avoid borrowing the entire TerminalSession,
|
||||
/// which would conflict with the mutable borrow of session.state in read_outputs().
|
||||
fn do_sigwinch_resize(
|
||||
terminal_id: i32,
|
||||
rows: u16,
|
||||
cols: u16,
|
||||
pty_pair: &Option<portable_pty::PtyPair>,
|
||||
input_tx: &Option<SyncSender<Vec<u8>>>,
|
||||
_is_helper_mode: bool,
|
||||
action: &SigwinchAction,
|
||||
) -> bool {
|
||||
// Skip if dimensions are not initialized (shouldn't happen on reconnect,
|
||||
// but guard against it to avoid resizing to nonsensical values).
|
||||
if rows == 0 || cols == 0 {
|
||||
return false;
|
||||
}
|
||||
|
||||
let target_rows = match action {
|
||||
SigwinchAction::TempResize => {
|
||||
// For very small terminals (≤2 rows), subtracting 1 would result in an unusable
|
||||
// size (0 or 1 row), so we add 1 instead. Either direction triggers SIGWINCH.
|
||||
if rows > 2 {
|
||||
rows.saturating_sub(1)
|
||||
} else {
|
||||
rows.saturating_add(1)
|
||||
}
|
||||
}
|
||||
SigwinchAction::Restore => rows,
|
||||
};
|
||||
|
||||
let phase_name = match action {
|
||||
SigwinchAction::TempResize => "temp resize",
|
||||
SigwinchAction::Restore => "restore",
|
||||
};
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
let use_helper = _is_helper_mode;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let use_helper = false;
|
||||
|
||||
if use_helper {
|
||||
#[cfg(target_os = "windows")]
|
||||
{
|
||||
let input_tx = match input_tx {
|
||||
Some(tx) => tx,
|
||||
None => return false,
|
||||
};
|
||||
let msg = encode_resize_message(target_rows, cols);
|
||||
if let Err(e) = input_tx.try_send(msg) {
|
||||
log::warn!(
|
||||
"Terminal {} SIGWINCH {} via helper failed: {}",
|
||||
terminal_id,
|
||||
phase_name,
|
||||
e
|
||||
);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
{
|
||||
let _ = (input_tx, phase_name);
|
||||
false
|
||||
}
|
||||
} else if let Some(pty_pair) = pty_pair {
|
||||
if let Err(e) = pty_pair.master.resize(PtySize {
|
||||
rows: target_rows,
|
||||
cols,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
}) {
|
||||
log::warn!(
|
||||
"Terminal {} SIGWINCH {} failed: {}",
|
||||
terminal_id,
|
||||
phase_name,
|
||||
e
|
||||
);
|
||||
return false;
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to create a TerminalResponse with optional compression.
|
||||
fn create_terminal_data_response(terminal_id: i32, data: Vec<u8>) -> TerminalResponse {
|
||||
let mut response = TerminalResponse::new();
|
||||
let mut terminal_data = TerminalData::new();
|
||||
terminal_data.terminal_id = terminal_id;
|
||||
|
||||
if data.len() > COMPRESS_THRESHOLD {
|
||||
let compressed = compress::compress(&data);
|
||||
if compressed.len() < data.len() {
|
||||
terminal_data.data = bytes::Bytes::from(compressed);
|
||||
terminal_data.compressed = true;
|
||||
} else {
|
||||
terminal_data.data = bytes::Bytes::from(data);
|
||||
}
|
||||
} else {
|
||||
terminal_data.data = bytes::Bytes::from(data);
|
||||
}
|
||||
|
||||
response.set_data(terminal_data);
|
||||
response
|
||||
}
|
||||
|
||||
pub fn read_outputs(&self) -> Vec<TerminalResponse> {
|
||||
let service = match get_service(&self.service_id) {
|
||||
Some(s) => s,
|
||||
@@ -1399,12 +1668,11 @@ impl TerminalServiceProxy {
|
||||
closed_terminals.push(terminal_id);
|
||||
}
|
||||
|
||||
if !session.is_opened {
|
||||
// Skip the session if it is not opened.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Read from output channel
|
||||
// Always drain the output channel regardless of session state.
|
||||
// When Active: data is sent to client. When Closed (within the same
|
||||
// connection): data is buffered in output_buffer for reconnection replay.
|
||||
// Note: during actual disconnect, the run loop exits and read_outputs()
|
||||
// is not called, so channel data produced after disconnect may be lost.
|
||||
let mut has_activity = false;
|
||||
let mut received_data = Vec::new();
|
||||
if let Some(output_rx) = &session.output_rx {
|
||||
@@ -1415,37 +1683,111 @@ impl TerminalServiceProxy {
|
||||
}
|
||||
}
|
||||
|
||||
// Update buffer after reading
|
||||
if has_activity {
|
||||
session.update_activity();
|
||||
}
|
||||
|
||||
// Update buffer (always buffer for reconnection support)
|
||||
for data in &received_data {
|
||||
session.output_buffer.append(data);
|
||||
}
|
||||
|
||||
// Process received data for responses
|
||||
// Skip sending responses if session is not Active.
|
||||
// Data is already buffered above and will be sent on next reconnection.
|
||||
// Use a scoped block to limit the mutable borrow of session.state,
|
||||
// so we can immutably borrow other session fields afterwards.
|
||||
let sigwinch_action = {
|
||||
let (pending_buffer, sigwinch) = match &mut session.state {
|
||||
SessionState::Active {
|
||||
pending_buffer,
|
||||
sigwinch,
|
||||
} => (pending_buffer, sigwinch),
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
// Send pending buffer response first (set on reconnection in handle_open).
|
||||
// This ensures historical buffer is sent before any real-time data.
|
||||
if let Some(buffer) = pending_buffer.take() {
|
||||
if !buffer.is_empty() {
|
||||
responses
|
||||
.push(Self::create_terminal_data_response(terminal_id, buffer));
|
||||
}
|
||||
}
|
||||
|
||||
// Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale.
|
||||
// Each phase is a single PTY resize, spaced ~30ms apart by the polling
|
||||
// interval, ensuring the TUI app sees a real size change on each signal.
|
||||
match sigwinch {
|
||||
SigwinchPhase::TempResize { retries } => {
|
||||
if *retries == 0 {
|
||||
log::warn!(
|
||||
"Terminal {} SIGWINCH phase 1 (temp resize) failed after {} attempts, giving up",
|
||||
terminal_id, MAX_SIGWINCH_PHASE_ATTEMPTS
|
||||
);
|
||||
*sigwinch = SigwinchPhase::Idle;
|
||||
None
|
||||
} else {
|
||||
*retries -= 1;
|
||||
Some(SigwinchAction::TempResize)
|
||||
}
|
||||
}
|
||||
SigwinchPhase::Restore { retries } => {
|
||||
if *retries == 0 {
|
||||
log::warn!(
|
||||
"Terminal {} SIGWINCH phase 2 (restore) failed after {} attempts, giving up",
|
||||
terminal_id, MAX_SIGWINCH_PHASE_ATTEMPTS
|
||||
);
|
||||
*sigwinch = SigwinchPhase::Idle;
|
||||
None
|
||||
} else {
|
||||
*retries -= 1;
|
||||
Some(SigwinchAction::Restore)
|
||||
}
|
||||
}
|
||||
SigwinchPhase::Idle => None,
|
||||
}
|
||||
};
|
||||
|
||||
// Execute SIGWINCH resize outside the mutable borrow scope of session.state.
|
||||
if let Some(action) = sigwinch_action {
|
||||
#[cfg(target_os = "windows")]
|
||||
let is_helper = session.is_helper_mode;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let is_helper = false;
|
||||
let resize_ok = Self::do_sigwinch_resize(
|
||||
terminal_id,
|
||||
session.rows,
|
||||
session.cols,
|
||||
&session.pty_pair,
|
||||
&session.input_tx,
|
||||
is_helper,
|
||||
&action,
|
||||
);
|
||||
if let SessionState::Active { sigwinch, .. } = &mut session.state {
|
||||
match action {
|
||||
SigwinchAction::TempResize => {
|
||||
if resize_ok {
|
||||
// Phase 1 succeeded — advance to phase 2 (restore).
|
||||
*sigwinch = SigwinchPhase::Restore {
|
||||
retries: MAX_SIGWINCH_PHASE_ATTEMPTS,
|
||||
};
|
||||
}
|
||||
// If failed, retries already decremented; will retry phase 1.
|
||||
}
|
||||
SigwinchAction::Restore => {
|
||||
if resize_ok {
|
||||
// Phase 2 succeeded — SIGWINCH sequence complete.
|
||||
*sigwinch = SigwinchPhase::Idle;
|
||||
}
|
||||
// If failed, retries already decremented; will retry phase 2.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send real-time data after historical buffer
|
||||
for data in received_data {
|
||||
let mut response = TerminalResponse::new();
|
||||
let mut terminal_data = TerminalData::new();
|
||||
terminal_data.terminal_id = terminal_id;
|
||||
|
||||
// Compress data if it exceeds threshold
|
||||
if data.len() > COMPRESS_THRESHOLD {
|
||||
let compressed = compress::compress(&data);
|
||||
if compressed.len() < data.len() {
|
||||
terminal_data.data = bytes::Bytes::from(compressed);
|
||||
terminal_data.compressed = true;
|
||||
} else {
|
||||
// Compression didn't help, send uncompressed
|
||||
terminal_data.data = bytes::Bytes::from(data);
|
||||
}
|
||||
} else {
|
||||
terminal_data.data = bytes::Bytes::from(data);
|
||||
}
|
||||
|
||||
response.set_data(terminal_data);
|
||||
responses.push(response);
|
||||
}
|
||||
|
||||
if has_activity {
|
||||
session.update_activity();
|
||||
responses.push(Self::create_terminal_data_response(terminal_id, data));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user