Compare commits

...

22 Commits

Author SHA1 Message Date
fufesou
cdfd986cb9 Merge branch 'master' into terminal-utf8-and-reconnect 2026-05-07 12:13:44 +08:00
fufesou
e47e5b38b6 fix(terminal): update hbb_common
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-05-07 12:12:37 +08:00
fufesou
6a53757f68 fix(terminal): comments utf-8 chunk accumulator
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-05-03 11:37:01 +08:00
fufesou
5a65d45244 fix(terminal): comments
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-05-03 09:53:56 +08:00
fufesou
e99829d709 fix(terminal): update hbb_common
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-30 21:43:38 +08:00
fufesou
071c6b1c12 fix(terminal): flag, retry output
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-29 23:48:55 +08:00
fufesou
26a356d0f5 fix(terminal): reconnect, refactor
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-29 23:11:13 +08:00
fufesou
929a4e78ba fix(terminal): env en_US.UTF-8
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-29 22:30:39 +08:00
fufesou
18479129a2 fix: cap terminal reconnect replay output
- split reconnect replay backlog into capped chunks
  - mark terminal data replay chunks for client-side suppression
  - avoid using open-message text to suppress xterm replies
  - reuse default terminal padding value
  - remove misleading Enter-key normalization PR link

Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-29 22:12:26 +08:00
fufesou
b516dfb15b fix(terminal): reconnect suppress next output
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-29 20:54:04 +08:00
fufesou
d5568d9188 fix(terminal): windows&macos, charset utf-8
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-29 10:11:25 +08:00
fufesou
1745bab204 fix(terminal): schedule frame before flushing buffered output
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-28 20:46:47 +08:00
fufesou
0eff404323 fix(terminal): remove invalid test
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-28 20:31:37 +08:00
fufesou
67b5484ded fix(terminal): avoid reconnect stalls and delayed layout writes
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-28 20:21:15 +08:00
fufesou
268827ef64 fix(terminal): merge reconnect backlog into replay output
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-28 20:11:18 +08:00
fufesou
c4542b4a5d fix(terminal): close terminal window on disconnect dialog
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-28 19:47:57 +08:00
fufesou
59f3060a04 fix(terminal): dialog, close window
Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-28 18:40:49 +08:00
fufesou
0a1500a72a fix(terminal): reconnect, error handling
1. Terminal shows "^[[1;1R^[[2;2R^[[>0;0;0c"
2. NaN

```
[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: Converting object to an encodable object failed: NaN
...
```

Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-28 17:58:51 +08:00
rustdesk
4f5c7db70a fix ios enter: https://github.com/rustdesk/rustdesk/issues/14907 2026-04-26 09:08:11 +08:00
fufesou
0112167029 fix(terminal): subtract with overflow
```
thread '<unnamed>' panicked at src\server\terminal_service.rs:476:17:
attempt to subtract with overflow
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'tokio-runtime-worker' panicked at src\server\terminal_service.rs:1576:50:
called `Result::unwrap()` on an `Err` value: PoisonError { .. }
[2026-04-25T07:17:34Z ERROR librustdesk::server::service] Failed to join thread for service ts_9badd3fe-2411-4996-9f40-93c979009edd, Any { .. }
```

Signed-off-by: fufesou <linlong1266@gmail.com>
2026-04-25 15:46:25 +08:00
rustdesk
0d77482a64 Fix terminal auto-reconnect freeze: reconnect resumes terminal output, while multi-tab reconnect avoids restoring duplicate tabs for terminals that are already open. 2026-04-25 01:07:00 +08:00
rustdesk
ca3ef2a1c3 fix: handle incomplete UTF-8 sequences in terminal output, rework on https://github.com/rustdesk/rustdesk/pull/14736 2026-04-25 00:44:23 +08:00
9 changed files with 560 additions and 80 deletions

View File

@@ -716,6 +716,17 @@ closeConnection({String? id}) {
stateGlobal.isInMainPage = true; stateGlobal.isInMainPage = true;
} else { } else {
final controller = Get.find<DesktopTabController>(); final controller = Get.find<DesktopTabController>();
if (controller.tabType == DesktopTabType.terminal &&
controller.onCloseWindow != null) {
// Terminal windows are scoped to one peer. The optional id passed to
// closeConnection() is that peer id, not a terminal tab key
// (${peerId}_${terminalId}). Closing from terminal dialogs should close
// the peer's whole terminal window, including all terminal tabs.
unawaited(controller.onCloseWindow!().catchError((e, _) {
debugPrint('[closeConnection] Failed to close terminal window: $e');
}));
return;
}
controller.closeBy(id); controller.closeBy(id);
} }
} }
@@ -4179,8 +4190,7 @@ Widget? buildAvatarWidget({
width: size, width: size,
height: size, height: size,
fit: BoxFit.cover, fit: BoxFit.cover,
errorBuilder: (_, __, ___) => errorBuilder: (_, __, ___) => fallback ?? SizedBox.shrink(),
fallback ?? SizedBox.shrink(),
), ),
); );
} }

View File

