From 00160339375eeb6d54fa847b2b937d4a2426817c Mon Sep 17 00:00:00 2001 From: fufesou <13586388+fufesou@users.noreply.github.com> Date: Tue, 24 Feb 2026 21:12:06 +0800 Subject: [PATCH] feat(terminal): add reconnection buffer support for persistent sessions (#14377) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(terminal): add reconnection buffer support for persistent sessions Fix two related issues: 1. Reconnecting to persistent sessions shows blank screen - server now automatically sends historical buffer on reconnection via SessionState machine with pending_buffer, eliminating the need for client-initiated buffer requests. 2. Terminal output before view ready causes NaN errors - buffer output chunks on client side until terminal view has valid dimensions, then flush in order on first valid resize. Rust side: - Introduce SessionState enum (Closed/Active) replacing bool is_opened - Auto-attach pending buffer on reconnection in handle_open() - Always drain output channel in read_outputs() to prevent overflow - Increase channel buffer from 100 to 500 - Optimize get_recent() to collect whole chunks (avoids ANSI truncation) - Extract create_terminal_data_response() helper (DRY) - Add reconnected flag to TerminalOpened protobuf message Flutter side: - Buffer output chunks until terminal view has valid dimensions - Flush buffered output on first valid resize via _markViewReady() - Clear terminal on reconnection to avoid duplicate output from buffer replay - Fix max_bytes type (u32) to match protobuf definition - Pass reconnected field through FlutterHandler event Signed-off-by: fufesou * fix(terminal): add two-phase SIGWINCH for TUI app redraw and session remap on reconnection Fix TUI apps (top, htop) not redrawing after reconnection. A single resize-then-restore is too fast for ncurses to detect a size change, so split across two read_outputs() polling cycles (~30ms apart) to force a full redraw. Also fix reconnection failure when client terminal_id doesn't match any surviving server-side session ID by remapping the lowest surviving session to the requested ID. Rust side: - Add two-phase SIGWINCH state machine (SigwinchPhase: TempResize → Restore → Idle) with retry logic (max 3 attempts per phase) - Add do_sigwinch_resize() for cross-platform PTY resize (direct PTY and Windows helper mode) - Add session remap logic for non-contiguous terminal_id reconnection - Extract try_send_output() helper with rate-limited drop logging (DRY) - Add 3-byte limit to UTF-8 continuation byte skipping in get_recent() to prevent runaway on non-UTF-8 binary data - Remove reconnected flag from flutter.rs (unused on client side) Flutter side: - Add reconnection screen clear and deferred flush logic - Filter self from persistent_sessions restore list - Add comments for web-related changes Signed-off-by: fufesou --------- Signed-off-by: fufesou --- flutter/lib/common.dart | 5 + .../lib/desktop/pages/terminal_tab_page.dart | 12 + flutter/lib/mobile/pages/terminal_page.dart | 5 +- flutter/lib/models/terminal_model.dart | 20 +- src/server/terminal_service.rs | 484 +++++++++++++++--- 5 files changed, 449 insertions(+), 77 deletions(-) diff --git a/flutter/lib/common.dart b/flutter/lib/common.dart index b941632dd..ab1b0b3c5 100644 --- a/flutter/lib/common.dart +++ b/flutter/lib/common.dart @@ -3063,6 +3063,11 @@ Future start_service(bool is_start) async { } Future canBeBlocked() async { + if (isWeb) { + // Web can only act as a controller, never as a controlled side, + // so it should never be blocked by a remote session. + return false; + } // First check control permission final controlPermission = await bind.mainGetCommon( key: "is-remote-modify-enabled-by-control-permissions"); diff --git a/flutter/lib/desktop/pages/terminal_tab_page.dart b/flutter/lib/desktop/pages/terminal_tab_page.dart index bc3ee1a8c..28e59fb05 100644 --- a/flutter/lib/desktop/pages/terminal_tab_page.dart +++ b/flutter/lib/desktop/pages/terminal_tab_page.dart @@ -36,6 +36,8 @@ class _TerminalTabPageState extends State { int _nextTerminalId = 1; // Lightweight idempotency guard for async close operations final Set _closingTabs = {}; + // When true, all session cleanup should persist (window-level close in progress) + bool _windowClosing = false; _TerminalTabPageState(Map params) { Get.put(DesktopTabController(tabType: DesktopTabType.terminal)); @@ -139,6 +141,7 @@ class _TerminalTabPageState extends State { /// UI tabs are removed immediately; session cleanup runs in parallel with a /// bounded timeout so window close is not blocked indefinitely. Future _closeAllTabs() async { + _windowClosing = true; final tabKeys = tabController.state.value.tabs.map((t) => t.key).toList(); // Remove all UI tabs immediately (same instant behavior as the old tabController.clear()) tabController.clear(); @@ -171,8 +174,17 @@ class _TerminalTabPageState extends State { /// - `true` (window close): persist all sessions, don't close any. /// - `false` (tab close): only persist the last session for the peer, /// close others so only the most recent disconnected session survives. + /// + /// Note: if [_windowClosing] is true, persistAll is forced to true so that + /// in-flight _closeTab() calls don't accidentally close sessions that the + /// window-close flow intends to preserve. Future _closeTerminalSessionIfNeeded(String tabKey, {bool persistAll = false, int? peerTabCount}) async { + // If window close is in progress, override to persist all sessions + // even if this call originated from an individual tab close. + if (_windowClosing) { + persistAll = true; + } final parsed = _parseTabKey(tabKey); if (parsed == null) return; final (peerId, terminalId) = parsed; diff --git a/flutter/lib/mobile/pages/terminal_page.dart b/flutter/lib/mobile/pages/terminal_page.dart index ab34a35ec..aff85b40c 100644 --- a/flutter/lib/mobile/pages/terminal_page.dart +++ b/flutter/lib/mobile/pages/terminal_page.dart @@ -83,7 +83,10 @@ class _TerminalPageState extends State // Register this terminal model with FFI for event routing _ffi.registerTerminalModel(widget.terminalId, _terminalModel); - _showTerminalExtraKeys = mainGetLocalBoolOptionSync(kOptionEnableShowTerminalExtraKeys); + // Web desktop users have full hardware keyboard access, so the on-screen + // terminal extra keys bar is unnecessary and disabled. + _showTerminalExtraKeys = !isWebDesktop && + mainGetLocalBoolOptionSync(kOptionEnableShowTerminalExtraKeys); // Initialize terminal connection WidgetsBinding.instance.addPostFrameCallback((_) { _ffi.dialogManager diff --git a/flutter/lib/models/terminal_model.dart b/flutter/lib/models/terminal_model.dart index 764528ab6..a74241ccb 100644 --- a/flutter/lib/models/terminal_model.dart +++ b/flutter/lib/models/terminal_model.dart @@ -266,8 +266,8 @@ class TerminalModel with ChangeNotifier { void _handleTerminalOpened(Map evt) { final bool success = getSuccessFromEvt(evt); - final String message = evt['message'] ?? ''; - final String? serviceId = evt['service_id']; + final String message = evt['message']?.toString() ?? ''; + final String? serviceId = evt['service_id']?.toString(); debugPrint( '[TerminalModel] Terminal opened response: success=$success, message=$message, service_id=$serviceId'); @@ -275,7 +275,18 @@ class TerminalModel with ChangeNotifier { if (success) { _terminalOpened = true; - // Service ID is now saved on the Rust side in handle_terminal_response + // On reconnect ("Reconnected to existing terminal"), server may replay recent output. + // If this TerminalView instance is reused (not rebuilt), duplicate lines can appear. + // We intentionally accept this tradeoff for now to keep logic simple. + + // Fallback: if terminal view is not yet ready but already has valid + // dimensions (e.g. layout completed before open response arrived), + // mark view ready now to avoid output stuck in buffer indefinitely. + if (!_terminalViewReady && + terminal.viewWidth > 0 && + terminal.viewHeight > 0) { + _markViewReady(); + } // Process any buffered input _processBufferedInputAsync().then((_) { @@ -358,8 +369,7 @@ class TerminalModel with ChangeNotifier { // because it only affects the pre-layout buffering window and the // terminal will self-correct on subsequent output. if (text.length >= _kMaxOutputBufferChars) { - final truncated = - text.substring(text.length - _kMaxOutputBufferChars); + final truncated = text.substring(text.length - _kMaxOutputBufferChars); _pendingOutputChunks ..clear() ..add(truncated); diff --git a/src/server/terminal_service.rs b/src/server/terminal_service.rs index ed7d02f68..fb6b4fd29 100644 --- a/src/server/terminal_service.rs +++ b/src/server/terminal_service.rs @@ -30,8 +30,54 @@ const MAX_OUTPUT_BUFFER_SIZE: usize = 1024 * 1024; // 1MB per terminal const MAX_BUFFER_LINES: usize = 10000; const MAX_SERVICES: usize = 100; // Maximum number of persistent terminal services const SERVICE_IDLE_TIMEOUT: Duration = Duration::from_secs(3600); // 1 hour idle timeout -const CHANNEL_BUFFER_SIZE: usize = 100; // Number of messages to buffer in channel +const CHANNEL_BUFFER_SIZE: usize = 500; // Channel buffer size. Max per-message size ~4KB (reader buffer), so worst case ~500*4KB ≈ 2MB/terminal. Increased from 100 to reduce data loss during disconnects. const COMPRESS_THRESHOLD: usize = 512; // Compress terminal data larger than this + // Default max bytes for reconnection buffer replay. +const DEFAULT_RECONNECT_BUFFER_BYTES: usize = 8 * 1024; +const MAX_SIGWINCH_PHASE_ATTEMPTS: u8 = 3; // Max attempts per SIGWINCH phase before giving up + +/// Two-phase SIGWINCH trigger for TUI app redraw on reconnection. +/// +/// Why two phases? A single resize-then-restore done back-to-back is too fast: +/// by the time the TUI app handles the asynchronous SIGWINCH signal and calls +/// `ioctl(TIOCGWINSZ)`, the PTY size has already been restored to the original. +/// ncurses sees no size change and skips the full redraw. +/// +/// Splitting across two `read_outputs()` calls (~30ms apart) ensures the app +/// sees a real size change on each SIGWINCH, forcing a complete redraw. +#[derive(Debug, Clone)] +enum SigwinchPhase { + /// No SIGWINCH needed. + Idle, + /// Phase 1: Resize PTY to temp dimensions (rows±1). The app handles SIGWINCH + /// and redraws at the temporary size. + TempResize { retries: u8 }, + /// Phase 2: Restore PTY to correct dimensions. The app handles SIGWINCH, + /// detects the size change, and performs a full redraw at the correct size. + Restore { retries: u8 }, +} + +/// Which resize to perform in the two-phase SIGWINCH sequence. +enum SigwinchAction { + /// Phase 1: resize to temp dimensions (rows±1) to trigger SIGWINCH with a visible size change. + TempResize, + /// Phase 2: restore to correct dimensions to trigger SIGWINCH and force full redraw. + Restore, +} + +/// Session state machine for terminal streaming. +#[derive(Debug)] +enum SessionState { + /// Session is closed, not streaming data to client. + Closed, + /// Session is active, streaming data to client. + /// pending_buffer: historical buffer to send before real-time data (set on reconnection). + /// sigwinch: two-phase SIGWINCH trigger state for TUI app redraw. + Active { + pending_buffer: Option>, + sigwinch: SigwinchPhase, + }, +} lazy_static::lazy_static! { // Global registry of persistent terminal services indexed by service_id @@ -433,22 +479,103 @@ impl OutputBuffer { } fn get_recent(&self, max_bytes: usize) -> Vec { - let mut result = Vec::new(); + if max_bytes == 0 { + return Vec::new(); + } + let mut chunks: Vec<&[u8]> = Vec::new(); let mut size = 0; - // Get recent lines up to max_bytes + // Collect whole chunks from newest to oldest, preserving chronological continuity. + // If the newest chunk alone exceeds max_bytes, take its tail (truncation may split + // an ANSI escape, but the terminal will self-correct on subsequent output). for line in self.lines.iter().rev() { if size + line.len() > max_bytes { + if size == 0 && line.len() > max_bytes { + // Single oversized chunk: take the tail to preserve the most recent content. + // Align offset forward to a UTF-8 char boundary so that downstream + // clients (e.g. Dart) that decode the payload as UTF-8 text don't + // encounter split code points. The protobuf bytes field itself allows + // arbitrary bytes; this is a best-effort mitigation for client-side decoding. + let mut offset = line.len() - max_bytes; + // Skip at most 3 continuation bytes (UTF-8 max 4-byte sequence). + // Prevents runaway skipping on non-UTF-8 binary data. + let mut skipped = 0u8; + while skipped < 3 + && offset < line.len() + && (line[offset] & 0b1100_0000) == 0b1000_0000 + { + offset += 1; + skipped += 1; + } + // If we skipped past all remaining bytes (degenerate data), drop the + // chunk entirely rather than emitting a slice that decodes poorly on the client. + if offset < line.len() { + chunks.push(&line[offset..]); + size = line.len() - offset; + } + } break; } size += line.len(); - result.splice(0..0, line.iter().cloned()); + chunks.push(line); + } + + // Reverse to restore chronological order and concatenate + chunks.reverse(); + let mut result = Vec::with_capacity(size); + for chunk in chunks { + result.extend_from_slice(chunk); } result } } +/// Try to send data through the output channel with rate-limited drop logging. +/// Returns `true` if the caller should break out of the read loop (channel disconnected). +fn try_send_output( + output_tx: &mpsc::SyncSender>, + data: Vec, + terminal_id: i32, + label: &str, + drop_count: &mut u64, + last_drop_warn: &mut Instant, +) -> bool { + match output_tx.try_send(data) { + Ok(_) => { + if *drop_count > 0 { + log::trace!( + "Terminal {}{} output channel recovered, dropped {} chunks since last report", + terminal_id, + label, + *drop_count + ); + *drop_count = 0; + } + false + } + Err(mpsc::TrySendError::Full(_)) => { + *drop_count += 1; + if last_drop_warn.elapsed() >= Duration::from_secs(5) { + log::trace!( + "Terminal {}{} output channel full, dropped {} chunks in last {:?}", + terminal_id, + label, + *drop_count, + last_drop_warn.elapsed() + ); + *drop_count = 0; + *last_drop_warn = Instant::now(); + } + false + } + Err(mpsc::TrySendError::Disconnected(_)) => { + log::debug!("Terminal {}{} output channel disconnected", terminal_id, label); + true + } + } +} + pub struct TerminalSession { pub created_at: Instant, last_activity: Instant, @@ -469,7 +596,8 @@ pub struct TerminalSession { cols: u16, // Track if we've already sent the closed message closed_message_sent: bool, - is_opened: bool, + // Session state machine for reconnection handling + state: SessionState, // Helper mode: PTY is managed by helper process, communication via message protocol #[cfg(target_os = "windows")] is_helper_mode: bool, @@ -496,7 +624,7 @@ impl TerminalSession { rows, cols, closed_message_sent: false, - is_opened: false, + state: SessionState::Closed, #[cfg(target_os = "windows")] is_helper_mode: false, #[cfg(target_os = "windows")] @@ -511,7 +639,7 @@ impl TerminalSession { // This helper function is to ensure that the threads are joined before the child process is dropped. // Though this is not strictly necessary on macOS. fn stop(&mut self) { - self.is_opened = false; + self.state = SessionState::Closed; self.exiting.store(true, Ordering::SeqCst); // Drop the input channel to signal writer thread to exit @@ -668,7 +796,9 @@ impl PersistentTerminalService { ( session.rows, session.cols, - session.output_buffer.get_recent(4096), + session + .output_buffer + .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES), ) }) } @@ -683,7 +813,7 @@ impl PersistentTerminalService { self.needs_session_sync = true; for session in self.sessions.values() { let mut session = session.lock().unwrap(); - session.is_opened = false; + session.state = SessionState::Closed; } } } @@ -807,7 +937,25 @@ impl TerminalServiceProxy { if let Some(session_arc) = service.sessions.get(&open.terminal_id) { // Reconnect to existing terminal let mut session = session_arc.lock().unwrap(); - session.is_opened = true; + // Directly enter Active state with pending buffer for immediate streaming. + // Historical buffer is sent first by read_outputs(), then real-time data follows. + // No overlap: pending_buffer comes from output_buffer (pre-disconnect history), + // while received_data in read_outputs() comes from the channel (post-reconnect). + // During disconnect, the run loop (sp.ok()) exits so read_outputs() stops being + // called; output_buffer is not updated, and channel data may be lost if it fills up. + let buffer = session + .output_buffer + .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES); + let has_pending = !buffer.is_empty(); + session.state = SessionState::Active { + pending_buffer: if has_pending { Some(buffer) } else { None }, + // Always trigger two-phase SIGWINCH on reconnect to force TUI app redraw, + // regardless of whether there's pending buffer data. This avoids edge cases + // where buffer is empty but a TUI app (top/htop) still needs a full redraw. + sigwinch: SigwinchPhase::TempResize { + retries: MAX_SIGWINCH_PHASE_ATTEMPTS, + }, + }; let mut opened = TerminalOpened::new(); opened.terminal_id = open.terminal_id; opened.success = true; @@ -829,13 +977,6 @@ impl TerminalServiceProxy { } response.set_opened(opened); - // Send buffered output - let buffer = session.output_buffer.get_recent(4096); - if !buffer.is_empty() { - // We'll need to send this separately or extend the protocol - // For now, just acknowledge the reconnection - } - return Ok(Some(response)); } @@ -945,6 +1086,9 @@ impl TerminalServiceProxy { let reader_thread = thread::spawn(move || { let mut reader = reader; let mut buf = vec![0u8; 4096]; + let mut drop_count: u64 = 0; + // Initialize to > 5s ago so the first drop triggers a warning immediately. + let mut last_drop_warn = Instant::now() - Duration::from_secs(6); loop { match reader.read(&mut buf) { Ok(0) => { @@ -958,19 +1102,22 @@ impl TerminalServiceProxy { break; } let data = buf[..n].to_vec(); - // Try to send, if channel is full, drop the data - match output_tx.try_send(data) { - Ok(_) => {} - Err(mpsc::TrySendError::Full(_)) => { - log::debug!( - "Terminal {} output channel full, dropping data", - terminal_id - ); - } - Err(mpsc::TrySendError::Disconnected(_)) => { - log::debug!("Terminal {} output channel disconnected", terminal_id); - break; - } + // Use try_send to avoid blocking the reader thread when channel is full. + // During disconnect, the run loop (sp.ok()) stops and read_outputs() is + // no longer called, so the channel won't be drained. Blocking send would + // deadlock the reader thread in that case. + // Note: data produced during disconnect may be lost if channel fills up, + // since output_buffer is only updated in read_outputs(). The buffer will + // contain history from before the disconnect, not data produced after it. + if try_send_output( + &output_tx, + data, + terminal_id, + "", + &mut drop_count, + &mut last_drop_warn, + ) { + break; } } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { @@ -996,7 +1143,10 @@ impl TerminalServiceProxy { session.output_rx = Some(output_rx); session.reader_thread = Some(reader_thread); session.writer_thread = Some(writer_thread); - session.is_opened = true; + session.state = SessionState::Active { + pending_buffer: None, + sigwinch: SigwinchPhase::Idle, + }; let mut opened = TerminalOpened::new(); opened.terminal_id = open.terminal_id; @@ -1158,6 +1308,9 @@ impl TerminalServiceProxy { let terminal_id = open.terminal_id; let reader_thread = thread::spawn(move || { let mut buf = vec![0u8; 4096]; + let mut drop_count: u64 = 0; + // Initialize to > 5s ago so the first drop triggers a warning immediately. + let mut last_drop_warn = Instant::now() - Duration::from_secs(6); loop { match output_pipe.read(&mut buf) { Ok(0) => { @@ -1170,18 +1323,16 @@ impl TerminalServiceProxy { break; } let data = buf[..n].to_vec(); - match output_tx.try_send(data) { - Ok(_) => {} - Err(mpsc::TrySendError::Full(_)) => { - log::debug!( - "Terminal {} output channel full, dropping data", - terminal_id - ); - } - Err(mpsc::TrySendError::Disconnected(_)) => { - log::debug!("Terminal {} output channel disconnected", terminal_id); - break; - } + // Use try_send to avoid blocking the reader thread (same as direct PTY mode) + if try_send_output( + &output_tx, + data, + terminal_id, + " (helper)", + &mut drop_count, + &mut last_drop_warn, + ) { + break; } } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { @@ -1211,7 +1362,10 @@ impl TerminalServiceProxy { session.output_rx = Some(output_rx); session.reader_thread = Some(reader_thread); session.writer_thread = Some(writer_thread); - session.is_opened = true; + session.state = SessionState::Active { + pending_buffer: None, + sigwinch: SigwinchPhase::Idle, + }; session.is_helper_mode = true; session.helper_process_handle = Some(SendableHandle::new(helper_raw_handle)); @@ -1253,6 +1407,11 @@ impl TerminalServiceProxy { session.rows = resize.rows as u16; session.cols = resize.cols as u16; + // Note: we do NOT clear the sigwinch phase here. The server-side two-phase + // SIGWINCH mechanism in read_outputs() is self-contained (temp resize → restore + // across two polling cycles), so client resize is purely a dimension sync and + // doesn't affect it. + // Windows: handle helper mode vs direct PTY mode #[cfg(target_os = "windows")] { @@ -1358,6 +1517,116 @@ impl TerminalServiceProxy { } } + /// Perform a single PTY resize as part of the two-phase SIGWINCH sequence. + /// Returns true if the resize succeeded. + /// + /// Takes individual field references to avoid borrowing the entire TerminalSession, + /// which would conflict with the mutable borrow of session.state in read_outputs(). + fn do_sigwinch_resize( + terminal_id: i32, + rows: u16, + cols: u16, + pty_pair: &Option, + input_tx: &Option>>, + _is_helper_mode: bool, + action: &SigwinchAction, + ) -> bool { + // Skip if dimensions are not initialized (shouldn't happen on reconnect, + // but guard against it to avoid resizing to nonsensical values). + if rows == 0 || cols == 0 { + return false; + } + + let target_rows = match action { + SigwinchAction::TempResize => { + // For very small terminals (≤2 rows), subtracting 1 would result in an unusable + // size (0 or 1 row), so we add 1 instead. Either direction triggers SIGWINCH. + if rows > 2 { + rows.saturating_sub(1) + } else { + rows.saturating_add(1) + } + } + SigwinchAction::Restore => rows, + }; + + let phase_name = match action { + SigwinchAction::TempResize => "temp resize", + SigwinchAction::Restore => "restore", + }; + + #[cfg(target_os = "windows")] + let use_helper = _is_helper_mode; + #[cfg(not(target_os = "windows"))] + let use_helper = false; + + if use_helper { + #[cfg(target_os = "windows")] + { + let input_tx = match input_tx { + Some(tx) => tx, + None => return false, + }; + let msg = encode_resize_message(target_rows, cols); + if let Err(e) = input_tx.try_send(msg) { + log::warn!( + "Terminal {} SIGWINCH {} via helper failed: {}", + terminal_id, + phase_name, + e + ); + return false; + } + true + } + #[cfg(not(target_os = "windows"))] + { + let _ = (input_tx, phase_name); + false + } + } else if let Some(pty_pair) = pty_pair { + if let Err(e) = pty_pair.master.resize(PtySize { + rows: target_rows, + cols, + pixel_width: 0, + pixel_height: 0, + }) { + log::warn!( + "Terminal {} SIGWINCH {} failed: {}", + terminal_id, + phase_name, + e + ); + return false; + } + true + } else { + false + } + } + + /// Helper to create a TerminalResponse with optional compression. + fn create_terminal_data_response(terminal_id: i32, data: Vec) -> TerminalResponse { + let mut response = TerminalResponse::new(); + let mut terminal_data = TerminalData::new(); + terminal_data.terminal_id = terminal_id; + + if data.len() > COMPRESS_THRESHOLD { + let compressed = compress::compress(&data); + if compressed.len() < data.len() { + terminal_data.data = bytes::Bytes::from(compressed); + terminal_data.compressed = true; + } else { + terminal_data.data = bytes::Bytes::from(data); + } + } else { + terminal_data.data = bytes::Bytes::from(data); + } + + response.set_data(terminal_data); + response + } + pub fn read_outputs(&self) -> Vec { let service = match get_service(&self.service_id) { Some(s) => s, @@ -1399,12 +1668,11 @@ impl TerminalServiceProxy { closed_terminals.push(terminal_id); } - if !session.is_opened { - // Skip the session if it is not opened. - continue; - } - - // Read from output channel + // Always drain the output channel regardless of session state. + // When Active: data is sent to client. When Closed (within the same + // connection): data is buffered in output_buffer for reconnection replay. + // Note: during actual disconnect, the run loop exits and read_outputs() + // is not called, so channel data produced after disconnect may be lost. let mut has_activity = false; let mut received_data = Vec::new(); if let Some(output_rx) = &session.output_rx { @@ -1415,37 +1683,111 @@ impl TerminalServiceProxy { } } - // Update buffer after reading + if has_activity { + session.update_activity(); + } + + // Update buffer (always buffer for reconnection support) for data in &received_data { session.output_buffer.append(data); } - // Process received data for responses - for data in received_data { - let mut response = TerminalResponse::new(); - let mut terminal_data = TerminalData::new(); - terminal_data.terminal_id = terminal_id; + // Skip sending responses if session is not Active. + // Data is already buffered above and will be sent on next reconnection. + // Use a scoped block to limit the mutable borrow of session.state, + // so we can immutably borrow other session fields afterwards. + let sigwinch_action = { + let (pending_buffer, sigwinch) = match &mut session.state { + SessionState::Active { + pending_buffer, + sigwinch, + } => (pending_buffer, sigwinch), + _ => continue, + }; - // Compress data if it exceeds threshold - if data.len() > COMPRESS_THRESHOLD { - let compressed = compress::compress(&data); - if compressed.len() < data.len() { - terminal_data.data = bytes::Bytes::from(compressed); - terminal_data.compressed = true; - } else { - // Compression didn't help, send uncompressed - terminal_data.data = bytes::Bytes::from(data); + // Send pending buffer response first (set on reconnection in handle_open). + // This ensures historical buffer is sent before any real-time data. + if let Some(buffer) = pending_buffer.take() { + if !buffer.is_empty() { + responses + .push(Self::create_terminal_data_response(terminal_id, buffer)); } - } else { - terminal_data.data = bytes::Bytes::from(data); } - response.set_data(terminal_data); - responses.push(response); + // Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale. + // Each phase is a single PTY resize, spaced ~30ms apart by the polling + // interval, ensuring the TUI app sees a real size change on each signal. + match sigwinch { + SigwinchPhase::TempResize { retries } => { + if *retries == 0 { + log::warn!( + "Terminal {} SIGWINCH phase 1 (temp resize) failed after {} attempts, giving up", + terminal_id, MAX_SIGWINCH_PHASE_ATTEMPTS + ); + *sigwinch = SigwinchPhase::Idle; + None + } else { + *retries -= 1; + Some(SigwinchAction::TempResize) + } + } + SigwinchPhase::Restore { retries } => { + if *retries == 0 { + log::warn!( + "Terminal {} SIGWINCH phase 2 (restore) failed after {} attempts, giving up", + terminal_id, MAX_SIGWINCH_PHASE_ATTEMPTS + ); + *sigwinch = SigwinchPhase::Idle; + None + } else { + *retries -= 1; + Some(SigwinchAction::Restore) + } + } + SigwinchPhase::Idle => None, + } + }; + + // Execute SIGWINCH resize outside the mutable borrow scope of session.state. + if let Some(action) = sigwinch_action { + #[cfg(target_os = "windows")] + let is_helper = session.is_helper_mode; + #[cfg(not(target_os = "windows"))] + let is_helper = false; + let resize_ok = Self::do_sigwinch_resize( + terminal_id, + session.rows, + session.cols, + &session.pty_pair, + &session.input_tx, + is_helper, + &action, + ); + if let SessionState::Active { sigwinch, .. } = &mut session.state { + match action { + SigwinchAction::TempResize => { + if resize_ok { + // Phase 1 succeeded — advance to phase 2 (restore). + *sigwinch = SigwinchPhase::Restore { + retries: MAX_SIGWINCH_PHASE_ATTEMPTS, + }; + } + // If failed, retries already decremented; will retry phase 1. + } + SigwinchAction::Restore => { + if resize_ok { + // Phase 2 succeeded — SIGWINCH sequence complete. + *sigwinch = SigwinchPhase::Idle; + } + // If failed, retries already decremented; will retry phase 2. + } + } + } } - if has_activity { - session.update_activity(); + // Send real-time data after historical buffer + for data in received_data { + responses.push(Self::create_terminal_data_response(terminal_id, data)); } } }