/// Find the largest prefix of `buf` that does not end in the middle of a UTF-8
/// code point.
///
/// Only the last three bytes are inspected: a UTF-8 sequence is at most four
/// bytes long, so the lead byte of an incomplete trailing sequence can be no
/// further back than that. Bytes that cannot start a sequence (orphan
/// continuation bytes, and the lead values 0xC0, 0xC1 and 0xF5..=0xFF that
/// never occur in UTF-8) are treated as complete so they can continue
/// downstream and be rendered with replacement characters if needed.
///
/// Returns `buf.len()` when nothing needs to be held back, otherwise the index
/// of the lead byte of the incomplete trailing sequence.
fn find_utf8_split_point(buf: &[u8]) -> usize {
    let len = buf.len();
    let start = len.saturating_sub(3);

    for i in (start..len).rev() {
        let seq_len = match buf[i] {
            // ASCII: nothing after it can belong to an unfinished sequence.
            0x00..=0x7F => return len,
            // Continuation byte: keep walking back towards its lead byte.
            0x80..=0xBF => continue,
            0xC2..=0xDF => 2,
            0xE0..=0xEF => 3,
            0xF0..=0xF4 => 4,
            // 0xC0, 0xC1, 0xF5..=0xFF are never valid in UTF-8; waiting for
            // more input cannot complete them, so do not hold them back.
            _ => return len,
        };

        // `len - i` is how many bytes of the sequence have arrived so far.
        return if len - i >= seq_len { len } else { i };
    }

    // Empty input, or the inspected tail consisted solely of continuation bytes.
    len
}

/// Re-chunks a byte stream so that emitted chunks do not end in the middle of
/// a UTF-8 code point.
///
/// An incomplete trailing sequence is kept in `remainder` and prepended to the
/// next chunk passed to [`Utf8ChunkAccumulator::push_chunk`].
#[derive(Default)]
struct Utf8ChunkAccumulator {
    /// Bytes of an incomplete trailing sequence held back from earlier chunks.
    remainder: Vec<u8>,
}

impl Utf8ChunkAccumulator {
    /// Feed the next chunk read from the stream.
    ///
    /// Returns the bytes that can be forwarded now, or `None` when `data` is
    /// empty or when everything received so far is being held back as the
    /// start of an unfinished code point.
    fn push_chunk(&mut self, mut data: Vec<u8>) -> Option<Vec<u8>> {
        if data.is_empty() {
            return None;
        }

        let had_remainder = !self.remainder.is_empty();
        if had_remainder {
            let mut combined = std::mem::take(&mut self.remainder);
            combined.extend_from_slice(&data);
            data = combined;
        }

        let split = find_utf8_split_point(&data);
        if split == data.len() {
            return Some(data);
        }

        // Only hold back a candidate incomplete suffix when we have evidence that
        // the bytes before it are already UTF-8 text. If split is 0, the whole
        // read may be the start of a UTF-8 character, so keep it for the next read.
        if !had_remainder && split > 0 && std::str::from_utf8(&data[..split]).is_err() {
            return Some(data);
        }

        self.remainder = data.split_off(split);
        (!data.is_empty()).then_some(data)
    }

    /// Drain whatever is still held back (for example at EOF).
    ///
    /// Returns `None` when nothing is pending; afterwards the accumulator is
    /// empty either way.
    fn finish(&mut self) -> Option<Vec<u8>> {
        let tail = std::mem::take(&mut self.remainder);
        (!tail.is_empty()).then_some(tail)
    }
}
#[cfg(test)]
mod tests {
    use super::{find_utf8_split_point, Utf8ChunkAccumulator};

    /// Fully formed ASCII, CJK and emoji input is never split.
    #[test]
    fn utf8_split_point_returns_full_len_for_complete_input() {
        for text in ["hello", "中文", "😀"] {
            let bytes = text.as_bytes();
            assert_eq!(find_utf8_split_point(bytes), bytes.len());
        }
    }

    /// A three-byte sequence missing its last byte is cut off at its lead byte.
    #[test]
    fn utf8_split_point_detects_incomplete_trailing_sequence() {
        let truncated = [b'a', 0xE4, 0xB8];
        assert_eq!(find_utf8_split_point(&truncated), 1);
    }

    /// A malformed byte in front does not stop a trailing lead byte from being split off.
    #[test]
    fn utf8_split_point_keeps_malformed_prefix_but_buffers_trailing_lead_byte() {
        let bytes = [0xFF, 0xE4];
        assert_eq!(find_utf8_split_point(&bytes), 1);
    }

    /// Continuation bytes without a lead byte are passed through whole.
    #[test]
    fn utf8_split_point_treats_orphan_continuations_as_complete() {
        let orphans = [0x80, 0x81, 0x82];
        assert_eq!(find_utf8_split_point(&orphans), orphans.len());
    }

    /// Feeding text in 5-byte slices (which cut through code points) yields the original bytes.
    #[test]
    fn utf8_chunk_accumulator_reassembles_split_multibyte_output() {
        let expected = "你好世界".as_bytes();
        let mut acc = Utf8ChunkAccumulator::default();

        let mut collected: Vec<u8> = expected
            .chunks(5)
            .filter_map(|piece| acc.push_chunk(piece.to_vec()))
            .flatten()
            .collect();
        collected.extend(acc.finish().unwrap_or_default());

        assert_eq!(collected, expected);
    }

    /// A code point delivered one byte at a time is only emitted once complete.
    #[test]
    fn utf8_chunk_accumulator_buffers_leading_split_multibyte_output() {
        let mut acc = Utf8ChunkAccumulator::default();

        assert!(acc.push_chunk(vec![0xE4]).is_none());
        assert!(acc.push_chunk(vec![0xB8]).is_none());

        let emitted = acc.push_chunk(vec![0xAD]);
        assert_eq!(emitted, Some("中".as_bytes().to_vec()));
        assert!(acc.finish().is_none());
    }

    /// `finish` hands back a dangling lead byte exactly once.
    #[test]
    fn utf8_chunk_accumulator_flushes_incomplete_tail_on_finish() {
        let mut acc = Utf8ChunkAccumulator::default();

        let emitted = acc.push_chunk(vec![b'a', 0xE4]);
        assert_eq!(emitted, Some(vec![b'a']));

        assert_eq!(acc.finish(), Some(vec![0xE4]));
        assert!(acc.finish().is_none());
    }

    /// A byte that can never start a sequence is forwarded immediately.
    #[test]
    fn utf8_chunk_accumulator_does_not_stall_on_malformed_bytes() {
        let mut acc = Utf8ChunkAccumulator::default();

        let emitted = acc.push_chunk(vec![0xFF]);
        assert_eq!(emitted, Some(vec![0xFF]));
        assert!(acc.finish().is_none());
    }

    /// A lone lead byte is held until `finish` releases it.
    #[test]
    fn utf8_chunk_accumulator_buffers_lone_utf8_lead_bytes() {
        let mut acc = Utf8ChunkAccumulator::default();

        assert!(acc.push_chunk(vec![0xE4]).is_none());
        assert_eq!(acc.finish(), Some(vec![0xE4]));
    }

    /// When the bytes before the split are not UTF-8, the chunk goes out untouched.
    #[test]
    fn utf8_chunk_accumulator_does_not_hold_back_non_utf8_prefixes() {
        let mut acc = Utf8ChunkAccumulator::default();

        let emitted = acc.push_chunk(vec![0xFF, 0xE4]);
        assert_eq!(emitted, Some(vec![0xFF, 0xE4]));
        assert!(acc.finish().is_none());
    }
}