Fix/linux keep terminal sessions (#12222)

* fix: linux, keep terminal sessions

Signed-off-by: fufesou <linlong1266@gmail.com>

* fix: terminal service stucked at reader join

Signed-off-by: fufesou <linlong1266@gmail.com>

---------

Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
fufesou
2025-07-03 17:27:50 +08:00
committed by GitHub
parent 7ad3023285
commit f766d28c36
3 changed files with 88 additions and 13 deletions

View File

@@ -8,6 +8,7 @@ use std::{
collections::{HashMap, VecDeque},
io::{Read, Write},
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, Receiver, SyncSender},
Arc, Mutex,
},
@@ -141,11 +142,7 @@ fn remove_service(service_id: &str) {
let sessions = service.lock().unwrap().sessions.clone();
for (_, session) in sessions.iter() {
let mut session = session.lock().unwrap();
if let Some(mut child) = session.child.take() {
// Kill the process
let _ = child.kill();
add_to_reaper(child);
}
session.stop();
}
}
}
@@ -265,6 +262,15 @@ fn ensure_cleanup_task() {
}
}
#[cfg(target_os = "linux")]
pub fn get_terminal_session_count(include_zombie_tasks: bool) -> usize {
let mut c = TERMINAL_SERVICES.lock().unwrap().len();
if include_zombie_tasks {
c += TERMINAL_TASKS.lock().unwrap().len();
}
c
}
pub fn new(service_id: String, is_persistent: bool) -> GenericService {
// Create the service with initial persistence setting
allow_err!(get_or_create_service(service_id.clone(), is_persistent));
@@ -393,6 +399,7 @@ pub struct TerminalSession {
input_tx: Option<SyncSender<Vec<u8>>>,
// Channel for receiving output from the reader thread
output_rx: Option<Receiver<Vec<u8>>>,
exiting: Arc<AtomicBool>,
// Thread handles
reader_thread: Option<thread::JoinHandle<()>>,
writer_thread: Option<thread::JoinHandle<()>>,
@@ -414,6 +421,7 @@ impl TerminalSession {
child: None,
input_tx: None,
output_rx: None,
exiting: Arc::new(AtomicBool::new(false)),
reader_thread: None,
writer_thread: None,
output_buffer: OutputBuffer::new(),
@@ -428,29 +436,50 @@ impl TerminalSession {
fn update_activity(&mut self) {
self.last_activity = Instant::now();
}
}
impl Drop for TerminalSession {
fn drop(&mut self) {
// This helper function is to ensure that the threads are joined before the child process is dropped.
// Though this is not strictly necessary on macOS.
fn stop(&mut self) {
self.exiting.store(true, Ordering::SeqCst);
// Drop the input channel to signal writer thread to exit
drop(self.input_tx.take());
if let Some(input_tx) = self.input_tx.take() {
// Send a final newline to ensure the reader can read some data, and then exit.
// This is required on Windows and Linux.
if let Err(e) = input_tx.send(b"\r\n".to_vec()) {
log::warn!("Failed to send final newline to the terminal: {}", e);
}
drop(input_tx);
}
// Wait for threads to finish
if let Some(writer_thread) = self.writer_thread.take() {
let _ = writer_thread.join();
}
// The reader thread should join before the writer thread on Windows.
if let Some(reader_thread) = self.reader_thread.take() {
let _ = reader_thread.join();
}
// Ensure child process is properly handled when session is dropped
// The read can read the last "\r\n" after the writer thread (not the child process) exits
// on Linux in my tests.
// But we still send "\r\n" to the writer thread and let the reader thread exit first for safety.
if let Some(writer_thread) = self.writer_thread.take() {
let _ = writer_thread.join();
}
if let Some(mut child) = self.child.take() {
// Kill the process
let _ = child.kill();
add_to_reaper(child);
}
}
}
impl Drop for TerminalSession {
fn drop(&mut self) {
// Ensure child process is properly handled when session is dropped
self.stop();
}
}
/// Persistent terminal service that can survive connection drops
pub struct PersistentTerminalService {
service_id: String,
@@ -669,6 +698,17 @@ impl TerminalServiceProxy {
let terminal_id = open.terminal_id;
let writer_thread = thread::spawn(move || {
let mut writer = writer;
// Write initial carriage return:
// 1. Windows requires at least one carriage return for `drop()` to work properly.
// Without this, the reader may fail to read the buffer after `input_tx.send(b"\r\n".to_vec()).ok();`.
// 2. This also refreshes the terminal interface on the controlling side (workaround for blank content on connect).
if let Err(e) = writer.write_all(b"\r") {
log::error!("Terminal {} initial write error: {}", terminal_id, e);
} else {
if let Err(e) = writer.flush() {
log::error!("Terminal {} initial flush error: {}", terminal_id, e);
}
}
while let Ok(data) = input_rx.recv() {
if let Err(e) = writer.write_all(&data) {
log::error!("Terminal {} write error: {}", terminal_id, e);
@@ -681,6 +721,7 @@ impl TerminalServiceProxy {
log::debug!("Terminal {} writer thread exiting", terminal_id);
});
let exiting = session.exiting.clone();
// Spawn reader thread
let terminal_id = open.terminal_id;
let reader_thread = thread::spawn(move || {
@@ -690,9 +731,14 @@ impl TerminalServiceProxy {
match reader.read(&mut buf) {
Ok(0) => {
// EOF
// This branch can be reached when the child process exits on macOS.
// But not on Linux and Windows in my tests.
break;
}
Ok(n) => {
if exiting.load(Ordering::SeqCst) {
break;
}
let data = buf[..n].to_vec();
// Try to send, if channel is full, drop the data
match output_tx.try_send(data) {
@@ -710,6 +756,10 @@ impl TerminalServiceProxy {
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// This branch is not reached in my tests, but we still add `exiting` check to ensure we can exit.
if exiting.load(Ordering::SeqCst) {
break;
}
// For non-blocking I/O, sleep briefly
thread::sleep(Duration::from_millis(10));
}