Support controller user attribution in audit logs (#15407)

* Support controller user attribution in audit logs

This PR supports associating audit logs with the controller user.

  ## Implementation:
  - Add `ControlledContext { conn_audit_token }` to `PunchHole`, `RequestRelay`, and `FetchLocalAddr`.
  - The server sends a controller-user identity snapshot to the controlled client through rendezvous messages.
  - The controlled client sends the token back to the server when posting the `on_open` conn audit or IP whitelist alarm audit.
  - This lets the server attach the controller user to audit logs.

  ## How the controlled client helps identify the controller user:
  - Conn audit: sends the token to the server in `on_open`; the server creates the audit log and caches the user snapshot.
  - File audit: sends `id` and `conn_id`; the server uses them to find the cached user snapshot.
  - Alarm audit: IP whitelist sends the token directly; other alarm logs send `id` and `conn_id`, and the server uses them to find the cached user
  snapshot.

  ## Compatibility:
  - Supported only for logs created with a new server and a new controlled client.
  - Does not require upgrading the controller client.

  ## Test

  - [x] New/old clients connected to new/old servers, and conn/file/alarm audit logs worked normally.
  - [x] New client connected to new server generated searchable conn/file/alarm audit logs.
  - [x] Punch hole, local addr, and relay paths worked with audit logs and control role on new/old servers.
  - [x] Direct IP connections produced audit logs, but do not support user audit.

Signed-off-by: 21pages <sunboeasy@gmail.com>

* rename conn_audit_token to conn_audit_ref

Signed-off-by: 21pages <sunboeasy@gmail.com>

---------

Signed-off-by: 21pages <sunboeasy@gmail.com>
This commit is contained in:
21pages
2026-06-26 15:07:27 +08:00
committed by GitHub
parent 78b5f47668
commit 989bf80fe8
4 changed files with 104 additions and 91 deletions

View File

@@ -28,11 +28,21 @@ use hbb_common::{
use crate::{
check_port,
server::{check_zombie, new as new_server, ServerPtr},
server::{check_zombie, new as new_server, ConnectionMeta, ServerPtr},
};
type Message = RendezvousMessage;
fn connection_meta(
control_permissions: Option<ControlPermissions>,
controlled_context: Option<ControlledContext>,
) -> ConnectionMeta {
ConnectionMeta {
control_permissions,
controlled_context,
}
}
lazy_static::lazy_static! {
static ref SOLVING_PK_MISMATCH: Mutex<String> = Default::default();
static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now()));
@@ -490,6 +500,10 @@ impl RendezvousMediator {
if last.0 == addr && last.1.elapsed().as_millis() < 100 {
return Ok(());
}
let meta = connection_meta(
rr.control_permissions.into_option(),
rr.controlled_context.into_option(),
);
self.create_relay(
rr.socket_addr.into(),
@@ -499,7 +513,7 @@ impl RendezvousMediator {
rr.secure,
false,
Default::default(),
rr.control_permissions.clone().into_option(),
meta,
)
.await
}
@@ -513,7 +527,7 @@ impl RendezvousMediator {
secure: bool,
initiate: bool,
socket_addr_v6: bytes::Bytes,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&socket_addr);
log::info!(
@@ -547,7 +561,7 @@ impl RendezvousMediator {
peer_addr,
secure,
is_ipv4(&self.addr),
control_permissions,
meta,
)
.await;
Ok(())
@@ -565,14 +579,12 @@ impl RendezvousMediator {
let relay_server = self.get_relay_server(fla.relay_server.clone());
let relay = use_ws() || Config::is_proxy();
let mut socket_addr_v6 = Default::default();
let meta = connection_meta(
fla.control_permissions.clone().into_option(),
fla.controlled_context.clone().into_option(),
);
if peer_addr_v6.port() > 0 && !relay {
socket_addr_v6 = start_ipv6(
peer_addr_v6,
addr,
server.clone(),
fla.control_permissions.clone().into_option(),
)
.await;
socket_addr_v6 = start_ipv6(peer_addr_v6, addr, server.clone(), meta.clone()).await;
}
if is_ipv4(&self.addr) && !relay && !config::is_disable_tcp_listen() {
if let Err(err) = self
@@ -581,6 +593,7 @@ impl RendezvousMediator {
server.clone(),
relay_server.clone(),
socket_addr_v6.clone(),
meta.clone(),
)
.await
{
@@ -598,7 +611,7 @@ impl RendezvousMediator {
true,
true,
socket_addr_v6,
fla.control_permissions.into_option(),
meta,
)
.await
}
@@ -609,6 +622,7 @@ impl RendezvousMediator {
server: ServerPtr,
relay_server: String,
socket_addr_v6: bytes::Bytes,
meta: ConnectionMeta,
) -> ResultType<()> {
let peer_addr = AddrMangle::decode(&fla.socket_addr);
log::debug!("Handle intranet from {:?}", peer_addr);
@@ -629,14 +643,7 @@ impl RendezvousMediator {
});
let bytes = msg_out.write_to_bytes()?;
socket.send_raw(bytes).await?;
crate::accept_connection(
server.clone(),
socket,
peer_addr,
true,
fla.control_permissions.into_option(),
)
.await;
crate::accept_connection(server.clone(), socket, peer_addr, true, meta).await;
Ok(())
}
@@ -651,15 +658,13 @@ impl RendezvousMediator {
let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6);
let relay = use_ws() || Config::is_proxy() || ph.force_relay;
let mut socket_addr_v6 = Default::default();
let control_permissions = ph.control_permissions.into_option();
let meta = connection_meta(
ph.control_permissions.into_option(),
ph.controlled_context.into_option(),
);
if peer_addr_v6.port() > 0 && !relay {
socket_addr_v6 = start_ipv6(
peer_addr_v6,
peer_addr,
server.clone(),
control_permissions.clone(),
)
.await;
socket_addr_v6 =
start_ipv6(peer_addr_v6, peer_addr, server.clone(), meta.clone()).await;
}
let relay_server = self.get_relay_server(ph.relay_server);
// for ensure, websocket go relay directly
@@ -678,7 +683,7 @@ impl RendezvousMediator {
true,
true,
socket_addr_v6.clone(),
control_permissions,
meta,
)
.await;
}
@@ -695,7 +700,7 @@ impl RendezvousMediator {
};
if ph.udp_port > 0 {
peer_addr.set_port(ph.udp_port as u16);
self.punch_udp_hole(peer_addr, server, msg_punch, control_permissions)
self.punch_udp_hole(peer_addr, server, msg_punch, meta)
.await?;
return Ok(());
}
@@ -712,8 +717,7 @@ impl RendezvousMediator {
msg_out.set_punch_hole_sent(msg_punch);
let bytes = msg_out.write_to_bytes()?;
socket.send_raw(bytes).await?;
crate::accept_connection(server.clone(), socket, peer_addr, true, control_permissions)
.await;
crate::accept_connection(server.clone(), socket, peer_addr, true, meta).await;
Ok(())
}
@@ -722,7 +726,7 @@ impl RendezvousMediator {
peer_addr: SocketAddr,
server: ServerPtr,
msg_punch: PunchHoleSent,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) -> ResultType<()> {
let mut msg_out = Message::new();
msg_out.set_punch_hole_sent(msg_punch);
@@ -737,14 +741,7 @@ impl RendezvousMediator {
socket.send_to(&data, addr).await.ok();
}
});
udp_nat_listen(
socket_cloned.clone(),
peer_addr,
peer_addr,
server,
control_permissions,
)
.await?;
udp_nat_listen(socket_cloned.clone(), peer_addr, peer_addr, server, meta).await?;
Ok(())
}
@@ -901,7 +898,7 @@ async fn direct_server(server: ServerPtr) {
hbb_common::Stream::from(stream, local_addr),
addr,
false,
None, // Direct connections don't have control_permissions
ConnectionMeta::default(), // Direct connections don't have server-side user context.
)
.await
);
@@ -933,21 +930,14 @@ async fn start_ipv6(
peer_addr_v6: SocketAddr,
peer_addr_v4: SocketAddr,
server: ServerPtr,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) -> bytes::Bytes {
crate::test_ipv6().await;
if let Some((socket, local_addr_v6)) = crate::get_ipv6_socket().await {
let server = server.clone();
tokio::spawn(async move {
allow_err!(
udp_nat_listen(
socket.clone(),
peer_addr_v6,
peer_addr_v4,
server,
control_permissions
)
.await
udp_nat_listen(socket.clone(), peer_addr_v6, peer_addr_v4, server, meta).await
);
});
return local_addr_v6;
@@ -960,7 +950,7 @@ async fn udp_nat_listen(
peer_addr: SocketAddr,
peer_addr_v4: SocketAddr,
server: ServerPtr,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) -> ResultType<()> {
let tm = Instant::now();
let socket_cloned = socket.clone();
@@ -973,14 +963,7 @@ async fn udp_nat_listen(
res,
)
.await?;
crate::server::create_tcp_connection(
server,
stream.1,
peer_addr_v4,
true,
control_permissions,
)
.await?;
crate::server::create_tcp_connection(server, stream.1, peer_addr_v4, true, meta).await?;
Ok(())
};
func.await.map_err(|e: anyhow::Error| {

View File

@@ -81,6 +81,12 @@ pub mod printer_service;
pub type Childs = Arc<Mutex<Vec<std::process::Child>>>;
type ConnMap = HashMap<i32, ConnInner>;
#[derive(Clone, Default)]
pub struct ConnectionMeta {
pub control_permissions: Option<ControlPermissions>,
pub controlled_context: Option<ControlledContext>,
}
#[cfg(any(target_os = "macos", target_os = "linux"))]
const CONFIG_SYNC_INTERVAL_SECS: f32 = 0.3;
#[cfg(any(target_os = "macos", target_os = "linux"))]
@@ -163,7 +169,7 @@ async fn accept_connection_(
server: ServerPtr,
socket: Stream,
secure: bool,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) -> ResultType<()> {
let local_addr = socket.local_addr();
drop(socket);
@@ -180,7 +186,7 @@ async fn accept_connection_(
Stream::from(stream, stream_addr),
addr,
secure,
control_permissions,
meta,
)
.await?;
}
@@ -192,7 +198,7 @@ pub async fn create_tcp_connection(
stream: Stream,
addr: SocketAddr,
secure: bool,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) -> ResultType<()> {
let mut stream = stream;
let id = server.write().unwrap().get_new_id();
@@ -260,14 +266,7 @@ pub async fn create_tcp_connection(
}
log::info!("wake up macos");
}
Connection::start(
addr,
stream,
id,
Arc::downgrade(&server),
control_permissions,
)
.await;
Connection::start(addr, stream, id, Arc::downgrade(&server), meta).await;
Ok(())
}
@@ -276,9 +275,9 @@ pub async fn accept_connection(
socket: Stream,
peer_addr: SocketAddr,
secure: bool,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) {
if let Err(err) = accept_connection_(server, socket, secure, control_permissions).await {
if let Err(err) = accept_connection_(server, socket, secure, meta).await {
log::warn!("Failed to accept connection from {}: {}", peer_addr, err);
}
}
@@ -290,7 +289,7 @@ pub async fn create_relay_connection(
peer_addr: SocketAddr,
secure: bool,
ipv4: bool,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) {
if let Err(err) = create_relay_connection_(
server,
@@ -299,7 +298,7 @@ pub async fn create_relay_connection(
peer_addr,
secure,
ipv4,
control_permissions,
meta,
)
.await
{
@@ -319,7 +318,7 @@ async fn create_relay_connection_(
peer_addr: SocketAddr,
secure: bool,
ipv4: bool,
control_permissions: Option<ControlPermissions>,
meta: ConnectionMeta,
) -> ResultType<()> {
let mut stream = socket_client::connect_tcp(
socket_client::ipv4_to_ipv6(crate::check_port(relay_server, RELAY_PORT), ipv4),
@@ -334,7 +333,7 @@ async fn create_relay_connection_(
..Default::default()
});
stream.send(&msg_out).await?;
create_tcp_connection(server, stream, peer_addr, secure, control_permissions).await?;
create_tcp_connection(server, stream, peer_addr, secure, meta).await?;
Ok(())
}

View File

@@ -310,6 +310,7 @@ pub struct Connection {
video_ack_required: bool,
server_audit_conn: String,
server_audit_file: String,
controlled_context: Option<ControlledContext>,
lr: LoginRequest,
peer_argb: u32,
session_last_recv_time: Option<Arc<Mutex<Instant>>>,
@@ -407,8 +408,12 @@ impl Connection {
stream: super::Stream,
id: i32,
server: super::ServerPtrWeak,
control_permissions: Option<ControlPermissions>,
meta: super::ConnectionMeta,
) {
let super::ConnectionMeta {
control_permissions,
controlled_context,
} = meta;
// Android is not supported yet, so we always set control_permissions to None.
#[cfg(target_os = "android")]
let control_permissions = None;
@@ -495,6 +500,7 @@ impl Connection {
video_ack_required: false,
server_audit_conn: "".to_owned(),
server_audit_file: "".to_owned(),
controlled_context,
lr: Default::default(),
peer_argb: 0u32,
session_last_recv_time: None,
@@ -1308,7 +1314,7 @@ impl Connection {
{
self.send_login_error("Your ip is blocked by the peer")
.await;
Self::post_alarm_audit(
self.post_alarm_audit(
AlarmAuditType::IpWhitelist, //"ip whitelist",
json!({ "ip":addr.ip() }),
);
@@ -1334,10 +1340,14 @@ impl Connection {
msg_out.set_hash(self.hash.clone());
self.send(msg_out).await;
self.get_api_server();
self.post_conn_audit(json!({
let mut audit = json!({
"ip": addr.ip(),
"action": "new",
}));
});
if let Some(audit_ref) = self.conn_audit_ref() {
audit["conn_audit_ref"] = json!(audit_ref);
}
self.post_conn_audit(audit);
true
}
@@ -1354,6 +1364,18 @@ impl Connection {
);
}
fn conn_audit_ref(&self) -> Option<&str> {
let audit_ref = self
.controlled_context
.as_ref()
.map(|c| c.conn_audit_ref.as_str())?;
if audit_ref.is_empty() {
None
} else {
Some(audit_ref)
}
}
fn post_conn_audit(&self, v: Value) {
if self.server_audit_conn.is_empty() {
return;
@@ -1408,6 +1430,7 @@ impl Connection {
"id":json!(Config::get_id()),
"uuid":json!(crate::encode64(hbb_common::get_uuid())),
"peer_id":json!(self.lr.my_id),
"conn_id":json!(self.inner.id()),
"type": r#type as i8,
"path":path,
"is_file":is_file,
@@ -1418,7 +1441,7 @@ impl Connection {
});
}
pub fn post_alarm_audit(typ: AlarmAuditType, info: Value) {
fn post_alarm_audit(&self, typ: AlarmAuditType, info: Value) {
let url = crate::get_audit_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
@@ -1432,6 +1455,12 @@ impl Connection {
v["uuid"] = json!(crate::encode64(hbb_common::get_uuid()));
v["typ"] = json!(typ as i8);
v["info"] = serde_json::Value::String(info.to_string());
v["conn_id"] = json!(self.inner.id());
if typ == AlarmAuditType::IpWhitelist {
if let Some(audit_ref) = self.conn_audit_ref() {
v["conn_audit_ref"] = json!(audit_ref);
}
}
tokio::spawn(async move {
allow_err!(Self::post_audit_async(url, v).await);
});
@@ -1565,9 +1594,10 @@ impl Connection {
.unwrap()
.get(&self.session_key())
.map(|s| s.last_recv_time.clone());
self.post_conn_audit(
json!({"peer": ((&self.lr.my_id, &self.lr.my_name)), "type": conn_type}),
);
self.post_conn_audit(json!({
"peer": ((&self.lr.my_id, &self.lr.my_name)),
"type": conn_type,
}));
#[allow(unused_mut)]
let mut username = crate::platform::get_active_username();
let mut res = LoginResponse::new();
@@ -3668,7 +3698,7 @@ impl Connection {
);
self.send_login_error("Please try 1 minute later").await;
sleep(1.).await;
Self::post_alarm_audit(
self.post_alarm_audit(
AlarmAuditType::TerminalOsLoginConcurrency,
json!({
"ip": self.ip,
@@ -3856,7 +3886,7 @@ impl Connection {
prefix_num
))
.await;
Self::post_alarm_audit(
self.post_alarm_audit(
AlarmAuditType::ExceedIPv6PrefixAttempts,
json!({
"ip": self.ip,
@@ -3901,7 +3931,7 @@ impl Connection {
if let Some(audit) = decision.audit {
// For OS blocked/backoff events, we currently emit one alarm report per blocked attempt.
// TODO: Add unified cumulative/aggregation fields across alarm producers.
Self::post_alarm_audit(
self.post_alarm_audit(
audit,
json!({
"ip": self.ip,
@@ -3938,7 +3968,7 @@ impl Connection {
let res = if failure.2 > 30 {
self.send_login_error("Too many wrong attempts").await;
Self::post_alarm_audit(
self.post_alarm_audit(
AlarmAuditType::ExceedThirtyAttempts,
json!({
"ip": self.ip,
@@ -3949,7 +3979,7 @@ impl Connection {
false
} else if time == failure.0 && failure.1 > 6 {
self.send_login_error("Please try 1 minute later").await;
Self::post_alarm_audit(
self.post_alarm_audit(
AlarmAuditType::SixAttemptsWithinOneMinute,
json!({
"ip": self.ip,
@@ -5490,6 +5520,7 @@ fn try_activate_screen() {
});
}
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum AlarmAuditType {
IpWhitelist = 0,
ExceedThirtyAttempts = 1,