From 6c20fc936d04d0290415ca749cdd624b28969380 Mon Sep 17 00:00:00 2001 From: RustDesk <71636191+rustdesk@users.noreply.github.com> Date: Thu, 7 May 2026 13:27:13 +0800 Subject: [PATCH] Terminal utf8 and reconnect (#14895) * fix: handle incomplete UTF-8 sequences in terminal output, rework on https://github.com/rustdesk/rustdesk/pull/14736 * Fix terminal auto-reconnect freeze: reconnect resumes terminal output, while multi-tab reconnect avoids restoring duplicate tabs for terminals that are already open. * fix(terminal): subtract with overflow ``` thread '' panicked at src\server\terminal_service.rs:476:17: attempt to subtract with overflow note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace thread 'tokio-runtime-worker' panicked at src\server\terminal_service.rs:1576:50: called `Result::unwrap()` on an `Err` value: PoisonError { .. } [2026-04-25T07:17:34Z ERROR librustdesk::server::service] Failed to join thread for service ts_9badd3fe-2411-4996-9f40-93c979009edd, Any { .. } ``` Signed-off-by: fufesou * fix ios enter: https://github.com/rustdesk/rustdesk/issues/14907 * fix(terminal): reconnect, error handling 1. Terminal shows "^[[1;1R^[[2;2R^[[>0;0;0c" 2. NaN ``` [ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: Converting object to an encodable object failed: NaN ... ``` Signed-off-by: fufesou * fix(terminal): dialog, close window Signed-off-by: fufesou * fix(terminal): close terminal window on disconnect dialog Signed-off-by: fufesou * fix(terminal): merge reconnect backlog into replay output Signed-off-by: fufesou * fix(terminal): avoid reconnect stalls and delayed layout writes Signed-off-by: fufesou * fix(terminal): remove invalid test Signed-off-by: fufesou * fix(terminal): schedule frame before flushing buffered output Signed-off-by: fufesou * fix(terminal): windows&macos, charset utf-8 Signed-off-by: fufesou * fix(terminal): reconnect suppress next output Signed-off-by: fufesou * fix: cap terminal reconnect replay output - split reconnect replay backlog into capped chunks - mark terminal data replay chunks for client-side suppression - avoid using open-message text to suppress xterm replies - reuse default terminal padding value - remove misleading Enter-key normalization PR link Signed-off-by: fufesou * fix(terminal): env en_US.UTF-8 Signed-off-by: fufesou * fix(terminal): reconnect, refactor Signed-off-by: fufesou * fix(terminal): flag, retry output Signed-off-by: fufesou * fix(terminal): update hbb_common Signed-off-by: fufesou * fix(terminal): comments Signed-off-by: fufesou * fix(terminal): comments utf-8 chunk accumulator Signed-off-by: fufesou * fix(terminal): update hbb_common Signed-off-by: fufesou --------- Signed-off-by: fufesou Co-authored-by: fufesou --- flutter/lib/common.dart | 14 +- flutter/lib/desktop/pages/terminal_page.dart | 28 +- .../lib/desktop/pages/terminal_tab_page.dart | 36 +- .../lib/desktop/widgets/tabbar_widget.dart | 1 + flutter/lib/models/terminal_model.dart | 116 +++-- libs/hbb_common | 2 +- src/flutter.rs | 4 + src/server/terminal_helper.rs | 32 +- src/server/terminal_service.rs | 407 ++++++++++++++++-- 9 files changed, 560 insertions(+), 80 deletions(-) diff --git a/flutter/lib/common.dart b/flutter/lib/common.dart index e579db36a..366a7b6ba 100644 --- a/flutter/lib/common.dart +++ b/flutter/lib/common.dart @@ -716,6 +716,17 @@ closeConnection({String? id}) { stateGlobal.isInMainPage = true; } else { final controller = Get.find(); + if (controller.tabType == DesktopTabType.terminal && + controller.onCloseWindow != null) { + // Terminal windows are scoped to one peer. The optional id passed to + // closeConnection() is that peer id, not a terminal tab key + // (${peerId}_${terminalId}). Closing from terminal dialogs should close + // the peer's whole terminal window, including all terminal tabs. + unawaited(controller.onCloseWindow!().catchError((e, _) { + debugPrint('[closeConnection] Failed to close terminal window: $e'); + })); + return; + } controller.closeBy(id); } } @@ -4179,8 +4190,7 @@ Widget? buildAvatarWidget({ width: size, height: size, fit: BoxFit.cover, - errorBuilder: (_, __, ___) => - fallback ?? SizedBox.shrink(), + errorBuilder: (_, __, ___) => fallback ?? SizedBox.shrink(), ), ); } diff --git a/flutter/lib/desktop/pages/terminal_page.dart b/flutter/lib/desktop/pages/terminal_page.dart index 0070cd73b..d38dc4a8b 100644 --- a/flutter/lib/desktop/pages/terminal_page.dart +++ b/flutter/lib/desktop/pages/terminal_page.dart @@ -27,6 +27,7 @@ class TerminalPage extends StatefulWidget { final bool? isSharedPassword; final String? connToken; final int terminalId; + /// Tab key for focus management, passed from parent to avoid duplicate construction final String tabKey; final SimpleWrapper?> _lastState = SimpleWrapper(null); @@ -43,6 +44,9 @@ class TerminalPage extends StatefulWidget { class _TerminalPageState extends State with AutomaticKeepAliveClientMixin { + static const EdgeInsets _defaultTerminalPadding = + EdgeInsets.symmetric(horizontal: 5.0, vertical: 2.0); + late FFI _ffi; late TerminalModel _terminalModel; double? _cellHeight; @@ -155,13 +159,27 @@ class _TerminalPageState extends State // extra space left after dividing the available height by the height of a single // terminal row (`_cellHeight`) and distributing it evenly as top and bottom padding. EdgeInsets _calculatePadding(double heightPx) { - if (_cellHeight == null) { - return const EdgeInsets.symmetric(horizontal: 5.0, vertical: 2.0); + final cellHeight = _cellHeight; + if (!heightPx.isFinite || + heightPx <= 0 || + cellHeight == null || + !cellHeight.isFinite || + cellHeight <= 0) { + return _defaultTerminalPadding; + } + final rows = (heightPx / cellHeight).floor(); + if (rows <= 0) { + return _defaultTerminalPadding; + } + final extraSpace = heightPx - rows * cellHeight; + if (!extraSpace.isFinite || extraSpace < 0) { + return _defaultTerminalPadding; } - final rows = (heightPx / _cellHeight!).floor(); - final extraSpace = heightPx - rows * _cellHeight!; final topBottom = extraSpace / 2.0; - return EdgeInsets.symmetric(horizontal: 5.0, vertical: topBottom); + return EdgeInsets.symmetric( + horizontal: _defaultTerminalPadding.horizontal / 2, + vertical: topBottom, + ); } @override diff --git a/flutter/lib/desktop/pages/terminal_tab_page.dart b/flutter/lib/desktop/pages/terminal_tab_page.dart index 28e59fb05..63289e94d 100644 --- a/flutter/lib/desktop/pages/terminal_tab_page.dart +++ b/flutter/lib/desktop/pages/terminal_tab_page.dart @@ -46,6 +46,7 @@ class _TerminalTabPageState extends State { .setTitle(getWindowNameWithId(id)); }; tabController.onRemoved = (_, id) => onRemoveId(id); + tabController.onCloseWindow = _closeWindowFromConnection; final terminalId = params['terminalId'] ?? _nextTerminalId++; tabController.add(_createTerminalTab( peerId: params['id'], @@ -144,6 +145,8 @@ class _TerminalTabPageState extends State { _windowClosing = true; final tabKeys = tabController.state.value.tabs.map((t) => t.key).toList(); // Remove all UI tabs immediately (same instant behavior as the old tabController.clear()) + // Keep the cleanup target lookup below synchronous before its first await: + // it relies on the current frame still retaining each TerminalPage's FFI/model. tabController.clear(); // Run session cleanup in parallel with bounded timeout (closeTerminal() has internal 3s timeout). // Skip tabs already being closed by a concurrent _closeTab() to avoid duplicate FFI calls. @@ -368,8 +371,34 @@ class _TerminalTabPageState extends State { final persistentSessions = args['persistent_sessions'] as List? ?? []; final sortedSessions = persistentSessions.whereType().toList()..sort(); + var peerId = args['peer_id'] as String? ?? ''; + if (peerId.isEmpty) { + if (tabController.state.value.tabs.isEmpty || + tabController.state.value.selected >= + tabController.state.value.tabs.length) { + debugPrint('[TerminalTabPage] Skip restore: no selected tab'); + return; + } + final currentTab = tabController.state.value.selectedTabInfo; + final parsed = _parseTabKey(currentTab.key); + if (parsed == null) return; + peerId = parsed.$1; + } + final existingTerminalIds = tabController.state.value.tabs + .map((tab) => _parseTabKey(tab.key)) + .where((parsed) => parsed != null && parsed.$1 == peerId) + .map((parsed) => parsed!.$2) + .toSet(); + if (existingTerminalIds.isEmpty) { + debugPrint( + '[TerminalTabPage] Skip restore: no seed tab for peer $peerId'); + return; + } for (final terminalId in sortedSessions) { - _addNewTerminalForCurrentPeer(terminalId: terminalId); + if (!existingTerminalIds.add(terminalId)) { + continue; + } + _addNewTerminal(peerId, terminalId: terminalId); // A delay is required to ensure the UI has sufficient time to update // before adding the next terminal. Without this delay, `_TerminalPageState::dispose()` // may be called prematurely while the tab widget is still in the tab controller. @@ -546,6 +575,11 @@ class _TerminalTabPageState extends State { } } + Future _closeWindowFromConnection() async { + await _closeAllTabs(); + await WindowController.fromWindowId(windowId()).close(); + } + int windowId() { return widget.params["windowId"]; } diff --git a/flutter/lib/desktop/widgets/tabbar_widget.dart b/flutter/lib/desktop/widgets/tabbar_widget.dart index ac7d80017..ef195b493 100644 --- a/flutter/lib/desktop/widgets/tabbar_widget.dart +++ b/flutter/lib/desktop/widgets/tabbar_widget.dart @@ -99,6 +99,7 @@ class DesktopTabController { /// index, key Function(int, String)? onRemoved; Function(String)? onSelected; + Future Function()? onCloseWindow; DesktopTabController( {required this.tabType, this.onRemoved, this.onSelected}); diff --git a/flutter/lib/models/terminal_model.dart b/flutter/lib/models/terminal_model.dart index a74241ccb..8961d2dd8 100644 --- a/flutter/lib/models/terminal_model.dart +++ b/flutter/lib/models/terminal_model.dart @@ -27,25 +27,30 @@ class TerminalModel with ChangeNotifier { // Buffer for output data received before terminal view has valid dimensions. // This prevents NaN errors when writing to terminal before layout is complete. final _pendingOutputChunks = []; + final _pendingOutputSuppressFlags = []; int _pendingOutputSize = 0; static const int _kMaxOutputBufferChars = 8 * 1024; // View ready state: true when terminal has valid dimensions, safe to write bool _terminalViewReady = false; - - bool get isPeerWindows => parent.ffiModel.pi.platform == kPeerPlatformWindows; + bool _markViewReadyScheduled = false; + bool _suppressTerminalOutput = false; + bool _suppressNextTerminalDataOutput = false; void Function(int w, int h, int pw, int ph)? onResizeExternal; Future _handleInput(String data) async { - // If we press the `Enter` button on Android, - // `data` can be '\r' or '\n' when using different keyboards. - // Android -> Windows. '\r' works, but '\n' does not. '\n' is just a newline. - // Android -> Linux. Both '\r' and '\n' work as expected (execute a command). - // So when we receive '\n', we may need to convert it to '\r' to ensure compatibility. - // Desktop -> Desktop works fine. - // Check if we are on mobile or web(mobile), and convert '\n' to '\r'. + // Soft keyboards (notably iOS) emit '\n' when Enter is pressed, while a + // real keyboard's Enter sends '\r'. Some Android keyboards also emit '\n'. + // - Peer Windows: '\r' works, '\n' is just a newline. + // - Peer Linux: canonical-mode shells accept both, but raw-mode apps + // (readline, prompt_toolkit, vim, TUI frameworks) expect '\r'. + // - Peer macOS: same as Linux, raw-mode apps expect '\r' + // (https://github.com/rustdesk/rustdesk/issues/14907). + // So on mobile / web-mobile, always normalize a lone '\n' to '\r'. + // We deliberately do not touch multi-character payloads (e.g. pasted text) + // so embedded newlines in pasted content are preserved. final isMobileOrWebMobile = (isMobile || (isWeb && !isWebDesktop)); - if (isMobileOrWebMobile && isPeerWindows && data == '\n') { + if (isMobileOrWebMobile && data == '\n') { data = '\r'; } if (_terminalOpened) { @@ -70,7 +75,10 @@ class TerminalModel with ChangeNotifier { terminalController = TerminalController(); // Setup terminal callbacks - terminal.onOutput = _handleInput; + terminal.onOutput = (data) { + if (_suppressTerminalOutput) return; + _handleInput(data); + }; terminal.onResize = (w, h, pw, ph) async { // Validate all dimensions before using them @@ -84,7 +92,7 @@ class TerminalModel with ChangeNotifier { // Mark terminal view as ready and flush any buffered output on first valid resize. // Must be after onResizeExternal so the view layer has valid dimensions before flushing. if (!_terminalViewReady) { - _markViewReady(); + _scheduleMarkViewReady(); } if (_terminalOpened) { @@ -110,14 +118,16 @@ class TerminalModel with ChangeNotifier { void onReady() { parent.dialogManager.dismissAll(); - // Fire and forget - don't block onReady - openTerminal().catchError((e) { + // Fire and forget - don't block onReady. If the transport reconnects while + // this model is still open, re-send OpenTerminal so the remote service marks + // the persistent session active again and resumes output streaming. + openTerminal(force: _terminalOpened).catchError((e) { debugPrint('[TerminalModel] Error opening terminal: $e'); }); } - Future openTerminal() async { - if (_terminalOpened) return; + Future openTerminal({bool force = false}) async { + if (_terminalOpened && !force) return; // Request the remote side to open a terminal with default shell // The remote side will decide which shell to use based on its OS @@ -275,9 +285,12 @@ class TerminalModel with ChangeNotifier { if (success) { _terminalOpened = true; - // On reconnect ("Reconnected to existing terminal"), server may replay recent output. - // If this TerminalView instance is reused (not rebuilt), duplicate lines can appear. - // We intentionally accept this tradeoff for now to keep logic simple. + // On reconnect, the server may replay recent output. That replay can include + // terminal queries like DSR/DA; xterm answers them through onOutput as + // "^[[1;1R^[[2;2R^[[>0;0;0c", which must not be sent back to the peer. + final replayTerminalOutput = evt['replay_terminal_output']; + _suppressNextTerminalDataOutput = replayTerminalOutput == true || + message == 'Reconnected to existing terminal with pending output'; // Fallback: if terminal view is not yet ready but already has valid // dimensions (e.g. layout completed before open response arrived), @@ -285,7 +298,7 @@ class TerminalModel with ChangeNotifier { if (!_terminalViewReady && terminal.viewWidth > 0 && terminal.viewHeight > 0) { - _markViewReady(); + _scheduleMarkViewReady(); } // Process any buffered input @@ -297,12 +310,16 @@ class TerminalModel with ChangeNotifier { }); final persistentSessions = - evt['persistent_sessions'] as List? ?? []; + (evt['persistent_sessions'] as List? ?? []) + .whereType() + .where((id) => !parent.terminalModels.containsKey(id)) + .toList(); if (kWindowId != null && persistentSessions.isNotEmpty) { DesktopMultiWindow.invokeMethod( kWindowId!, kWindowEventRestoreTerminalSessions, jsonEncode({ + 'peer_id': id, 'persistent_sessions': persistentSessions, })); } @@ -332,6 +349,8 @@ class TerminalModel with ChangeNotifier { final data = evt['data']; if (data != null) { + final suppressTerminalOutput = _suppressNextTerminalDataOutput; + _suppressNextTerminalDataOutput = false; try { String text = ''; if (data is String) { @@ -351,7 +370,7 @@ class TerminalModel with ChangeNotifier { return; } - _writeToTerminal(text); + _writeToTerminal(text, suppressTerminalOutput: suppressTerminalOutput); } catch (e) { debugPrint('[TerminalModel] Failed to process terminal data: $e'); } @@ -361,7 +380,10 @@ class TerminalModel with ChangeNotifier { /// Write text to terminal, buffering if the view is not yet ready. /// All terminal output should go through this method to avoid NaN errors /// from writing before the terminal view has valid layout dimensions. - void _writeToTerminal(String text) { + void _writeToTerminal( + String text, { + bool suppressTerminalOutput = false, + }) { if (!_terminalViewReady) { // If a single chunk exceeds the cap, keep only its tail. // Note: truncation may split a multi-byte ANSI escape sequence, @@ -373,34 +395,73 @@ class TerminalModel with ChangeNotifier { _pendingOutputChunks ..clear() ..add(truncated); + _pendingOutputSuppressFlags + ..clear() + ..add(suppressTerminalOutput); _pendingOutputSize = truncated.length; } else { _pendingOutputChunks.add(text); + _pendingOutputSuppressFlags.add(suppressTerminalOutput); _pendingOutputSize += text.length; // Drop oldest chunks if exceeds limit (whole chunks to preserve ANSI sequences) while (_pendingOutputSize > _kMaxOutputBufferChars && _pendingOutputChunks.length > 1) { final removed = _pendingOutputChunks.removeAt(0); + _pendingOutputSuppressFlags.removeAt(0); _pendingOutputSize -= removed.length; } } return; } - terminal.write(text); + _writeTerminalChunk(text, suppressTerminalOutput: suppressTerminalOutput); } void _flushOutputBuffer() { if (_pendingOutputChunks.isEmpty) return; debugPrint( '[TerminalModel] Flushing $_pendingOutputSize buffered chars (${_pendingOutputChunks.length} chunks)'); - for (final chunk in _pendingOutputChunks) { - terminal.write(chunk); + for (var i = 0; i < _pendingOutputChunks.length; i++) { + _writeTerminalChunk( + _pendingOutputChunks[i], + suppressTerminalOutput: _pendingOutputSuppressFlags[i], + ); } _pendingOutputChunks.clear(); + _pendingOutputSuppressFlags.clear(); _pendingOutputSize = 0; } + void _writeTerminalChunk( + String text, { + required bool suppressTerminalOutput, + }) { + if (!suppressTerminalOutput) { + terminal.write(text); + return; + } + final previous = _suppressTerminalOutput; + _suppressTerminalOutput = true; + try { + terminal.write(text); + } finally { + _suppressTerminalOutput = previous; + } + } + /// Mark terminal view as ready and flush buffered output. + void _scheduleMarkViewReady() { + if (_disposed || _terminalViewReady || _markViewReadyScheduled) return; + _markViewReadyScheduled = true; + WidgetsBinding.instance.addPostFrameCallback((_) { + _markViewReadyScheduled = false; + if (_disposed || _terminalViewReady) return; + if (terminal.viewWidth > 0 && terminal.viewHeight > 0) { + _markViewReady(); + } + }); + WidgetsBinding.instance.ensureVisualUpdate(); + } + void _markViewReady() { if (_terminalViewReady) return; _terminalViewReady = true; @@ -426,7 +487,10 @@ class TerminalModel with ChangeNotifier { // Clear buffers to free memory _inputBuffer.clear(); _pendingOutputChunks.clear(); + _pendingOutputSuppressFlags.clear(); _pendingOutputSize = 0; + _markViewReadyScheduled = false; + _suppressNextTerminalDataOutput = false; // Terminal cleanup is handled server-side when service closes super.dispose(); } diff --git a/libs/hbb_common b/libs/hbb_common index 3e31a9493..42af0f0ae 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit 3e31a94939e026ab2c05d21a2c436960aa9bfea8 +Subproject commit 42af0f0aed0bb5fd5df4ff95fd4cc9816fcf5769 diff --git a/src/flutter.rs b/src/flutter.rs index c7e07f892..f8b04bf6c 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -1135,6 +1135,10 @@ impl InvokeUiSession for FlutterHandler { ("message", json!(&opened.message)), ("pid", json!(opened.pid)), ("service_id", json!(&opened.service_id)), + ( + "replay_terminal_output", + json!(opened.replay_terminal_output), + ), ]; if !opened.persistent_sessions.is_empty() { event_data.push(("persistent_sessions", json!(opened.persistent_sessions))); diff --git a/src/server/terminal_helper.rs b/src/server/terminal_helper.rs index 8edf4621b..fd85d2a4c 100644 --- a/src/server/terminal_helper.rs +++ b/src/server/terminal_helper.rs @@ -318,6 +318,35 @@ pub fn get_default_shell() -> String { std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string()) } +fn utf8_shell_args(shell: &str) -> Vec { + let name = std::path::Path::new(shell) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or(shell) + .to_ascii_lowercase(); + + if name == "cmd.exe" || name == "cmd" { + return vec!["/K".to_string(), "chcp 65001 >NUL".to_string()]; + } + + if name == "pwsh.exe" || name == "pwsh" || name == "powershell.exe" { + return vec![ + "-NoLogo".to_string(), + "-NoExit".to_string(), + "-Command".to_string(), + "chcp.com 65001 > $null; [Console]::InputEncoding = [System.Text.Encoding]::UTF8; [Console]::OutputEncoding = [System.Text.Encoding]::UTF8".to_string(), + ]; + } + + Vec::new() +} + +pub fn configure_utf8_shell_command(shell: &str, cmd: &mut CommandBuilder) { + for arg in utf8_shell_args(shell) { + cmd.arg(arg); + } +} + /// Get the SID of the user from a token. /// Returns a Vec containing the SID bytes. pub fn get_user_sid_from_token(user_token: UserToken) -> Result> { @@ -831,7 +860,8 @@ pub fn run_terminal_helper(args: &[String]) -> Result<()> { let shell = get_default_shell(); log::debug!("Using shell: {}", shell); - let cmd = CommandBuilder::new(&shell); + let mut cmd = CommandBuilder::new(&shell); + configure_utf8_shell_command(&shell, &mut cmd); let mut child = pty_pair .slave .spawn_command(cmd) diff --git a/src/server/terminal_service.rs b/src/server/terminal_service.rs index fb6b4fd29..52a296b74 100644 --- a/src/server/terminal_service.rs +++ b/src/server/terminal_service.rs @@ -20,10 +20,11 @@ use std::{ // Windows-specific imports from terminal_helper module #[cfg(target_os = "windows")] use super::terminal_helper::{ - create_named_pipe_server, encode_helper_message, encode_resize_message, - is_helper_process_running, launch_terminal_helper_with_token, wait_for_pipe_connection, - HelperProcessGuard, OwnedHandle, SendableHandle, WinCloseHandle, WinTerminateProcess, - WinWaitForSingleObject, MSG_TYPE_DATA, PIPE_CONNECTION_TIMEOUT_MS, WIN_WAIT_OBJECT_0, + configure_utf8_shell_command, create_named_pipe_server, encode_helper_message, + encode_resize_message, is_helper_process_running, launch_terminal_helper_with_token, + wait_for_pipe_connection, HelperProcessGuard, OwnedHandle, SendableHandle, WinCloseHandle, + WinTerminateProcess, WinWaitForSingleObject, MSG_TYPE_DATA, PIPE_CONNECTION_TIMEOUT_MS, + WIN_WAIT_OBJECT_0, }; const MAX_OUTPUT_BUFFER_SIZE: usize = 1024 * 1024; // 1MB per terminal @@ -133,6 +134,26 @@ fn get_default_shell() -> String { } } +#[cfg(target_os = "macos")] +fn locale_value_is_utf8(value: &str) -> bool { + let value = value.to_ascii_uppercase(); + value.contains("UTF-8") || value.contains("UTF8") +} + +#[cfg(target_os = "macos")] +fn should_force_process_utf8_ctype() -> bool { + if let Ok(value) = std::env::var("LC_ALL") { + return !locale_value_is_utf8(&value); + } + if let Ok(value) = std::env::var("LC_CTYPE") { + return !locale_value_is_utf8(&value); + } + if let Ok(value) = std::env::var("LANG") { + return !locale_value_is_utf8(&value); + } + true +} + pub fn is_service_specified_user(service_id: &str) -> Option { get_service(service_id).map(|s| s.lock().unwrap().is_specified_user) } @@ -435,6 +456,7 @@ impl OutputBuffer { // Find first newline in new data if let Some(newline_pos) = data.iter().position(|&b| b == b'\n') { last_line.extend_from_slice(&data[..=newline_pos]); + self.total_size += newline_pos + 1; start = newline_pos + 1; self.last_line_incomplete = false; } else { @@ -473,7 +495,28 @@ impl OutputBuffer { // Trim old data if buffer is too large while self.total_size > MAX_OUTPUT_BUFFER_SIZE || self.lines.len() > MAX_BUFFER_LINES { if let Some(removed) = self.lines.pop_front() { - self.total_size -= removed.len(); + if removed.len() > self.total_size { + log::error!( + "OutputBuffer total_size underflow avoided: total_size={}, removed_len={}, lines_len={}", + self.total_size, + removed.len(), + self.lines.len() + ); + self.total_size = self.lines.iter().map(|line| line.len()).sum(); + } else { + self.total_size -= removed.len(); + } + if self.lines.is_empty() { + self.last_line_incomplete = false; + } + } else { + log::error!( + "OutputBuffer trim invariant broken: total_size={}, lines_len=0", + self.total_size + ); + self.total_size = 0; + self.last_line_incomplete = false; + break; } } } @@ -531,6 +574,97 @@ impl OutputBuffer { } } +/// Find the largest prefix of `buf` that does not end in the middle of a UTF-8 +/// code point. Invalid bytes are treated as complete so they can continue +/// downstream and be rendered with replacement characters if needed. +fn find_utf8_split_point(buf: &[u8]) -> usize { + if buf.is_empty() { + return 0; + } + + let start = buf.len().saturating_sub(3); + for i in (start..buf.len()).rev() { + let b = buf[i]; + if b & 0x80 == 0 { + return buf.len(); + } + if b & 0xC0 == 0x80 { + continue; + } + + let seq_len = if b & 0xE0 == 0xC0 { + 2 + } else if b & 0xF0 == 0xE0 { + 3 + } else if b & 0xF8 == 0xF0 { + 4 + } else { + return buf.len(); + }; + + return if buf.len() - i >= seq_len { + buf.len() + } else { + i + }; + } + + buf.len() +} + +// Terminal output currently follows a UTF-8 text model end to end: the service +// keeps replay buffers on UTF-8 boundaries, and Flutter decodes payload bytes as +// UTF-8 before writing to xterm. This accumulator only prevents splitting a +// trailing UTF-8 code point across PTY reads. Supporting non-UTF-8 terminals +// would need a separate design covering remote encoding detection, Flutter +// decoding, replay truncation, and input transcoding. +#[derive(Default)] +struct Utf8ChunkAccumulator { + remainder: Vec, +} + +impl Utf8ChunkAccumulator { + fn push_chunk(&mut self, mut data: Vec) -> Option> { + if data.is_empty() { + return None; + } + + let had_remainder = !self.remainder.is_empty(); + if had_remainder { + let mut combined = std::mem::take(&mut self.remainder); + combined.extend_from_slice(&data); + data = combined; + } + + let split = find_utf8_split_point(&data); + if split == data.len() { + return Some(data); + } + + // Only hold back a candidate incomplete suffix when we have evidence that + // the bytes before it are already UTF-8 text. If split is 0, the whole + // read may be the start of a UTF-8 character, so keep it for the next read. + if !had_remainder && split > 0 && std::str::from_utf8(&data[..split]).is_err() { + return Some(data); + } + + self.remainder = data.split_off(split); + if data.is_empty() { + None + } else { + Some(data) + } + } + + fn finish(&mut self) -> Option> { + if self.remainder.is_empty() { + None + } else { + Some(std::mem::take(&mut self.remainder)) + } + } +} + /// Try to send data through the output channel with rate-limited drop logging. /// Returns `true` if the caller should break out of the read loop (channel disconnected). fn try_send_output( @@ -570,7 +704,11 @@ fn try_send_output( false } Err(mpsc::TrySendError::Disconnected(_)) => { - log::debug!("Terminal {}{} output channel disconnected", terminal_id, label); + log::debug!( + "Terminal {}{} output channel disconnected", + terminal_id, + label + ); true } } @@ -937,15 +1075,35 @@ impl TerminalServiceProxy { if let Some(session_arc) = service.sessions.get(&open.terminal_id) { // Reconnect to existing terminal let mut session = session_arc.lock().unwrap(); - // Directly enter Active state with pending buffer for immediate streaming. - // Historical buffer is sent first by read_outputs(), then real-time data follows. - // No overlap: pending_buffer comes from output_buffer (pre-disconnect history), - // while received_data in read_outputs() comes from the channel (post-reconnect). - // During disconnect, the run loop (sp.ok()) exits so read_outputs() stops being - // called; output_buffer is not updated, and channel data may be lost if it fills up. - let buffer = session + // Directly enter Active state with pending replay for immediate streaming. + // The replay combines output_buffer history and the channel backlog that was + // already pending at reconnect time so the client can suppress stale xterm + // query answers without requiring a protobuf schema change. + // During disconnect, read_outputs() is not called; channel data can still be lost + // if output_rx fills before reconnect drains it. + let mut buffer = session .output_buffer .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES); + let mut reconnect_backlog = Vec::new(); + if let Some(output_rx) = &session.output_rx { + // Cap reconnect-time drain so a chatty PTY cannot keep OpenTerminal + // inside this loop indefinitely. Remaining output is drained by read_outputs(). + for _ in 0..CHANNEL_BUFFER_SIZE { + let Ok(data) = output_rx.try_recv() else { + break; + }; + reconnect_backlog.push(data); + } + } + let has_reconnect_backlog = !reconnect_backlog.is_empty(); + for data in reconnect_backlog { + session.output_buffer.append(&data); + } + if has_reconnect_backlog { + buffer = session + .output_buffer + .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES); + } let has_pending = !buffer.is_empty(); session.state = SessionState::Active { pending_buffer: if has_pending { Some(buffer) } else { None }, @@ -959,9 +1117,14 @@ impl TerminalServiceProxy { let mut opened = TerminalOpened::new(); opened.terminal_id = open.terminal_id; opened.success = true; - opened.message = "Reconnected to existing terminal".to_string(); + opened.message = if has_pending { + "Reconnected to existing terminal with pending output".to_string() + } else { + "Reconnected to existing terminal".to_string() + }; opened.pid = session.pid; opened.service_id = self.service_id.clone(); + opened.replay_terminal_output = has_pending; if service.needs_session_sync { if service.sessions.len() > 1 { // No need to include the current terminal in the list. @@ -1016,6 +1179,9 @@ impl TerminalServiceProxy { #[allow(unused_mut)] let mut cmd = CommandBuilder::new(&shell); + #[cfg(target_os = "windows")] + configure_utf8_shell_command(&shell, &mut cmd); + // macOS-specific terminal configuration // 1. Use login shell (-l) to load user's shell profile (~/.zprofile, ~/.bash_profile) // This ensures PATH includes Homebrew paths (/opt/homebrew/bin, /usr/local/bin) @@ -1036,6 +1202,12 @@ impl TerminalServiceProxy { }; cmd.env("TERM", term); log::debug!("Set TERM={} for macOS PTY", term); + + if should_force_process_utf8_ctype() { + cmd.env_remove("LC_ALL"); + cmd.env("LC_CTYPE", "en_US.UTF-8"); + log::debug!("Set LC_CTYPE=en_US.UTF-8 for macOS PTY"); + } } // Note: On Windows with user_token, we use helper mode (handle_open_with_helper) @@ -1086,6 +1258,7 @@ impl TerminalServiceProxy { let reader_thread = thread::spawn(move || { let mut reader = reader; let mut buf = vec![0u8; 4096]; + let mut utf8_chunks = Utf8ChunkAccumulator::default(); let mut drop_count: u64 = 0; // Initialize to > 5s ago so the first drop triggers a warning immediately. let mut last_drop_warn = Instant::now() - Duration::from_secs(6); @@ -1095,13 +1268,25 @@ impl TerminalServiceProxy { // EOF // This branch can be reached when the child process exits on macOS. // But not on Linux and Windows in my tests. + if let Some(data) = utf8_chunks.finish() { + let _ = try_send_output( + &output_tx, + data, + terminal_id, + "", + &mut drop_count, + &mut last_drop_warn, + ); + } break; } Ok(n) => { if exiting.load(Ordering::SeqCst) { break; } - let data = buf[..n].to_vec(); + let Some(data) = utf8_chunks.push_chunk(buf[..n].to_vec()) else { + continue; + }; // Use try_send to avoid blocking the reader thread when channel is full. // During disconnect, the run loop (sp.ok()) stops and read_outputs() is // no longer called, so the channel won't be drained. Blocking send would @@ -1308,12 +1493,23 @@ impl TerminalServiceProxy { let terminal_id = open.terminal_id; let reader_thread = thread::spawn(move || { let mut buf = vec![0u8; 4096]; + let mut utf8_chunks = Utf8ChunkAccumulator::default(); let mut drop_count: u64 = 0; // Initialize to > 5s ago so the first drop triggers a warning immediately. let mut last_drop_warn = Instant::now() - Duration::from_secs(6); loop { match output_pipe.read(&mut buf) { Ok(0) => { + if let Some(data) = utf8_chunks.finish() { + let _ = try_send_output( + &output_tx, + data, + terminal_id, + " (helper)", + &mut drop_count, + &mut last_drop_warn, + ); + } // EOF - helper process exited log::debug!("Terminal {} helper output EOF", terminal_id); break; @@ -1322,7 +1518,9 @@ impl TerminalServiceProxy { if exiting.load(Ordering::SeqCst) { break; } - let data = buf[..n].to_vec(); + let Some(data) = utf8_chunks.push_chunk(buf[..n].to_vec()) else { + continue; + }; // Use try_send to avoid blocking the reader thread (same as direct PTY mode) if try_send_output( &output_tx, @@ -1462,20 +1660,28 @@ impl TerminalServiceProxy { data: &TerminalData, ) -> Result> { if let Some(session_arc) = session { - let mut session = session_arc.lock().unwrap(); - session.update_activity(); - if let Some(input_tx) = &session.input_tx { - // Encode data for helper mode or send raw for direct PTY mode - #[cfg(target_os = "windows")] - let msg = if session.is_helper_mode { - encode_helper_message(MSG_TYPE_DATA, &data.data) - } else { - data.data.to_vec() - }; - #[cfg(not(target_os = "windows"))] - let msg = data.data.to_vec(); + let input = { + let mut session = session_arc.lock().unwrap(); + session.update_activity(); + if let Some(input_tx) = session.input_tx.clone() { + // Encode data for helper mode or send raw for direct PTY mode + #[cfg(target_os = "windows")] + let msg = if session.is_helper_mode { + encode_helper_message(MSG_TYPE_DATA, &data.data) + } else { + data.data.to_vec() + }; + #[cfg(not(target_os = "windows"))] + let msg = data.data.to_vec(); - // Send data to writer thread + Some((input_tx, msg)) + } else { + None + } + }; + + if let Some((input_tx, msg)) = input { + // Send outside the session lock; SyncSender::send can block when full. if let Err(e) = input_tx.send(msg) { log::error!( "Failed to send data to terminal {}: {}", @@ -1683,10 +1889,6 @@ impl TerminalServiceProxy { } } - if has_activity { - session.update_activity(); - } - // Update buffer (always buffer for reconnection support) for data in &received_data { session.output_buffer.append(data); @@ -1696,7 +1898,7 @@ impl TerminalServiceProxy { // Data is already buffered above and will be sent on next reconnection. // Use a scoped block to limit the mutable borrow of session.state, // so we can immutably borrow other session fields afterwards. - let sigwinch_action = { + let (replay_buffer, sigwinch_action) = { let (pending_buffer, sigwinch) = match &mut session.state { SessionState::Active { pending_buffer, @@ -1705,19 +1907,12 @@ impl TerminalServiceProxy { _ => continue, }; - // Send pending buffer response first (set on reconnection in handle_open). - // This ensures historical buffer is sent before any real-time data. - if let Some(buffer) = pending_buffer.take() { - if !buffer.is_empty() { - responses - .push(Self::create_terminal_data_response(terminal_id, buffer)); - } - } + let replay_buffer = pending_buffer.take(); // Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale. // Each phase is a single PTY resize, spaced ~30ms apart by the polling // interval, ensuring the TUI app sees a real size change on each signal. - match sigwinch { + let sigwinch_action = match sigwinch { SigwinchPhase::TempResize { retries } => { if *retries == 0 { log::warn!( @@ -1745,9 +1940,20 @@ impl TerminalServiceProxy { } } SigwinchPhase::Idle => None, - } + }; + (replay_buffer, sigwinch_action) }; + if let Some(buffer) = replay_buffer { + if !buffer.is_empty() { + responses.push(Self::create_terminal_data_response(terminal_id, buffer)); + } + } + + if has_activity { + session.update_activity(); + } + // Execute SIGWINCH resize outside the mutable borrow scope of session.state. if let Some(action) = sigwinch_action { #[cfg(target_os = "windows")] @@ -1845,3 +2051,116 @@ impl TerminalServiceProxy { } } } + +#[cfg(test)] +mod tests { + use super::{find_utf8_split_point, OutputBuffer, Utf8ChunkAccumulator, MAX_BUFFER_LINES}; + + #[test] + fn utf8_split_point_returns_full_len_for_complete_input() { + assert_eq!(find_utf8_split_point(b"hello"), 5); + assert_eq!(find_utf8_split_point("中文".as_bytes()), "中文".len()); + assert_eq!(find_utf8_split_point("😀".as_bytes()), "😀".len()); + } + + #[test] + fn utf8_split_point_detects_incomplete_trailing_sequence() { + let data = [b'a', 0xE4, 0xB8]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn utf8_split_point_keeps_malformed_prefix_but_buffers_trailing_lead_byte() { + let data = [0xFF, 0xE4]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn utf8_split_point_treats_orphan_continuations_as_complete() { + let data = [0x80, 0x81, 0x82]; + assert_eq!(find_utf8_split_point(&data), data.len()); + } + + #[test] + fn utf8_chunk_accumulator_reassembles_split_multibyte_output() { + let full = "你好世界".as_bytes(); + let mut chunker = Utf8ChunkAccumulator::default(); + let mut output = Vec::new(); + + for chunk in full.chunks(5) { + if let Some(data) = chunker.push_chunk(chunk.to_vec()) { + output.extend_from_slice(&data); + } + } + + if let Some(data) = chunker.finish() { + output.extend_from_slice(&data); + } + + assert_eq!(output, full); + } + + #[test] + fn utf8_chunk_accumulator_buffers_leading_split_multibyte_output() { + let mut chunker = Utf8ChunkAccumulator::default(); + + assert!(chunker.push_chunk(vec![0xE4]).is_none()); + assert!(chunker.push_chunk(vec![0xB8]).is_none()); + assert_eq!( + chunker.push_chunk(vec![0xAD]), + Some("中".as_bytes().to_vec()) + ); + assert!(chunker.finish().is_none()); + } + + #[test] + fn utf8_chunk_accumulator_flushes_incomplete_tail_on_finish() { + let mut chunker = Utf8ChunkAccumulator::default(); + assert_eq!(chunker.push_chunk(vec![b'a', 0xE4]), Some(vec![b'a'])); + assert_eq!(chunker.finish(), Some(vec![0xE4])); + assert!(chunker.finish().is_none()); + } + + #[test] + fn utf8_chunk_accumulator_does_not_stall_on_malformed_bytes() { + let mut chunker = Utf8ChunkAccumulator::default(); + assert_eq!(chunker.push_chunk(vec![0xFF]), Some(vec![0xFF])); + assert!(chunker.finish().is_none()); + } + + #[test] + fn utf8_chunk_accumulator_buffers_lone_utf8_lead_bytes() { + let mut chunker = Utf8ChunkAccumulator::default(); + assert!(chunker.push_chunk(vec![0xE4]).is_none()); + assert_eq!(chunker.finish(), Some(vec![0xE4])); + } + + #[test] + fn utf8_chunk_accumulator_does_not_hold_back_non_utf8_prefixes() { + let mut chunker = Utf8ChunkAccumulator::default(); + assert_eq!(chunker.push_chunk(vec![0xFF, 0xE4]), Some(vec![0xFF, 0xE4])); + assert!(chunker.finish().is_none()); + } + + #[test] + fn output_buffer_trim_after_incomplete_merge_does_not_underflow() { + let mut buffer = OutputBuffer::new(); + + // Create an incomplete line first. + buffer.append(b"hello"); + + // Merge a large chunk that contains the first newline at the tail. + // This exercises the "append to last incomplete line" branch. + let mut large = vec![b'a'; 30_000]; + large.push(b'\n'); + buffer.append(&large); + + // Exceed MAX_BUFFER_LINES so trim pops the first large merged line. + for _ in 0..=MAX_BUFFER_LINES { + buffer.append(b"x\n"); + } + + let actual_size: usize = buffer.lines.iter().map(|line| line.len()).sum(); + assert_eq!(buffer.total_size, actual_size); + } +}