mirror of
https://github.com/rustdesk/rustdesk.git
synced 2026-05-11 08:38:11 +03:00
Terminal utf8 and reconnect (#14895)
* fix: handle incomplete UTF-8 sequences in terminal output, rework on https://github.com/rustdesk/rustdesk/pull/14736 * Fix terminal auto-reconnect freeze: reconnect resumes terminal output, while multi-tab reconnect avoids restoring duplicate tabs for terminals that are already open. * fix(terminal): subtract with overflow ``` thread '<unnamed>' panicked at src\server\terminal_service.rs:476:17: attempt to subtract with overflow note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace thread 'tokio-runtime-worker' panicked at src\server\terminal_service.rs:1576:50: called `Result::unwrap()` on an `Err` value: PoisonError { .. } [2026-04-25T07:17:34Z ERROR librustdesk::server::service] Failed to join thread for service ts_9badd3fe-2411-4996-9f40-93c979009edd, Any { .. } ``` Signed-off-by: fufesou <linlong1266@gmail.com> * fix ios enter: https://github.com/rustdesk/rustdesk/issues/14907 * fix(terminal): reconnect, error handling 1. Terminal shows "^[[1;1R^[[2;2R^[[>0;0;0c" 2. NaN ``` [ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: Converting object to an encodable object failed: NaN ... ``` Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): dialog, close window Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): close terminal window on disconnect dialog Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): merge reconnect backlog into replay output Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): avoid reconnect stalls and delayed layout writes Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): remove invalid test Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): schedule frame before flushing buffered output Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): windows&macos, charset utf-8 Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): reconnect suppress next output Signed-off-by: fufesou <linlong1266@gmail.com> * fix: cap terminal reconnect replay output - split reconnect replay backlog into capped chunks - mark terminal data replay chunks for client-side suppression - avoid using open-message text to suppress xterm replies - reuse default terminal padding value - remove misleading Enter-key normalization PR link Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): env en_US.UTF-8 Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): reconnect, refactor Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): flag, retry output Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): update hbb_common Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): comments Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): comments utf-8 chunk accumulator Signed-off-by: fufesou <linlong1266@gmail.com> * fix(terminal): update hbb_common Signed-off-by: fufesou <linlong1266@gmail.com> --------- Signed-off-by: fufesou <linlong1266@gmail.com> Co-authored-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
@@ -20,10 +20,11 @@ use std::{
|
||||
// Windows-specific imports from terminal_helper module
|
||||
#[cfg(target_os = "windows")]
|
||||
use super::terminal_helper::{
|
||||
create_named_pipe_server, encode_helper_message, encode_resize_message,
|
||||
is_helper_process_running, launch_terminal_helper_with_token, wait_for_pipe_connection,
|
||||
HelperProcessGuard, OwnedHandle, SendableHandle, WinCloseHandle, WinTerminateProcess,
|
||||
WinWaitForSingleObject, MSG_TYPE_DATA, PIPE_CONNECTION_TIMEOUT_MS, WIN_WAIT_OBJECT_0,
|
||||
configure_utf8_shell_command, create_named_pipe_server, encode_helper_message,
|
||||
encode_resize_message, is_helper_process_running, launch_terminal_helper_with_token,
|
||||
wait_for_pipe_connection, HelperProcessGuard, OwnedHandle, SendableHandle, WinCloseHandle,
|
||||
WinTerminateProcess, WinWaitForSingleObject, MSG_TYPE_DATA, PIPE_CONNECTION_TIMEOUT_MS,
|
||||
WIN_WAIT_OBJECT_0,
|
||||
};
|
||||
|
||||
const MAX_OUTPUT_BUFFER_SIZE: usize = 1024 * 1024; // 1MB per terminal
|
||||
@@ -133,6 +134,26 @@ fn get_default_shell() -> String {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
fn locale_value_is_utf8(value: &str) -> bool {
|
||||
let value = value.to_ascii_uppercase();
|
||||
value.contains("UTF-8") || value.contains("UTF8")
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
fn should_force_process_utf8_ctype() -> bool {
|
||||
if let Ok(value) = std::env::var("LC_ALL") {
|
||||
return !locale_value_is_utf8(&value);
|
||||
}
|
||||
if let Ok(value) = std::env::var("LC_CTYPE") {
|
||||
return !locale_value_is_utf8(&value);
|
||||
}
|
||||
if let Ok(value) = std::env::var("LANG") {
|
||||
return !locale_value_is_utf8(&value);
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
pub fn is_service_specified_user(service_id: &str) -> Option<bool> {
|
||||
get_service(service_id).map(|s| s.lock().unwrap().is_specified_user)
|
||||
}
|
||||
@@ -435,6 +456,7 @@ impl OutputBuffer {
|
||||
// Find first newline in new data
|
||||
if let Some(newline_pos) = data.iter().position(|&b| b == b'\n') {
|
||||
last_line.extend_from_slice(&data[..=newline_pos]);
|
||||
self.total_size += newline_pos + 1;
|
||||
start = newline_pos + 1;
|
||||
self.last_line_incomplete = false;
|
||||
} else {
|
||||
@@ -473,7 +495,28 @@ impl OutputBuffer {
|
||||
// Trim old data if buffer is too large
|
||||
while self.total_size > MAX_OUTPUT_BUFFER_SIZE || self.lines.len() > MAX_BUFFER_LINES {
|
||||
if let Some(removed) = self.lines.pop_front() {
|
||||
self.total_size -= removed.len();
|
||||
if removed.len() > self.total_size {
|
||||
log::error!(
|
||||
"OutputBuffer total_size underflow avoided: total_size={}, removed_len={}, lines_len={}",
|
||||
self.total_size,
|
||||
removed.len(),
|
||||
self.lines.len()
|
||||
);
|
||||
self.total_size = self.lines.iter().map(|line| line.len()).sum();
|
||||
} else {
|
||||
self.total_size -= removed.len();
|
||||
}
|
||||
if self.lines.is_empty() {
|
||||
self.last_line_incomplete = false;
|
||||
}
|
||||
} else {
|
||||
log::error!(
|
||||
"OutputBuffer trim invariant broken: total_size={}, lines_len=0",
|
||||
self.total_size
|
||||
);
|
||||
self.total_size = 0;
|
||||
self.last_line_incomplete = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -531,6 +574,97 @@ impl OutputBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the largest prefix of `buf` that does not end in the middle of a UTF-8
|
||||
/// code point. Invalid bytes are treated as complete so they can continue
|
||||
/// downstream and be rendered with replacement characters if needed.
|
||||
fn find_utf8_split_point(buf: &[u8]) -> usize {
|
||||
if buf.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let start = buf.len().saturating_sub(3);
|
||||
for i in (start..buf.len()).rev() {
|
||||
let b = buf[i];
|
||||
if b & 0x80 == 0 {
|
||||
return buf.len();
|
||||
}
|
||||
if b & 0xC0 == 0x80 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let seq_len = if b & 0xE0 == 0xC0 {
|
||||
2
|
||||
} else if b & 0xF0 == 0xE0 {
|
||||
3
|
||||
} else if b & 0xF8 == 0xF0 {
|
||||
4
|
||||
} else {
|
||||
return buf.len();
|
||||
};
|
||||
|
||||
return if buf.len() - i >= seq_len {
|
||||
buf.len()
|
||||
} else {
|
||||
i
|
||||
};
|
||||
}
|
||||
|
||||
buf.len()
|
||||
}
|
||||
|
||||
// Terminal output currently follows a UTF-8 text model end to end: the service
|
||||
// keeps replay buffers on UTF-8 boundaries, and Flutter decodes payload bytes as
|
||||
// UTF-8 before writing to xterm. This accumulator only prevents splitting a
|
||||
// trailing UTF-8 code point across PTY reads. Supporting non-UTF-8 terminals
|
||||
// would need a separate design covering remote encoding detection, Flutter
|
||||
// decoding, replay truncation, and input transcoding.
|
||||
#[derive(Default)]
|
||||
struct Utf8ChunkAccumulator {
|
||||
remainder: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Utf8ChunkAccumulator {
|
||||
fn push_chunk(&mut self, mut data: Vec<u8>) -> Option<Vec<u8>> {
|
||||
if data.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let had_remainder = !self.remainder.is_empty();
|
||||
if had_remainder {
|
||||
let mut combined = std::mem::take(&mut self.remainder);
|
||||
combined.extend_from_slice(&data);
|
||||
data = combined;
|
||||
}
|
||||
|
||||
let split = find_utf8_split_point(&data);
|
||||
if split == data.len() {
|
||||
return Some(data);
|
||||
}
|
||||
|
||||
// Only hold back a candidate incomplete suffix when we have evidence that
|
||||
// the bytes before it are already UTF-8 text. If split is 0, the whole
|
||||
// read may be the start of a UTF-8 character, so keep it for the next read.
|
||||
if !had_remainder && split > 0 && std::str::from_utf8(&data[..split]).is_err() {
|
||||
return Some(data);
|
||||
}
|
||||
|
||||
self.remainder = data.split_off(split);
|
||||
if data.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(data)
|
||||
}
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Option<Vec<u8>> {
|
||||
if self.remainder.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(std::mem::take(&mut self.remainder))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to send data through the output channel with rate-limited drop logging.
|
||||
/// Returns `true` if the caller should break out of the read loop (channel disconnected).
|
||||
fn try_send_output(
|
||||
@@ -570,7 +704,11 @@ fn try_send_output(
|
||||
false
|
||||
}
|
||||
Err(mpsc::TrySendError::Disconnected(_)) => {
|
||||
log::debug!("Terminal {}{} output channel disconnected", terminal_id, label);
|
||||
log::debug!(
|
||||
"Terminal {}{} output channel disconnected",
|
||||
terminal_id,
|
||||
label
|
||||
);
|
||||
true
|
||||
}
|
||||
}
|
||||
@@ -937,15 +1075,35 @@ impl TerminalServiceProxy {
|
||||
if let Some(session_arc) = service.sessions.get(&open.terminal_id) {
|
||||
// Reconnect to existing terminal
|
||||
let mut session = session_arc.lock().unwrap();
|
||||
// Directly enter Active state with pending buffer for immediate streaming.
|
||||
// Historical buffer is sent first by read_outputs(), then real-time data follows.
|
||||
// No overlap: pending_buffer comes from output_buffer (pre-disconnect history),
|
||||
// while received_data in read_outputs() comes from the channel (post-reconnect).
|
||||
// During disconnect, the run loop (sp.ok()) exits so read_outputs() stops being
|
||||
// called; output_buffer is not updated, and channel data may be lost if it fills up.
|
||||
let buffer = session
|
||||
// Directly enter Active state with pending replay for immediate streaming.
|
||||
// The replay combines output_buffer history and the channel backlog that was
|
||||
// already pending at reconnect time so the client can suppress stale xterm
|
||||
// query answers without requiring a protobuf schema change.
|
||||
// During disconnect, read_outputs() is not called; channel data can still be lost
|
||||
// if output_rx fills before reconnect drains it.
|
||||
let mut buffer = session
|
||||
.output_buffer
|
||||
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
|
||||
let mut reconnect_backlog = Vec::new();
|
||||
if let Some(output_rx) = &session.output_rx {
|
||||
// Cap reconnect-time drain so a chatty PTY cannot keep OpenTerminal
|
||||
// inside this loop indefinitely. Remaining output is drained by read_outputs().
|
||||
for _ in 0..CHANNEL_BUFFER_SIZE {
|
||||
let Ok(data) = output_rx.try_recv() else {
|
||||
break;
|
||||
};
|
||||
reconnect_backlog.push(data);
|
||||
}
|
||||
}
|
||||
let has_reconnect_backlog = !reconnect_backlog.is_empty();
|
||||
for data in reconnect_backlog {
|
||||
session.output_buffer.append(&data);
|
||||
}
|
||||
if has_reconnect_backlog {
|
||||
buffer = session
|
||||
.output_buffer
|
||||
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
|
||||
}
|
||||
let has_pending = !buffer.is_empty();
|
||||
session.state = SessionState::Active {
|
||||
pending_buffer: if has_pending { Some(buffer) } else { None },
|
||||
@@ -959,9 +1117,14 @@ impl TerminalServiceProxy {
|
||||
let mut opened = TerminalOpened::new();
|
||||
opened.terminal_id = open.terminal_id;
|
||||
opened.success = true;
|
||||
opened.message = "Reconnected to existing terminal".to_string();
|
||||
opened.message = if has_pending {
|
||||
"Reconnected to existing terminal with pending output".to_string()
|
||||
} else {
|
||||
"Reconnected to existing terminal".to_string()
|
||||
};
|
||||
opened.pid = session.pid;
|
||||
opened.service_id = self.service_id.clone();
|
||||
opened.replay_terminal_output = has_pending;
|
||||
if service.needs_session_sync {
|
||||
if service.sessions.len() > 1 {
|
||||
// No need to include the current terminal in the list.
|
||||
@@ -1016,6 +1179,9 @@ impl TerminalServiceProxy {
|
||||
#[allow(unused_mut)]
|
||||
let mut cmd = CommandBuilder::new(&shell);
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
configure_utf8_shell_command(&shell, &mut cmd);
|
||||
|
||||
// macOS-specific terminal configuration
|
||||
// 1. Use login shell (-l) to load user's shell profile (~/.zprofile, ~/.bash_profile)
|
||||
// This ensures PATH includes Homebrew paths (/opt/homebrew/bin, /usr/local/bin)
|
||||
@@ -1036,6 +1202,12 @@ impl TerminalServiceProxy {
|
||||
};
|
||||
cmd.env("TERM", term);
|
||||
log::debug!("Set TERM={} for macOS PTY", term);
|
||||
|
||||
if should_force_process_utf8_ctype() {
|
||||
cmd.env_remove("LC_ALL");
|
||||
cmd.env("LC_CTYPE", "en_US.UTF-8");
|
||||
log::debug!("Set LC_CTYPE=en_US.UTF-8 for macOS PTY");
|
||||
}
|
||||
}
|
||||
|
||||
// Note: On Windows with user_token, we use helper mode (handle_open_with_helper)
|
||||
@@ -1086,6 +1258,7 @@ impl TerminalServiceProxy {
|
||||
let reader_thread = thread::spawn(move || {
|
||||
let mut reader = reader;
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let mut utf8_chunks = Utf8ChunkAccumulator::default();
|
||||
let mut drop_count: u64 = 0;
|
||||
// Initialize to > 5s ago so the first drop triggers a warning immediately.
|
||||
let mut last_drop_warn = Instant::now() - Duration::from_secs(6);
|
||||
@@ -1095,13 +1268,25 @@ impl TerminalServiceProxy {
|
||||
// EOF
|
||||
// This branch can be reached when the child process exits on macOS.
|
||||
// But not on Linux and Windows in my tests.
|
||||
if let Some(data) = utf8_chunks.finish() {
|
||||
let _ = try_send_output(
|
||||
&output_tx,
|
||||
data,
|
||||
terminal_id,
|
||||
"",
|
||||
&mut drop_count,
|
||||
&mut last_drop_warn,
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
Ok(n) => {
|
||||
if exiting.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
let data = buf[..n].to_vec();
|
||||
let Some(data) = utf8_chunks.push_chunk(buf[..n].to_vec()) else {
|
||||
continue;
|
||||
};
|
||||
// Use try_send to avoid blocking the reader thread when channel is full.
|
||||
// During disconnect, the run loop (sp.ok()) stops and read_outputs() is
|
||||
// no longer called, so the channel won't be drained. Blocking send would
|
||||
@@ -1308,12 +1493,23 @@ impl TerminalServiceProxy {
|
||||
let terminal_id = open.terminal_id;
|
||||
let reader_thread = thread::spawn(move || {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let mut utf8_chunks = Utf8ChunkAccumulator::default();
|
||||
let mut drop_count: u64 = 0;
|
||||
// Initialize to > 5s ago so the first drop triggers a warning immediately.
|
||||
let mut last_drop_warn = Instant::now() - Duration::from_secs(6);
|
||||
loop {
|
||||
match output_pipe.read(&mut buf) {
|
||||
Ok(0) => {
|
||||
if let Some(data) = utf8_chunks.finish() {
|
||||
let _ = try_send_output(
|
||||
&output_tx,
|
||||
data,
|
||||
terminal_id,
|
||||
" (helper)",
|
||||
&mut drop_count,
|
||||
&mut last_drop_warn,
|
||||
);
|
||||
}
|
||||
// EOF - helper process exited
|
||||
log::debug!("Terminal {} helper output EOF", terminal_id);
|
||||
break;
|
||||
@@ -1322,7 +1518,9 @@ impl TerminalServiceProxy {
|
||||
if exiting.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
let data = buf[..n].to_vec();
|
||||
let Some(data) = utf8_chunks.push_chunk(buf[..n].to_vec()) else {
|
||||
continue;
|
||||
};
|
||||
// Use try_send to avoid blocking the reader thread (same as direct PTY mode)
|
||||
if try_send_output(
|
||||
&output_tx,
|
||||
@@ -1462,20 +1660,28 @@ impl TerminalServiceProxy {
|
||||
data: &TerminalData,
|
||||
) -> Result<Option<TerminalResponse>> {
|
||||
if let Some(session_arc) = session {
|
||||
let mut session = session_arc.lock().unwrap();
|
||||
session.update_activity();
|
||||
if let Some(input_tx) = &session.input_tx {
|
||||
// Encode data for helper mode or send raw for direct PTY mode
|
||||
#[cfg(target_os = "windows")]
|
||||
let msg = if session.is_helper_mode {
|
||||
encode_helper_message(MSG_TYPE_DATA, &data.data)
|
||||
} else {
|
||||
data.data.to_vec()
|
||||
};
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let msg = data.data.to_vec();
|
||||
let input = {
|
||||
let mut session = session_arc.lock().unwrap();
|
||||
session.update_activity();
|
||||
if let Some(input_tx) = session.input_tx.clone() {
|
||||
// Encode data for helper mode or send raw for direct PTY mode
|
||||
#[cfg(target_os = "windows")]
|
||||
let msg = if session.is_helper_mode {
|
||||
encode_helper_message(MSG_TYPE_DATA, &data.data)
|
||||
} else {
|
||||
data.data.to_vec()
|
||||
};
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let msg = data.data.to_vec();
|
||||
|
||||
// Send data to writer thread
|
||||
Some((input_tx, msg))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((input_tx, msg)) = input {
|
||||
// Send outside the session lock; SyncSender::send can block when full.
|
||||
if let Err(e) = input_tx.send(msg) {
|
||||
log::error!(
|
||||
"Failed to send data to terminal {}: {}",
|
||||
@@ -1683,10 +1889,6 @@ impl TerminalServiceProxy {
|
||||
}
|
||||
}
|
||||
|
||||
if has_activity {
|
||||
session.update_activity();
|
||||
}
|
||||
|
||||
// Update buffer (always buffer for reconnection support)
|
||||
for data in &received_data {
|
||||
session.output_buffer.append(data);
|
||||
@@ -1696,7 +1898,7 @@ impl TerminalServiceProxy {
|
||||
// Data is already buffered above and will be sent on next reconnection.
|
||||
// Use a scoped block to limit the mutable borrow of session.state,
|
||||
// so we can immutably borrow other session fields afterwards.
|
||||
let sigwinch_action = {
|
||||
let (replay_buffer, sigwinch_action) = {
|
||||
let (pending_buffer, sigwinch) = match &mut session.state {
|
||||
SessionState::Active {
|
||||
pending_buffer,
|
||||
@@ -1705,19 +1907,12 @@ impl TerminalServiceProxy {
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
// Send pending buffer response first (set on reconnection in handle_open).
|
||||
// This ensures historical buffer is sent before any real-time data.
|
||||
if let Some(buffer) = pending_buffer.take() {
|
||||
if !buffer.is_empty() {
|
||||
responses
|
||||
.push(Self::create_terminal_data_response(terminal_id, buffer));
|
||||
}
|
||||
}
|
||||
let replay_buffer = pending_buffer.take();
|
||||
|
||||
// Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale.
|
||||
// Each phase is a single PTY resize, spaced ~30ms apart by the polling
|
||||
// interval, ensuring the TUI app sees a real size change on each signal.
|
||||
match sigwinch {
|
||||
let sigwinch_action = match sigwinch {
|
||||
SigwinchPhase::TempResize { retries } => {
|
||||
if *retries == 0 {
|
||||
log::warn!(
|
||||
@@ -1745,9 +1940,20 @@ impl TerminalServiceProxy {
|
||||
}
|
||||
}
|
||||
SigwinchPhase::Idle => None,
|
||||
}
|
||||
};
|
||||
(replay_buffer, sigwinch_action)
|
||||
};
|
||||
|
||||
if let Some(buffer) = replay_buffer {
|
||||
if !buffer.is_empty() {
|
||||
responses.push(Self::create_terminal_data_response(terminal_id, buffer));
|
||||
}
|
||||
}
|
||||
|
||||
if has_activity {
|
||||
session.update_activity();
|
||||
}
|
||||
|
||||
// Execute SIGWINCH resize outside the mutable borrow scope of session.state.
|
||||
if let Some(action) = sigwinch_action {
|
||||
#[cfg(target_os = "windows")]
|
||||
@@ -1845,3 +2051,116 @@ impl TerminalServiceProxy {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{find_utf8_split_point, OutputBuffer, Utf8ChunkAccumulator, MAX_BUFFER_LINES};
|
||||
|
||||
#[test]
|
||||
fn utf8_split_point_returns_full_len_for_complete_input() {
|
||||
assert_eq!(find_utf8_split_point(b"hello"), 5);
|
||||
assert_eq!(find_utf8_split_point("中文".as_bytes()), "中文".len());
|
||||
assert_eq!(find_utf8_split_point("😀".as_bytes()), "😀".len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_split_point_detects_incomplete_trailing_sequence() {
|
||||
let data = [b'a', 0xE4, 0xB8];
|
||||
assert_eq!(find_utf8_split_point(&data), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_split_point_keeps_malformed_prefix_but_buffers_trailing_lead_byte() {
|
||||
let data = [0xFF, 0xE4];
|
||||
assert_eq!(find_utf8_split_point(&data), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_split_point_treats_orphan_continuations_as_complete() {
|
||||
let data = [0x80, 0x81, 0x82];
|
||||
assert_eq!(find_utf8_split_point(&data), data.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_chunk_accumulator_reassembles_split_multibyte_output() {
|
||||
let full = "你好世界".as_bytes();
|
||||
let mut chunker = Utf8ChunkAccumulator::default();
|
||||
let mut output = Vec::new();
|
||||
|
||||
for chunk in full.chunks(5) {
|
||||
if let Some(data) = chunker.push_chunk(chunk.to_vec()) {
|
||||
output.extend_from_slice(&data);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(data) = chunker.finish() {
|
||||
output.extend_from_slice(&data);
|
||||
}
|
||||
|
||||
assert_eq!(output, full);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_chunk_accumulator_buffers_leading_split_multibyte_output() {
|
||||
let mut chunker = Utf8ChunkAccumulator::default();
|
||||
|
||||
assert!(chunker.push_chunk(vec![0xE4]).is_none());
|
||||
assert!(chunker.push_chunk(vec![0xB8]).is_none());
|
||||
assert_eq!(
|
||||
chunker.push_chunk(vec![0xAD]),
|
||||
Some("中".as_bytes().to_vec())
|
||||
);
|
||||
assert!(chunker.finish().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_chunk_accumulator_flushes_incomplete_tail_on_finish() {
|
||||
let mut chunker = Utf8ChunkAccumulator::default();
|
||||
assert_eq!(chunker.push_chunk(vec![b'a', 0xE4]), Some(vec![b'a']));
|
||||
assert_eq!(chunker.finish(), Some(vec![0xE4]));
|
||||
assert!(chunker.finish().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_chunk_accumulator_does_not_stall_on_malformed_bytes() {
|
||||
let mut chunker = Utf8ChunkAccumulator::default();
|
||||
assert_eq!(chunker.push_chunk(vec![0xFF]), Some(vec![0xFF]));
|
||||
assert!(chunker.finish().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_chunk_accumulator_buffers_lone_utf8_lead_bytes() {
|
||||
let mut chunker = Utf8ChunkAccumulator::default();
|
||||
assert!(chunker.push_chunk(vec![0xE4]).is_none());
|
||||
assert_eq!(chunker.finish(), Some(vec![0xE4]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utf8_chunk_accumulator_does_not_hold_back_non_utf8_prefixes() {
|
||||
let mut chunker = Utf8ChunkAccumulator::default();
|
||||
assert_eq!(chunker.push_chunk(vec![0xFF, 0xE4]), Some(vec![0xFF, 0xE4]));
|
||||
assert!(chunker.finish().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn output_buffer_trim_after_incomplete_merge_does_not_underflow() {
|
||||
let mut buffer = OutputBuffer::new();
|
||||
|
||||
// Create an incomplete line first.
|
||||
buffer.append(b"hello");
|
||||
|
||||
// Merge a large chunk that contains the first newline at the tail.
|
||||
// This exercises the "append to last incomplete line" branch.
|
||||
let mut large = vec![b'a'; 30_000];
|
||||
large.push(b'\n');
|
||||
buffer.append(&large);
|
||||
|
||||
// Exceed MAX_BUFFER_LINES so trim pops the first large merged line.
|
||||
for _ in 0..=MAX_BUFFER_LINES {
|
||||
buffer.append(b"x\n");
|
||||
}
|
||||
|
||||
let actual_size: usize = buffer.lines.iter().map(|line| line.len()).sum();
|
||||
assert_eq!(buffer.total_size, actual_size);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user