@@ -27,6 +27,7 @@ class TerminalPage extends StatefulWidget {
final bool? isSharedPassword; final bool? isSharedPassword;
final String? connToken; final String? connToken;
final int terminalId; final int terminalId;
/// Tab key for focus management, passed from parent to avoid duplicate construction /// Tab key for focus management, passed from parent to avoid duplicate construction
final String tabKey; final String tabKey;
final SimpleWrapper<State<TerminalPage>?> _lastState = SimpleWrapper(null); final SimpleWrapper<State<TerminalPage>?> _lastState = SimpleWrapper(null);
@@ -43,6 +44,9 @@ class TerminalPage extends StatefulWidget {
class _TerminalPageState extends State<TerminalPage> class _TerminalPageState extends State<TerminalPage>
with AutomaticKeepAliveClientMixin { with AutomaticKeepAliveClientMixin {
static const EdgeInsets _defaultTerminalPadding =
EdgeInsets.symmetric(horizontal: 5.0, vertical: 2.0);
late FFI _ffi; late FFI _ffi;
late TerminalModel _terminalModel; late TerminalModel _terminalModel;
double? _cellHeight; double? _cellHeight;
@@ -155,13 +159,27 @@ class _TerminalPageState extends State<TerminalPage>
// extra space left after dividing the available height by the height of a single // extra space left after dividing the available height by the height of a single
// terminal row (`_cellHeight`) and distributing it evenly as top and bottom padding. // terminal row (`_cellHeight`) and distributing it evenly as top and bottom padding.
EdgeInsets _calculatePadding(double heightPx) { EdgeInsets _calculatePadding(double heightPx) {
if (_cellHeight == null) { final cellHeight = _cellHeight;
return const EdgeInsets.symmetric(horizontal: 5.0, vertical: 2.0); if (!heightPx.isFinite ||
heightPx <= 0 ||
cellHeight == null ||
!cellHeight.isFinite ||
cellHeight <= 0) {
return _defaultTerminalPadding;
}
final rows = (heightPx / cellHeight).floor();
if (rows <= 0) {
return _defaultTerminalPadding;
}
final extraSpace = heightPx - rows * cellHeight;
if (!extraSpace.isFinite || extraSpace < 0) {
return _defaultTerminalPadding;
} }
final rows = (heightPx / _cellHeight!).floor();
final extraSpace = heightPx - rows * _cellHeight!;
final topBottom = extraSpace / 2.0; final topBottom = extraSpace / 2.0;
return EdgeInsets.symmetric(horizontal: 5.0, vertical: topBottom); return EdgeInsets.symmetric(
horizontal: _defaultTerminalPadding.horizontal / 2,
vertical: topBottom,
);
} }
@override @override

View File

@@ -46,6 +46,7 @@ class _TerminalTabPageState extends State<TerminalTabPage> {
.setTitle(getWindowNameWithId(id)); .setTitle(getWindowNameWithId(id));
}; };
tabController.onRemoved = (_, id) => onRemoveId(id); tabController.onRemoved = (_, id) => onRemoveId(id);
tabController.onCloseWindow = _closeWindowFromConnection;
final terminalId = params['terminalId'] ?? _nextTerminalId++; final terminalId = params['terminalId'] ?? _nextTerminalId++;
tabController.add(_createTerminalTab( tabController.add(_createTerminalTab(
peerId: params['id'], peerId: params['id'],
@@ -144,6 +145,8 @@ class _TerminalTabPageState extends State<TerminalTabPage> {
_windowClosing = true; _windowClosing = true;
final tabKeys = tabController.state.value.tabs.map((t) => t.key).toList(); final tabKeys = tabController.state.value.tabs.map((t) => t.key).toList();
// Remove all UI tabs immediately (same instant behavior as the old tabController.clear()) // Remove all UI tabs immediately (same instant behavior as the old tabController.clear())
// Keep the cleanup target lookup below synchronous before its first await:
// it relies on the current frame still retaining each TerminalPage's FFI/model.
tabController.clear(); tabController.clear();
// Run session cleanup in parallel with bounded timeout (closeTerminal() has internal 3s timeout). // Run session cleanup in parallel with bounded timeout (closeTerminal() has internal 3s timeout).
// Skip tabs already being closed by a concurrent _closeTab() to avoid duplicate FFI calls. // Skip tabs already being closed by a concurrent _closeTab() to avoid duplicate FFI calls.
@@ -368,8 +371,34 @@ class _TerminalTabPageState extends State<TerminalTabPage> {
final persistentSessions = final persistentSessions =
args['persistent_sessions'] as List<dynamic>? ?? []; args['persistent_sessions'] as List<dynamic>? ?? [];
final sortedSessions = persistentSessions.whereType<int>().toList()..sort(); final sortedSessions = persistentSessions.whereType<int>().toList()..sort();
var peerId = args['peer_id'] as String? ?? '';
if (peerId.isEmpty) {
if (tabController.state.value.tabs.isEmpty ||
tabController.state.value.selected >=
tabController.state.value.tabs.length) {
debugPrint('[TerminalTabPage] Skip restore: no selected tab');
return;
}
final currentTab = tabController.state.value.selectedTabInfo;
final parsed = _parseTabKey(currentTab.key);
if (parsed == null) return;
peerId = parsed.$1;
}
final existingTerminalIds = tabController.state.value.tabs
.map((tab) => _parseTabKey(tab.key))
.where((parsed) => parsed != null && parsed.$1 == peerId)
.map((parsed) => parsed!.$2)
.toSet();
if (existingTerminalIds.isEmpty) {
debugPrint(
'[TerminalTabPage] Skip restore: no seed tab for peer $peerId');
return;
}
for (final terminalId in sortedSessions) { for (final terminalId in sortedSessions) {
_addNewTerminalForCurrentPeer(terminalId: terminalId); if (!existingTerminalIds.add(terminalId)) {
continue;
}
_addNewTerminal(peerId, terminalId: terminalId);
// A delay is required to ensure the UI has sufficient time to update // A delay is required to ensure the UI has sufficient time to update
// before adding the next terminal. Without this delay, `_TerminalPageState::dispose()` // before adding the next terminal. Without this delay, `_TerminalPageState::dispose()`
// may be called prematurely while the tab widget is still in the tab controller. // may be called prematurely while the tab widget is still in the tab controller.
@@ -546,6 +575,11 @@ class _TerminalTabPageState extends State<TerminalTabPage> {
} }
} }
Future<void> _closeWindowFromConnection() async {
await _closeAllTabs();
await WindowController.fromWindowId(windowId()).close();
}
int windowId() { int windowId() {
return widget.params["windowId"]; return widget.params["windowId"];
} }

View File

@@ -99,6 +99,7 @@ class DesktopTabController {
/// index, key /// index, key
Function(int, String)? onRemoved; Function(int, String)? onRemoved;
Function(String)? onSelected; Function(String)? onSelected;
Future<void> Function()? onCloseWindow;
DesktopTabController( DesktopTabController(
{required this.tabType, this.onRemoved, this.onSelected}); {required this.tabType, this.onRemoved, this.onSelected});

View File

@@ -27,25 +27,30 @@ class TerminalModel with ChangeNotifier {
// Buffer for output data received before terminal view has valid dimensions. // Buffer for output data received before terminal view has valid dimensions.
// This prevents NaN errors when writing to terminal before layout is complete. // This prevents NaN errors when writing to terminal before layout is complete.
final _pendingOutputChunks = <String>[]; final _pendingOutputChunks = <String>[];
final _pendingOutputSuppressFlags = <bool>[];
int _pendingOutputSize = 0; int _pendingOutputSize = 0;
static const int _kMaxOutputBufferChars = 8 * 1024; static const int _kMaxOutputBufferChars = 8 * 1024;
// View ready state: true when terminal has valid dimensions, safe to write // View ready state: true when terminal has valid dimensions, safe to write
bool _terminalViewReady = false; bool _terminalViewReady = false;
bool _markViewReadyScheduled = false;
bool get isPeerWindows => parent.ffiModel.pi.platform == kPeerPlatformWindows; bool _suppressTerminalOutput = false;
bool _suppressNextTerminalDataOutput = false;
void Function(int w, int h, int pw, int ph)? onResizeExternal; void Function(int w, int h, int pw, int ph)? onResizeExternal;
Future<void> _handleInput(String data) async { Future<void> _handleInput(String data) async {
// If we press the `Enter` button on Android, // Soft keyboards (notably iOS) emit '\n' when Enter is pressed, while a
// `data` can be '\r' or '\n' when using different keyboards. // real keyboard's Enter sends '\r'. Some Android keyboards also emit '\n'.
// Android -> Windows. '\r' works, but '\n' does not. '\n' is just a newline. // - Peer Windows: '\r' works, '\n' is just a newline.
// Android -> Linux. Both '\r' and '\n' work as expected (execute a command). // - Peer Linux: canonical-mode shells accept both, but raw-mode apps
// So when we receive '\n', we may need to convert it to '\r' to ensure compatibility. // (readline, prompt_toolkit, vim, TUI frameworks) expect '\r'.
// Desktop -> Desktop works fine. // - Peer macOS: same as Linux, raw-mode apps expect '\r'
// Check if we are on mobile or web(mobile), and convert '\n' to '\r'. // (https://github.com/rustdesk/rustdesk/issues/14907).
// So on mobile / web-mobile, always normalize a lone '\n' to '\r'.
// We deliberately do not touch multi-character payloads (e.g. pasted text)
// so embedded newlines in pasted content are preserved.
final isMobileOrWebMobile = (isMobile || (isWeb && !isWebDesktop)); final isMobileOrWebMobile = (isMobile || (isWeb && !isWebDesktop));
if (isMobileOrWebMobile && isPeerWindows && data == '\n') { if (isMobileOrWebMobile && data == '\n') {
data = '\r'; data = '\r';
} }
if (_terminalOpened) { if (_terminalOpened) {
@@ -70,7 +75,10 @@ class TerminalModel with ChangeNotifier {
terminalController = TerminalController(); terminalController = TerminalController();
// Setup terminal callbacks // Setup terminal callbacks
terminal.onOutput = _handleInput; terminal.onOutput = (data) {
if (_suppressTerminalOutput) return;
_handleInput(data);
};
terminal.onResize = (w, h, pw, ph) async { terminal.onResize = (w, h, pw, ph) async {
// Validate all dimensions before using them // Validate all dimensions before using them
@@ -84,7 +92,7 @@ class TerminalModel with ChangeNotifier {
// Mark terminal view as ready and flush any buffered output on first valid resize. // Mark terminal view as ready and flush any buffered output on first valid resize.
// Must be after onResizeExternal so the view layer has valid dimensions before flushing. // Must be after onResizeExternal so the view layer has valid dimensions before flushing.
if (!_terminalViewReady) { if (!_terminalViewReady) {
_markViewReady(); _scheduleMarkViewReady();
} }
if (_terminalOpened) { if (_terminalOpened) {
@@ -110,14 +118,16 @@ class TerminalModel with ChangeNotifier {
void onReady() { void onReady() {
parent.dialogManager.dismissAll(); parent.dialogManager.dismissAll();
// Fire and forget - don't block onReady // Fire and forget - don't block onReady. If the transport reconnects while
openTerminal().catchError((e) { // this model is still open, re-send OpenTerminal so the remote service marks
// the persistent session active again and resumes output streaming.
openTerminal(force: _terminalOpened).catchError((e) {
debugPrint('[TerminalModel] Error opening terminal: $e'); debugPrint('[TerminalModel] Error opening terminal: $e');
}); });
} }
Future<void> openTerminal() async { Future<void> openTerminal({bool force = false}) async {
if (_terminalOpened) return; if (_terminalOpened && !force) return;
// Request the remote side to open a terminal with default shell // Request the remote side to open a terminal with default shell
// The remote side will decide which shell to use based on its OS // The remote side will decide which shell to use based on its OS
@@ -275,9 +285,12 @@ class TerminalModel with ChangeNotifier {
if (success) { if (success) {
_terminalOpened = true; _terminalOpened = true;
// On reconnect ("Reconnected to existing terminal"), server may replay recent output. // On reconnect, the server may replay recent output. That replay can include
// If this TerminalView instance is reused (not rebuilt), duplicate lines can appear. // terminal queries like DSR/DA; xterm answers them through onOutput as
// We intentionally accept this tradeoff for now to keep logic simple. // "^[[1;1R^[[2;2R^[[>0;0;0c", which must not be sent back to the peer.
final replayTerminalOutput = evt['replay_terminal_output'];
_suppressNextTerminalDataOutput = replayTerminalOutput == true ||
message == 'Reconnected to existing terminal with pending output';
// Fallback: if terminal view is not yet ready but already has valid // Fallback: if terminal view is not yet ready but already has valid
// dimensions (e.g. layout completed before open response arrived), // dimensions (e.g. layout completed before open response arrived),
@@ -285,7 +298,7 @@ class TerminalModel with ChangeNotifier {
if (!_terminalViewReady && if (!_terminalViewReady &&
terminal.viewWidth > 0 && terminal.viewWidth > 0 &&
terminal.viewHeight > 0) { terminal.viewHeight > 0) {
_markViewReady(); _scheduleMarkViewReady();
} }
// Process any buffered input // Process any buffered input
@@ -297,12 +310,16 @@ class TerminalModel with ChangeNotifier {
}); });
final persistentSessions = final persistentSessions =
evt['persistent_sessions'] as List<dynamic>? ?? []; (evt['persistent_sessions'] as List<dynamic>? ?? [])
.whereType<int>()
.where((id) => !parent.terminalModels.containsKey(id))
.toList();
if (kWindowId != null && persistentSessions.isNotEmpty) { if (kWindowId != null && persistentSessions.isNotEmpty) {
DesktopMultiWindow.invokeMethod( DesktopMultiWindow.invokeMethod(
kWindowId!, kWindowId!,
kWindowEventRestoreTerminalSessions, kWindowEventRestoreTerminalSessions,
jsonEncode({ jsonEncode({
'peer_id': id,
'persistent_sessions': persistentSessions, 'persistent_sessions': persistentSessions,
})); }));
} }
@@ -332,6 +349,8 @@ class TerminalModel with ChangeNotifier {
final data = evt['data']; final data = evt['data'];
if (data != null) { if (data != null) {
final suppressTerminalOutput = _suppressNextTerminalDataOutput;
_suppressNextTerminalDataOutput = false;
try { try {
String text = ''; String text = '';
if (data is String) { if (data is String) {
@@ -351,7 +370,7 @@ class TerminalModel with ChangeNotifier {
return; return;
} }
_writeToTerminal(text); _writeToTerminal(text, suppressTerminalOutput: suppressTerminalOutput);
} catch (e) { } catch (e) {
debugPrint('[TerminalModel] Failed to process terminal data: $e'); debugPrint('[TerminalModel] Failed to process terminal data: $e');
} }
@@ -361,7 +380,10 @@ class TerminalModel with ChangeNotifier {
/// Write text to terminal, buffering if the view is not yet ready. /// Write text to terminal, buffering if the view is not yet ready.
/// All terminal output should go through this method to avoid NaN errors /// All terminal output should go through this method to avoid NaN errors
/// from writing before the terminal view has valid layout dimensions. /// from writing before the terminal view has valid layout dimensions.
void _writeToTerminal(String text) { void _writeToTerminal(
String text, {
bool suppressTerminalOutput = false,
}) {
if (!_terminalViewReady) { if (!_terminalViewReady) {
// If a single chunk exceeds the cap, keep only its tail. // If a single chunk exceeds the cap, keep only its tail.
// Note: truncation may split a multi-byte ANSI escape sequence, // Note: truncation may split a multi-byte ANSI escape sequence,
@@ -373,34 +395,73 @@ class TerminalModel with ChangeNotifier {
_pendingOutputChunks _pendingOutputChunks
..clear() ..clear()
..add(truncated); ..add(truncated);
_pendingOutputSuppressFlags
..clear()
..add(suppressTerminalOutput);
_pendingOutputSize = truncated.length; _pendingOutputSize = truncated.length;
} else { } else {
_pendingOutputChunks.add(text); _pendingOutputChunks.add(text);
_pendingOutputSuppressFlags.add(suppressTerminalOutput);
_pendingOutputSize += text.length; _pendingOutputSize += text.length;
// Drop oldest chunks if exceeds limit (whole chunks to preserve ANSI sequences) // Drop oldest chunks if exceeds limit (whole chunks to preserve ANSI sequences)
while (_pendingOutputSize > _kMaxOutputBufferChars && while (_pendingOutputSize > _kMaxOutputBufferChars &&
_pendingOutputChunks.length > 1) { _pendingOutputChunks.length > 1) {
final removed = _pendingOutputChunks.removeAt(0); final removed = _pendingOutputChunks.removeAt(0);
_pendingOutputSuppressFlags.removeAt(0);
_pendingOutputSize -= removed.length; _pendingOutputSize -= removed.length;
} }
} }
return; return;
} }
terminal.write(text); _writeTerminalChunk(text, suppressTerminalOutput: suppressTerminalOutput);
} }
void _flushOutputBuffer() { void _flushOutputBuffer() {
if (_pendingOutputChunks.isEmpty) return; if (_pendingOutputChunks.isEmpty) return;
debugPrint( debugPrint(
'[TerminalModel] Flushing $_pendingOutputSize buffered chars (${_pendingOutputChunks.length} chunks)'); '[TerminalModel] Flushing $_pendingOutputSize buffered chars (${_pendingOutputChunks.length} chunks)');
for (final chunk in _pendingOutputChunks) { for (var i = 0; i < _pendingOutputChunks.length; i++) {
terminal.write(chunk); _writeTerminalChunk(
_pendingOutputChunks[i],
suppressTerminalOutput: _pendingOutputSuppressFlags[i],
);
} }
_pendingOutputChunks.clear(); _pendingOutputChunks.clear();
_pendingOutputSuppressFlags.clear();
_pendingOutputSize = 0; _pendingOutputSize = 0;
} }
void _writeTerminalChunk(
String text, {
required bool suppressTerminalOutput,
}) {
if (!suppressTerminalOutput) {
terminal.write(text);
return;
}
final previous = _suppressTerminalOutput;
_suppressTerminalOutput = true;
try {
terminal.write(text);
} finally {
_suppressTerminalOutput = previous;
}
}
/// Mark terminal view as ready and flush buffered output. /// Mark terminal view as ready and flush buffered output.
void _scheduleMarkViewReady() {
if (_disposed || _terminalViewReady || _markViewReadyScheduled) return;
_markViewReadyScheduled = true;
WidgetsBinding.instance.addPostFrameCallback((_) {
_markViewReadyScheduled = false;
if (_disposed || _terminalViewReady) return;
if (terminal.viewWidth > 0 && terminal.viewHeight > 0) {
_markViewReady();
}
});
WidgetsBinding.instance.ensureVisualUpdate();
}
void _markViewReady() { void _markViewReady() {
if (_terminalViewReady) return; if (_terminalViewReady) return;
_terminalViewReady = true; _terminalViewReady = true;
@@ -426,7 +487,10 @@ class TerminalModel with ChangeNotifier {
// Clear buffers to free memory // Clear buffers to free memory
_inputBuffer.clear(); _inputBuffer.clear();
_pendingOutputChunks.clear(); _pendingOutputChunks.clear();
_pendingOutputSuppressFlags.clear();
_pendingOutputSize = 0; _pendingOutputSize = 0;
_markViewReadyScheduled = false;
_suppressNextTerminalDataOutput = false;
// Terminal cleanup is handled server-side when service closes // Terminal cleanup is handled server-side when service closes
super.dispose(); super.dispose();
} }

View File

@@ -1135,6 +1135,10 @@ impl InvokeUiSession for FlutterHandler {
("message", json!(&opened.message)), ("message", json!(&opened.message)),
("pid", json!(opened.pid)), ("pid", json!(opened.pid)),
("service_id", json!(&opened.service_id)), ("service_id", json!(&opened.service_id)),
(
"replay_terminal_output",
json!(opened.replay_terminal_output),
),
]; ];
if !opened.persistent_sessions.is_empty() { if !opened.persistent_sessions.is_empty() {
event_data.push(("persistent_sessions", json!(opened.persistent_sessions))); event_data.push(("persistent_sessions", json!(opened.persistent_sessions)));

View File

@@ -318,6 +318,35 @@ pub fn get_default_shell() -> String {
std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string()) std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string())
} }
fn utf8_shell_args(shell: &str) -> Vec<String> {
let name = std::path::Path::new(shell)
.file_name()
.and_then(|name| name.to_str())
.unwrap_or(shell)
.to_ascii_lowercase();
if name == "cmd.exe" || name == "cmd" {
return vec!["/K".to_string(), "chcp 65001 >NUL".to_string()];
}
if name == "pwsh.exe" || name == "pwsh" || name == "powershell.exe" {
return vec![
"-NoLogo".to_string(),
"-NoExit".to_string(),
"-Command".to_string(),
"chcp.com 65001 > $null; [Console]::InputEncoding = [System.Text.Encoding]::UTF8; [Console]::OutputEncoding = [System.Text.Encoding]::UTF8".to_string(),
];
}
Vec::new()
}
pub fn configure_utf8_shell_command(shell: &str, cmd: &mut CommandBuilder) {
for arg in utf8_shell_args(shell) {
cmd.arg(arg);
}
}
/// Get the SID of the user from a token. /// Get the SID of the user from a token.
/// Returns a Vec<u8> containing the SID bytes. /// Returns a Vec<u8> containing the SID bytes.
pub fn get_user_sid_from_token(user_token: UserToken) -> Result<Vec<u8>> { pub fn get_user_sid_from_token(user_token: UserToken) -> Result<Vec<u8>> {
@@ -831,7 +860,8 @@ pub fn run_terminal_helper(args: &[String]) -> Result<()> {
let shell = get_default_shell(); let shell = get_default_shell();
log::debug!("Using shell: {}", shell); log::debug!("Using shell: {}", shell);
let cmd = CommandBuilder::new(&shell); let mut cmd = CommandBuilder::new(&shell);
configure_utf8_shell_command(&shell, &mut cmd);
let mut child = pty_pair let mut child = pty_pair
.slave .slave
.spawn_command(cmd) .spawn_command(cmd)

View File

@@ -20,10 +20,11 @@ use std::{
// Windows-specific imports from terminal_helper module // Windows-specific imports from terminal_helper module
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
use super::terminal_helper::{ use super::terminal_helper::{
create_named_pipe_server, encode_helper_message, encode_resize_message, configure_utf8_shell_command, create_named_pipe_server, encode_helper_message,
is_helper_process_running, launch_terminal_helper_with_token, wait_for_pipe_connection, encode_resize_message, is_helper_process_running, launch_terminal_helper_with_token,
HelperProcessGuard, OwnedHandle, SendableHandle, WinCloseHandle, WinTerminateProcess, wait_for_pipe_connection, HelperProcessGuard, OwnedHandle, SendableHandle, WinCloseHandle,
WinWaitForSingleObject, MSG_TYPE_DATA, PIPE_CONNECTION_TIMEOUT_MS, WIN_WAIT_OBJECT_0, WinTerminateProcess, WinWaitForSingleObject, MSG_TYPE_DATA, PIPE_CONNECTION_TIMEOUT_MS,
WIN_WAIT_OBJECT_0,
}; };
const MAX_OUTPUT_BUFFER_SIZE: usize = 1024 * 1024; // 1MB per terminal const MAX_OUTPUT_BUFFER_SIZE: usize = 1024 * 1024; // 1MB per terminal
@@ -133,6 +134,26 @@ fn get_default_shell() -> String {
} }
} }
#[cfg(target_os = "macos")]
fn locale_value_is_utf8(value: &str) -> bool {
let value = value.to_ascii_uppercase();
value.contains("UTF-8") || value.contains("UTF8")
}
#[cfg(target_os = "macos")]
fn should_force_process_utf8_ctype() -> bool {
if let Ok(value) = std::env::var("LC_ALL") {
return !locale_value_is_utf8(&value);
}
if let Ok(value) = std::env::var("LC_CTYPE") {
return !locale_value_is_utf8(&value);
}
if let Ok(value) = std::env::var("LANG") {
return !locale_value_is_utf8(&value);
}
true
}
pub fn is_service_specified_user(service_id: &str) -> Option<bool> { pub fn is_service_specified_user(service_id: &str) -> Option<bool> {
get_service(service_id).map(|s| s.lock().unwrap().is_specified_user) get_service(service_id).map(|s| s.lock().unwrap().is_specified_user)
} }
@@ -435,6 +456,7 @@ impl OutputBuffer {
// Find first newline in new data // Find first newline in new data
if let Some(newline_pos) = data.iter().position(|&b| b == b'\n') { if let Some(newline_pos) = data.iter().position(|&b| b == b'\n') {
last_line.extend_from_slice(&data[..=newline_pos]); last_line.extend_from_slice(&data[..=newline_pos]);
self.total_size += newline_pos + 1;
start = newline_pos + 1; start = newline_pos + 1;
self.last_line_incomplete = false; self.last_line_incomplete = false;
} else { } else {
@@ -473,8 +495,29 @@ impl OutputBuffer {
// Trim old data if buffer is too large // Trim old data if buffer is too large
while self.total_size > MAX_OUTPUT_BUFFER_SIZE || self.lines.len() > MAX_BUFFER_LINES { while self.total_size > MAX_OUTPUT_BUFFER_SIZE || self.lines.len() > MAX_BUFFER_LINES {
if let Some(removed) = self.lines.pop_front() { if let Some(removed) = self.lines.pop_front() {
if removed.len() > self.total_size {
log::error!(
"OutputBuffer total_size underflow avoided: total_size={}, removed_len={}, lines_len={}",
self.total_size,
removed.len(),
self.lines.len()
);
self.total_size = self.lines.iter().map(|line| line.len()).sum();
} else {
self.total_size -= removed.len(); self.total_size -= removed.len();
} }
if self.lines.is_empty() {
self.last_line_incomplete = false;
}
} else {
log::error!(
"OutputBuffer trim invariant broken: total_size={}, lines_len=0",
self.total_size
);
self.total_size = 0;
self.last_line_incomplete = false;
break;
}
} }
} }
@@ -531,6 +574,97 @@ impl OutputBuffer {
} }
} }
/// Find the largest prefix of `buf` that does not end in the middle of a UTF-8
/// code point. Invalid bytes are treated as complete so they can continue
/// downstream and be rendered with replacement characters if needed.
fn find_utf8_split_point(buf: &[u8]) -> usize {
if buf.is_empty() {
return 0;
}
let start = buf.len().saturating_sub(3);
for i in (start..buf.len()).rev() {
let b = buf[i];
if b & 0x80 == 0 {
return buf.len();
}
if b & 0xC0 == 0x80 {
continue;
}
let seq_len = if b & 0xE0 == 0xC0 {
2
} else if b & 0xF0 == 0xE0 {
3
} else if b & 0xF8 == 0xF0 {
4
} else {
return buf.len();
};
return if buf.len() - i >= seq_len {
buf.len()
} else {
i
};
}
buf.len()
}
// Terminal output currently follows a UTF-8 text model end to end: the service
// keeps replay buffers on UTF-8 boundaries, and Flutter decodes payload bytes as
// UTF-8 before writing to xterm. This accumulator only prevents splitting a
// trailing UTF-8 code point across PTY reads. Supporting non-UTF-8 terminals
// would need a separate design covering remote encoding detection, Flutter
// decoding, replay truncation, and input transcoding.
#[derive(Default)]
struct Utf8ChunkAccumulator {
remainder: Vec<u8>,
}
impl Utf8ChunkAccumulator {
fn push_chunk(&mut self, mut data: Vec<u8>) -> Option<Vec<u8>> {
if data.is_empty() {
return None;
}
let had_remainder = !self.remainder.is_empty();
if had_remainder {
let mut combined = std::mem::take(&mut self.remainder);
combined.extend_from_slice(&data);
data = combined;
}
let split = find_utf8_split_point(&data);
if split == data.len() {
return Some(data);
}
// Only hold back a candidate incomplete suffix when we have evidence that
// the bytes before it are already UTF-8 text. If split is 0, the whole
// read may be the start of a UTF-8 character, so keep it for the next read.
if !had_remainder && split > 0 && std::str::from_utf8(&data[..split]).is_err() {
return Some(data);
}
self.remainder = data.split_off(split);
if data.is_empty() {
None
} else {
Some(data)
}
}
fn finish(&mut self) -> Option<Vec<u8>> {
if self.remainder.is_empty() {
None
} else {
Some(std::mem::take(&mut self.remainder))
}
}
}
/// Try to send data through the output channel with rate-limited drop logging. /// Try to send data through the output channel with rate-limited drop logging.
/// Returns `true` if the caller should break out of the read loop (channel disconnected). /// Returns `true` if the caller should break out of the read loop (channel disconnected).
fn try_send_output( fn try_send_output(
@@ -570,7 +704,11 @@ fn try_send_output(
false false
} }
Err(mpsc::TrySendError::Disconnected(_)) => { Err(mpsc::TrySendError::Disconnected(_)) => {
log::debug!("Terminal {}{} output channel disconnected", terminal_id, label); log::debug!(
"Terminal {}{} output channel disconnected",
terminal_id,
label
);
true true
} }
} }
@@ -937,15 +1075,35 @@ impl TerminalServiceProxy {
if let Some(session_arc) = service.sessions.get(&open.terminal_id) { if let Some(session_arc) = service.sessions.get(&open.terminal_id) {
// Reconnect to existing terminal // Reconnect to existing terminal
let mut session = session_arc.lock().unwrap(); let mut session = session_arc.lock().unwrap();
// Directly enter Active state with pending buffer for immediate streaming. // Directly enter Active state with pending replay for immediate streaming.
// Historical buffer is sent first by read_outputs(), then real-time data follows. // The replay combines output_buffer history and the channel backlog that was
// No overlap: pending_buffer comes from output_buffer (pre-disconnect history), // already pending at reconnect time so the client can suppress stale xterm
// while received_data in read_outputs() comes from the channel (post-reconnect). // query answers without requiring a protobuf schema change.
// During disconnect, the run loop (sp.ok()) exits so read_outputs() stops being // During disconnect, read_outputs() is not called; channel data can still be lost
// called; output_buffer is not updated, and channel data may be lost if it fills up. // if output_rx fills before reconnect drains it.
let buffer = session let mut buffer = session
.output_buffer .output_buffer
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES); .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
let mut reconnect_backlog = Vec::new();
if let Some(output_rx) = &session.output_rx {
// Cap reconnect-time drain so a chatty PTY cannot keep OpenTerminal
// inside this loop indefinitely. Remaining output is drained by read_outputs().
for _ in 0..CHANNEL_BUFFER_SIZE {
let Ok(data) = output_rx.try_recv() else {
break;
};
reconnect_backlog.push(data);
}
}
let has_reconnect_backlog = !reconnect_backlog.is_empty();
for data in reconnect_backlog {
session.output_buffer.append(&data);
}
if has_reconnect_backlog {
buffer = session
.output_buffer
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
}
let has_pending = !buffer.is_empty(); let has_pending = !buffer.is_empty();
session.state = SessionState::Active { session.state = SessionState::Active {
pending_buffer: if has_pending { Some(buffer) } else { None }, pending_buffer: if has_pending { Some(buffer) } else { None },
@@ -959,9 +1117,14 @@ impl TerminalServiceProxy {
let mut opened = TerminalOpened::new(); let mut opened = TerminalOpened::new();
opened.terminal_id = open.terminal_id; opened.terminal_id = open.terminal_id;
opened.success = true; opened.success = true;
opened.message = "Reconnected to existing terminal".to_string(); opened.message = if has_pending {
"Reconnected to existing terminal with pending output".to_string()
} else {
"Reconnected to existing terminal".to_string()
};
opened.pid = session.pid; opened.pid = session.pid;
opened.service_id = self.service_id.clone(); opened.service_id = self.service_id.clone();
opened.replay_terminal_output = has_pending;
if service.needs_session_sync { if service.needs_session_sync {
if service.sessions.len() > 1 { if service.sessions.len() > 1 {
// No need to include the current terminal in the list. // No need to include the current terminal in the list.
@@ -1016,6 +1179,9 @@ impl TerminalServiceProxy {
#[allow(unused_mut)] #[allow(unused_mut)]
let mut cmd = CommandBuilder::new(&shell); let mut cmd = CommandBuilder::new(&shell);
#[cfg(target_os = "windows")]
configure_utf8_shell_command(&shell, &mut cmd);
// macOS-specific terminal configuration // macOS-specific terminal configuration
// 1. Use login shell (-l) to load user's shell profile (~/.zprofile, ~/.bash_profile) // 1. Use login shell (-l) to load user's shell profile (~/.zprofile, ~/.bash_profile)
// This ensures PATH includes Homebrew paths (/opt/homebrew/bin, /usr/local/bin) // This ensures PATH includes Homebrew paths (/opt/homebrew/bin, /usr/local/bin)
@@ -1036,6 +1202,12 @@ impl TerminalServiceProxy {
}; };
cmd.env("TERM", term); cmd.env("TERM", term);
log::debug!("Set TERM={} for macOS PTY", term); log::debug!("Set TERM={} for macOS PTY", term);
if should_force_process_utf8_ctype() {
cmd.env_remove("LC_ALL");
cmd.env("LC_CTYPE", "en_US.UTF-8");
log::debug!("Set LC_CTYPE=en_US.UTF-8 for macOS PTY");
}
} }
// Note: On Windows with user_token, we use helper mode (handle_open_with_helper) // Note: On Windows with user_token, we use helper mode (handle_open_with_helper)
@@ -1086,6 +1258,7 @@ impl TerminalServiceProxy {
let reader_thread = thread::spawn(move || { let reader_thread = thread::spawn(move || {
let mut reader = reader; let mut reader = reader;
let mut buf = vec![0u8; 4096]; let mut buf = vec![0u8; 4096];
let mut utf8_chunks = Utf8ChunkAccumulator::default();
let mut drop_count: u64 = 0; let mut drop_count: u64 = 0;
// Initialize to > 5s ago so the first drop triggers a warning immediately. // Initialize to > 5s ago so the first drop triggers a warning immediately.
let mut last_drop_warn = Instant::now() - Duration::from_secs(6); let mut last_drop_warn = Instant::now() - Duration::from_secs(6);
@@ -1095,13 +1268,25 @@ impl TerminalServiceProxy {
// EOF // EOF
// This branch can be reached when the child process exits on macOS. // This branch can be reached when the child process exits on macOS.
// But not on Linux and Windows in my tests. // But not on Linux and Windows in my tests.
if let Some(data) = utf8_chunks.finish() {
let _ = try_send_output(
&output_tx,
data,
terminal_id,
"",
&mut drop_count,
&mut last_drop_warn,
);
}
break; break;
} }
Ok(n) => { Ok(n) => {
if exiting.load(Ordering::SeqCst) { if exiting.load(Ordering::SeqCst) {
break; break;
} }
let data = buf[..n].to_vec(); let Some(data) = utf8_chunks.push_chunk(buf[..n].to_vec()) else {
continue;
};
// Use try_send to avoid blocking the reader thread when channel is full. // Use try_send to avoid blocking the reader thread when channel is full.
// During disconnect, the run loop (sp.ok()) stops and read_outputs() is // During disconnect, the run loop (sp.ok()) stops and read_outputs() is
// no longer called, so the channel won't be drained. Blocking send would // no longer called, so the channel won't be drained. Blocking send would
@@ -1308,12 +1493,23 @@ impl TerminalServiceProxy {
let terminal_id = open.terminal_id; let terminal_id = open.terminal_id;
let reader_thread = thread::spawn(move || { let reader_thread = thread::spawn(move || {
let mut buf = vec![0u8; 4096]; let mut buf = vec![0u8; 4096];
let mut utf8_chunks = Utf8ChunkAccumulator::default();
let mut drop_count: u64 = 0; let mut drop_count: u64 = 0;
// Initialize to > 5s ago so the first drop triggers a warning immediately. // Initialize to > 5s ago so the first drop triggers a warning immediately.
let mut last_drop_warn = Instant::now() - Duration::from_secs(6); let mut last_drop_warn = Instant::now() - Duration::from_secs(6);
loop { loop {
match output_pipe.read(&mut buf) { match output_pipe.read(&mut buf) {
Ok(0) => { Ok(0) => {
if let Some(data) = utf8_chunks.finish() {
let _ = try_send_output(
&output_tx,
data,
terminal_id,
" (helper)",
&mut drop_count,
&mut last_drop_warn,
);
}
// EOF - helper process exited // EOF - helper process exited
log::debug!("Terminal {} helper output EOF", terminal_id); log::debug!("Terminal {} helper output EOF", terminal_id);
break; break;
@@ -1322,7 +1518,9 @@ impl TerminalServiceProxy {
if exiting.load(Ordering::SeqCst) { if exiting.load(Ordering::SeqCst) {
break; break;
} }
let data = buf[..n].to_vec(); let Some(data) = utf8_chunks.push_chunk(buf[..n].to_vec()) else {
continue;
};
// Use try_send to avoid blocking the reader thread (same as direct PTY mode) // Use try_send to avoid blocking the reader thread (same as direct PTY mode)
if try_send_output( if try_send_output(
&output_tx, &output_tx,
@@ -1462,9 +1660,10 @@ impl TerminalServiceProxy {
data: &TerminalData, data: &TerminalData,
) -> Result<Option<TerminalResponse>> { ) -> Result<Option<TerminalResponse>> {
if let Some(session_arc) = session { if let Some(session_arc) = session {
let input = {
let mut session = session_arc.lock().unwrap(); let mut session = session_arc.lock().unwrap();
session.update_activity(); session.update_activity();
if let Some(input_tx) = &session.input_tx { if let Some(input_tx) = session.input_tx.clone() {
// Encode data for helper mode or send raw for direct PTY mode // Encode data for helper mode or send raw for direct PTY mode
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
let msg = if session.is_helper_mode { let msg = if session.is_helper_mode {
@@ -1475,7 +1674,14 @@ impl TerminalServiceProxy {
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
let msg = data.data.to_vec(); let msg = data.data.to_vec();
// Send data to writer thread Some((input_tx, msg))
} else {
None
}
};
if let Some((input_tx, msg)) = input {
// Send outside the session lock; SyncSender::send can block when full.
if let Err(e) = input_tx.send(msg) { if let Err(e) = input_tx.send(msg) {
log::error!( log::error!(
"Failed to send data to terminal {}: {}", "Failed to send data to terminal {}: {}",
@@ -1683,10 +1889,6 @@ impl TerminalServiceProxy {
} }
} }
if has_activity {
session.update_activity();
}
// Update buffer (always buffer for reconnection support) // Update buffer (always buffer for reconnection support)
for data in &received_data { for data in &received_data {
session.output_buffer.append(data); session.output_buffer.append(data);
@@ -1696,7 +1898,7 @@ impl TerminalServiceProxy {
// Data is already buffered above and will be sent on next reconnection. // Data is already buffered above and will be sent on next reconnection.
// Use a scoped block to limit the mutable borrow of session.state, // Use a scoped block to limit the mutable borrow of session.state,
// so we can immutably borrow other session fields afterwards. // so we can immutably borrow other session fields afterwards.
let sigwinch_action = { let (replay_buffer, sigwinch_action) = {
let (pending_buffer, sigwinch) = match &mut session.state { let (pending_buffer, sigwinch) = match &mut session.state {
SessionState::Active { SessionState::Active {
pending_buffer, pending_buffer,
@@ -1705,19 +1907,12 @@ impl TerminalServiceProxy {
_ => continue, _ => continue,
}; };
// Send pending buffer response first (set on reconnection in handle_open). let replay_buffer = pending_buffer.take();
// This ensures historical buffer is sent before any real-time data.
if let Some(buffer) = pending_buffer.take() {
if !buffer.is_empty() {
responses
.push(Self::create_terminal_data_response(terminal_id, buffer));
}
}
// Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale. // Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale.
// Each phase is a single PTY resize, spaced ~30ms apart by the polling // Each phase is a single PTY resize, spaced ~30ms apart by the polling
// interval, ensuring the TUI app sees a real size change on each signal. // interval, ensuring the TUI app sees a real size change on each signal.
match sigwinch { let sigwinch_action = match sigwinch {
SigwinchPhase::TempResize { retries } => { SigwinchPhase::TempResize { retries } => {
if *retries == 0 { if *retries == 0 {
log::warn!( log::warn!(
@@ -1745,8 +1940,19 @@ impl TerminalServiceProxy {
} }
} }
SigwinchPhase::Idle => None, SigwinchPhase::Idle => None,
}
}; };
(replay_buffer, sigwinch_action)
};
if let Some(buffer) = replay_buffer {
if !buffer.is_empty() {
responses.push(Self::create_terminal_data_response(terminal_id, buffer));
}
}
if has_activity {
session.update_activity();
}
// Execute SIGWINCH resize outside the mutable borrow scope of session.state. // Execute SIGWINCH resize outside the mutable borrow scope of session.state.
if let Some(action) = sigwinch_action { if let Some(action) = sigwinch_action {
@@ -1845,3 +2051,116 @@ impl TerminalServiceProxy {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::{find_utf8_split_point, OutputBuffer, Utf8ChunkAccumulator, MAX_BUFFER_LINES};
#[test]
fn utf8_split_point_returns_full_len_for_complete_input() {
assert_eq!(find_utf8_split_point(b"hello"), 5);
assert_eq!(find_utf8_split_point("中文".as_bytes()), "中文".len());
assert_eq!(find_utf8_split_point("😀".as_bytes()), "😀".len());
}
#[test]
fn utf8_split_point_detects_incomplete_trailing_sequence() {
let data = [b'a', 0xE4, 0xB8];
assert_eq!(find_utf8_split_point(&data), 1);
}
#[test]
fn utf8_split_point_keeps_malformed_prefix_but_buffers_trailing_lead_byte() {
let data = [0xFF, 0xE4];
assert_eq!(find_utf8_split_point(&data), 1);
}
#[test]
fn utf8_split_point_treats_orphan_continuations_as_complete() {
let data = [0x80, 0x81, 0x82];
assert_eq!(find_utf8_split_point(&data), data.len());
}
#[test]
fn utf8_chunk_accumulator_reassembles_split_multibyte_output() {
let full = "你好世界".as_bytes();
let mut chunker = Utf8ChunkAccumulator::default();
let mut output = Vec::new();
for chunk in full.chunks(5) {
if let Some(data) = chunker.push_chunk(chunk.to_vec()) {
output.extend_from_slice(&data);
}
}
if let Some(data) = chunker.finish() {
output.extend_from_slice(&data);
}
assert_eq!(output, full);
}
#[test]
fn utf8_chunk_accumulator_buffers_leading_split_multibyte_output() {
let mut chunker = Utf8ChunkAccumulator::default();
assert!(chunker.push_chunk(vec![0xE4]).is_none());
assert!(chunker.push_chunk(vec![0xB8]).is_none());
assert_eq!(
chunker.push_chunk(vec![0xAD]),
Some("".as_bytes().to_vec())
);
assert!(chunker.finish().is_none());
}
#[test]
fn utf8_chunk_accumulator_flushes_incomplete_tail_on_finish() {
let mut chunker = Utf8ChunkAccumulator::default();
assert_eq!(chunker.push_chunk(vec![b'a', 0xE4]), Some(vec![b'a']));
assert_eq!(chunker.finish(), Some(vec![0xE4]));
assert!(chunker.finish().is_none());
}
#[test]
fn utf8_chunk_accumulator_does_not_stall_on_malformed_bytes() {
let mut chunker = Utf8ChunkAccumulator::default();
assert_eq!(chunker.push_chunk(vec![0xFF]), Some(vec![0xFF]));
assert!(chunker.finish().is_none());
}
#[test]
fn utf8_chunk_accumulator_buffers_lone_utf8_lead_bytes() {
let mut chunker = Utf8ChunkAccumulator::default();
assert!(chunker.push_chunk(vec![0xE4]).is_none());
assert_eq!(chunker.finish(), Some(vec![0xE4]));
}
#[test]
fn utf8_chunk_accumulator_does_not_hold_back_non_utf8_prefixes() {
let mut chunker = Utf8ChunkAccumulator::default();
assert_eq!(chunker.push_chunk(vec![0xFF, 0xE4]), Some(vec![0xFF, 0xE4]));
assert!(chunker.finish().is_none());
}
#[test]
fn output_buffer_trim_after_incomplete_merge_does_not_underflow() {
let mut buffer = OutputBuffer::new();
// Create an incomplete line first.
buffer.append(b"hello");
// Merge a large chunk that contains the first newline at the tail.
// This exercises the "append to last incomplete line" branch.
let mut large = vec![b'a'; 30_000];
large.push(b'\n');
buffer.append(&large);
// Exceed MAX_BUFFER_LINES so trim pops the first large merged line.
for _ in 0..=MAX_BUFFER_LINES {
buffer.append(b"x\n");
}
let actual_size: usize = buffer.lines.iter().map(|line| line.len()).sum();
assert_eq!(buffer.total_size, actual_size);
}
}