From 989bf80fe83e1a8f7f18e8da037109d1adfe0385 Mon Sep 17 00:00:00 2001 From: 21pages Date: Fri, 26 Jun 2026 15:07:27 +0800 Subject: [PATCH] Support controller user attribution in audit logs (#15407) * Support controller user attribution in audit logs This PR supports associating audit logs with the controller user. ## Implementation: - Add `ControlledContext { conn_audit_token }` to `PunchHole`, `RequestRelay`, and `FetchLocalAddr`. - The server sends a controller-user identity snapshot to the controlled client through rendezvous messages. - The controlled client sends the token back to the server when posting the `on_open` conn audit or IP whitelist alarm audit. - This lets the server attach the controller user to audit logs. ## How the controlled client helps identify the controller user: - Conn audit: sends the token to the server in `on_open`; the server creates the audit log and caches the user snapshot. - File audit: sends `id` and `conn_id`; the server uses them to find the cached user snapshot. - Alarm audit: IP whitelist sends the token directly; other alarm logs send `id` and `conn_id`, and the server uses them to find the cached user snapshot. ## Compatibility: - Supported only for logs created with a new server and a new controlled client. - Does not require upgrading the controller client. ## Test - [x] New/old clients connected to new/old servers, and conn/file/alarm audit logs worked normally. - [x] New client connected to new server generated searchable conn/file/alarm audit logs. - [x] Punch hole, local addr, and relay paths worked with audit logs and control role on new/old servers. - [x] Direct IP connections produced audit logs, but do not support user audit. Signed-off-by: 21pages * rename conn_audit_token to conn_audit_ref Signed-off-by: 21pages --------- Signed-off-by: 21pages --- libs/hbb_common | 2 +- src/rendezvous_mediator.rs | 103 ++++++++++++++++--------------------- src/server.rs | 33 ++++++------ src/server/connection.rs | 57 +++++++++++++++----- 4 files changed, 104 insertions(+), 91 deletions(-) diff --git a/libs/hbb_common b/libs/hbb_common index 387603f47..a920d0094 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit 387603f47cbb15c0d3dc3d67ae3396d3eb707daf +Subproject commit a920d00945e1d2441b3f77b2677054cb8c3d9dd2 diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 98e75eaf3..4f5c8fee8 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -28,11 +28,21 @@ use hbb_common::{ use crate::{ check_port, - server::{check_zombie, new as new_server, ServerPtr}, + server::{check_zombie, new as new_server, ConnectionMeta, ServerPtr}, }; type Message = RendezvousMessage; +fn connection_meta( + control_permissions: Option, + controlled_context: Option, +) -> ConnectionMeta { + ConnectionMeta { + control_permissions, + controlled_context, + } +} + lazy_static::lazy_static! { static ref SOLVING_PK_MISMATCH: Mutex = Default::default(); static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); @@ -490,6 +500,10 @@ impl RendezvousMediator { if last.0 == addr && last.1.elapsed().as_millis() < 100 { return Ok(()); } + let meta = connection_meta( + rr.control_permissions.into_option(), + rr.controlled_context.into_option(), + ); self.create_relay( rr.socket_addr.into(), @@ -499,7 +513,7 @@ impl RendezvousMediator { rr.secure, false, Default::default(), - rr.control_permissions.clone().into_option(), + meta, ) .await } @@ -513,7 +527,7 @@ impl RendezvousMediator { secure: bool, initiate: bool, socket_addr_v6: bytes::Bytes, - control_permissions: Option, + meta: ConnectionMeta, ) -> ResultType<()> { let peer_addr = AddrMangle::decode(&socket_addr); log::info!( @@ -547,7 +561,7 @@ impl RendezvousMediator { peer_addr, secure, is_ipv4(&self.addr), - control_permissions, + meta, ) .await; Ok(()) @@ -565,14 +579,12 @@ impl RendezvousMediator { let relay_server = self.get_relay_server(fla.relay_server.clone()); let relay = use_ws() || Config::is_proxy(); let mut socket_addr_v6 = Default::default(); + let meta = connection_meta( + fla.control_permissions.clone().into_option(), + fla.controlled_context.clone().into_option(), + ); if peer_addr_v6.port() > 0 && !relay { - socket_addr_v6 = start_ipv6( - peer_addr_v6, - addr, - server.clone(), - fla.control_permissions.clone().into_option(), - ) - .await; + socket_addr_v6 = start_ipv6(peer_addr_v6, addr, server.clone(), meta.clone()).await; } if is_ipv4(&self.addr) && !relay && !config::is_disable_tcp_listen() { if let Err(err) = self @@ -581,6 +593,7 @@ impl RendezvousMediator { server.clone(), relay_server.clone(), socket_addr_v6.clone(), + meta.clone(), ) .await { @@ -598,7 +611,7 @@ impl RendezvousMediator { true, true, socket_addr_v6, - fla.control_permissions.into_option(), + meta, ) .await } @@ -609,6 +622,7 @@ impl RendezvousMediator { server: ServerPtr, relay_server: String, socket_addr_v6: bytes::Bytes, + meta: ConnectionMeta, ) -> ResultType<()> { let peer_addr = AddrMangle::decode(&fla.socket_addr); log::debug!("Handle intranet from {:?}", peer_addr); @@ -629,14 +643,7 @@ impl RendezvousMediator { }); let bytes = msg_out.write_to_bytes()?; socket.send_raw(bytes).await?; - crate::accept_connection( - server.clone(), - socket, - peer_addr, - true, - fla.control_permissions.into_option(), - ) - .await; + crate::accept_connection(server.clone(), socket, peer_addr, true, meta).await; Ok(()) } @@ -651,15 +658,13 @@ impl RendezvousMediator { let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6); let relay = use_ws() || Config::is_proxy() || ph.force_relay; let mut socket_addr_v6 = Default::default(); - let control_permissions = ph.control_permissions.into_option(); + let meta = connection_meta( + ph.control_permissions.into_option(), + ph.controlled_context.into_option(), + ); if peer_addr_v6.port() > 0 && !relay { - socket_addr_v6 = start_ipv6( - peer_addr_v6, - peer_addr, - server.clone(), - control_permissions.clone(), - ) - .await; + socket_addr_v6 = + start_ipv6(peer_addr_v6, peer_addr, server.clone(), meta.clone()).await; } let relay_server = self.get_relay_server(ph.relay_server); // for ensure, websocket go relay directly @@ -678,7 +683,7 @@ impl RendezvousMediator { true, true, socket_addr_v6.clone(), - control_permissions, + meta, ) .await; } @@ -695,7 +700,7 @@ impl RendezvousMediator { }; if ph.udp_port > 0 { peer_addr.set_port(ph.udp_port as u16); - self.punch_udp_hole(peer_addr, server, msg_punch, control_permissions) + self.punch_udp_hole(peer_addr, server, msg_punch, meta) .await?; return Ok(()); } @@ -712,8 +717,7 @@ impl RendezvousMediator { msg_out.set_punch_hole_sent(msg_punch); let bytes = msg_out.write_to_bytes()?; socket.send_raw(bytes).await?; - crate::accept_connection(server.clone(), socket, peer_addr, true, control_permissions) - .await; + crate::accept_connection(server.clone(), socket, peer_addr, true, meta).await; Ok(()) } @@ -722,7 +726,7 @@ impl RendezvousMediator { peer_addr: SocketAddr, server: ServerPtr, msg_punch: PunchHoleSent, - control_permissions: Option, + meta: ConnectionMeta, ) -> ResultType<()> { let mut msg_out = Message::new(); msg_out.set_punch_hole_sent(msg_punch); @@ -737,14 +741,7 @@ impl RendezvousMediator { socket.send_to(&data, addr).await.ok(); } }); - udp_nat_listen( - socket_cloned.clone(), - peer_addr, - peer_addr, - server, - control_permissions, - ) - .await?; + udp_nat_listen(socket_cloned.clone(), peer_addr, peer_addr, server, meta).await?; Ok(()) } @@ -901,7 +898,7 @@ async fn direct_server(server: ServerPtr) { hbb_common::Stream::from(stream, local_addr), addr, false, - None, // Direct connections don't have control_permissions + ConnectionMeta::default(), // Direct connections don't have server-side user context. ) .await ); @@ -933,21 +930,14 @@ async fn start_ipv6( peer_addr_v6: SocketAddr, peer_addr_v4: SocketAddr, server: ServerPtr, - control_permissions: Option, + meta: ConnectionMeta, ) -> bytes::Bytes { crate::test_ipv6().await; if let Some((socket, local_addr_v6)) = crate::get_ipv6_socket().await { let server = server.clone(); tokio::spawn(async move { allow_err!( - udp_nat_listen( - socket.clone(), - peer_addr_v6, - peer_addr_v4, - server, - control_permissions - ) - .await + udp_nat_listen(socket.clone(), peer_addr_v6, peer_addr_v4, server, meta).await ); }); return local_addr_v6; @@ -960,7 +950,7 @@ async fn udp_nat_listen( peer_addr: SocketAddr, peer_addr_v4: SocketAddr, server: ServerPtr, - control_permissions: Option, + meta: ConnectionMeta, ) -> ResultType<()> { let tm = Instant::now(); let socket_cloned = socket.clone(); @@ -973,14 +963,7 @@ async fn udp_nat_listen( res, ) .await?; - crate::server::create_tcp_connection( - server, - stream.1, - peer_addr_v4, - true, - control_permissions, - ) - .await?; + crate::server::create_tcp_connection(server, stream.1, peer_addr_v4, true, meta).await?; Ok(()) }; func.await.map_err(|e: anyhow::Error| { diff --git a/src/server.rs b/src/server.rs index 86f7b5396..89a17a919 100644 --- a/src/server.rs +++ b/src/server.rs @@ -81,6 +81,12 @@ pub mod printer_service; pub type Childs = Arc>>; type ConnMap = HashMap; +#[derive(Clone, Default)] +pub struct ConnectionMeta { + pub control_permissions: Option, + pub controlled_context: Option, +} + #[cfg(any(target_os = "macos", target_os = "linux"))] const CONFIG_SYNC_INTERVAL_SECS: f32 = 0.3; #[cfg(any(target_os = "macos", target_os = "linux"))] @@ -163,7 +169,7 @@ async fn accept_connection_( server: ServerPtr, socket: Stream, secure: bool, - control_permissions: Option, + meta: ConnectionMeta, ) -> ResultType<()> { let local_addr = socket.local_addr(); drop(socket); @@ -180,7 +186,7 @@ async fn accept_connection_( Stream::from(stream, stream_addr), addr, secure, - control_permissions, + meta, ) .await?; } @@ -192,7 +198,7 @@ pub async fn create_tcp_connection( stream: Stream, addr: SocketAddr, secure: bool, - control_permissions: Option, + meta: ConnectionMeta, ) -> ResultType<()> { let mut stream = stream; let id = server.write().unwrap().get_new_id(); @@ -260,14 +266,7 @@ pub async fn create_tcp_connection( } log::info!("wake up macos"); } - Connection::start( - addr, - stream, - id, - Arc::downgrade(&server), - control_permissions, - ) - .await; + Connection::start(addr, stream, id, Arc::downgrade(&server), meta).await; Ok(()) } @@ -276,9 +275,9 @@ pub async fn accept_connection( socket: Stream, peer_addr: SocketAddr, secure: bool, - control_permissions: Option, + meta: ConnectionMeta, ) { - if let Err(err) = accept_connection_(server, socket, secure, control_permissions).await { + if let Err(err) = accept_connection_(server, socket, secure, meta).await { log::warn!("Failed to accept connection from {}: {}", peer_addr, err); } } @@ -290,7 +289,7 @@ pub async fn create_relay_connection( peer_addr: SocketAddr, secure: bool, ipv4: bool, - control_permissions: Option, + meta: ConnectionMeta, ) { if let Err(err) = create_relay_connection_( server, @@ -299,7 +298,7 @@ pub async fn create_relay_connection( peer_addr, secure, ipv4, - control_permissions, + meta, ) .await { @@ -319,7 +318,7 @@ async fn create_relay_connection_( peer_addr: SocketAddr, secure: bool, ipv4: bool, - control_permissions: Option, + meta: ConnectionMeta, ) -> ResultType<()> { let mut stream = socket_client::connect_tcp( socket_client::ipv4_to_ipv6(crate::check_port(relay_server, RELAY_PORT), ipv4), @@ -334,7 +333,7 @@ async fn create_relay_connection_( ..Default::default() }); stream.send(&msg_out).await?; - create_tcp_connection(server, stream, peer_addr, secure, control_permissions).await?; + create_tcp_connection(server, stream, peer_addr, secure, meta).await?; Ok(()) } diff --git a/src/server/connection.rs b/src/server/connection.rs index 5e0825bc4..fb55a16aa 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -310,6 +310,7 @@ pub struct Connection { video_ack_required: bool, server_audit_conn: String, server_audit_file: String, + controlled_context: Option, lr: LoginRequest, peer_argb: u32, session_last_recv_time: Option>>, @@ -407,8 +408,12 @@ impl Connection { stream: super::Stream, id: i32, server: super::ServerPtrWeak, - control_permissions: Option, + meta: super::ConnectionMeta, ) { + let super::ConnectionMeta { + control_permissions, + controlled_context, + } = meta; // Android is not supported yet, so we always set control_permissions to None. #[cfg(target_os = "android")] let control_permissions = None; @@ -495,6 +500,7 @@ impl Connection { video_ack_required: false, server_audit_conn: "".to_owned(), server_audit_file: "".to_owned(), + controlled_context, lr: Default::default(), peer_argb: 0u32, session_last_recv_time: None, @@ -1308,7 +1314,7 @@ impl Connection { { self.send_login_error("Your ip is blocked by the peer") .await; - Self::post_alarm_audit( + self.post_alarm_audit( AlarmAuditType::IpWhitelist, //"ip whitelist", json!({ "ip":addr.ip() }), ); @@ -1334,10 +1340,14 @@ impl Connection { msg_out.set_hash(self.hash.clone()); self.send(msg_out).await; self.get_api_server(); - self.post_conn_audit(json!({ + let mut audit = json!({ "ip": addr.ip(), "action": "new", - })); + }); + if let Some(audit_ref) = self.conn_audit_ref() { + audit["conn_audit_ref"] = json!(audit_ref); + } + self.post_conn_audit(audit); true } @@ -1354,6 +1364,18 @@ impl Connection { ); } + fn conn_audit_ref(&self) -> Option<&str> { + let audit_ref = self + .controlled_context + .as_ref() + .map(|c| c.conn_audit_ref.as_str())?; + if audit_ref.is_empty() { + None + } else { + Some(audit_ref) + } + } + fn post_conn_audit(&self, v: Value) { if self.server_audit_conn.is_empty() { return; @@ -1408,6 +1430,7 @@ impl Connection { "id":json!(Config::get_id()), "uuid":json!(crate::encode64(hbb_common::get_uuid())), "peer_id":json!(self.lr.my_id), + "conn_id":json!(self.inner.id()), "type": r#type as i8, "path":path, "is_file":is_file, @@ -1418,7 +1441,7 @@ impl Connection { }); } - pub fn post_alarm_audit(typ: AlarmAuditType, info: Value) { + fn post_alarm_audit(&self, typ: AlarmAuditType, info: Value) { let url = crate::get_audit_server( Config::get_option("api-server"), Config::get_option("custom-rendezvous-server"), @@ -1432,6 +1455,12 @@ impl Connection { v["uuid"] = json!(crate::encode64(hbb_common::get_uuid())); v["typ"] = json!(typ as i8); v["info"] = serde_json::Value::String(info.to_string()); + v["conn_id"] = json!(self.inner.id()); + if typ == AlarmAuditType::IpWhitelist { + if let Some(audit_ref) = self.conn_audit_ref() { + v["conn_audit_ref"] = json!(audit_ref); + } + } tokio::spawn(async move { allow_err!(Self::post_audit_async(url, v).await); }); @@ -1565,9 +1594,10 @@ impl Connection { .unwrap() .get(&self.session_key()) .map(|s| s.last_recv_time.clone()); - self.post_conn_audit( - json!({"peer": ((&self.lr.my_id, &self.lr.my_name)), "type": conn_type}), - ); + self.post_conn_audit(json!({ + "peer": ((&self.lr.my_id, &self.lr.my_name)), + "type": conn_type, + })); #[allow(unused_mut)] let mut username = crate::platform::get_active_username(); let mut res = LoginResponse::new(); @@ -3668,7 +3698,7 @@ impl Connection { ); self.send_login_error("Please try 1 minute later").await; sleep(1.).await; - Self::post_alarm_audit( + self.post_alarm_audit( AlarmAuditType::TerminalOsLoginConcurrency, json!({ "ip": self.ip, @@ -3856,7 +3886,7 @@ impl Connection { prefix_num )) .await; - Self::post_alarm_audit( + self.post_alarm_audit( AlarmAuditType::ExceedIPv6PrefixAttempts, json!({ "ip": self.ip, @@ -3901,7 +3931,7 @@ impl Connection { if let Some(audit) = decision.audit { // For OS blocked/backoff events, we currently emit one alarm report per blocked attempt. // TODO: Add unified cumulative/aggregation fields across alarm producers. - Self::post_alarm_audit( + self.post_alarm_audit( audit, json!({ "ip": self.ip, @@ -3938,7 +3968,7 @@ impl Connection { let res = if failure.2 > 30 { self.send_login_error("Too many wrong attempts").await; - Self::post_alarm_audit( + self.post_alarm_audit( AlarmAuditType::ExceedThirtyAttempts, json!({ "ip": self.ip, @@ -3949,7 +3979,7 @@ impl Connection { false } else if time == failure.0 && failure.1 > 6 { self.send_login_error("Please try 1 minute later").await; - Self::post_alarm_audit( + self.post_alarm_audit( AlarmAuditType::SixAttemptsWithinOneMinute, json!({ "ip": self.ip, @@ -5490,6 +5520,7 @@ fn try_activate_screen() { }); } +#[derive(Clone, Copy, PartialEq, Eq)] pub enum AlarmAuditType { IpWhitelist = 0, ExceedThirtyAttempts = 1,