xpc: refactor, less chatty logging
This commit is contained in:
@@ -6,7 +6,6 @@ use std::ffi::CString;
|
|||||||
use std::os::raw::c_char;
|
use std::os::raw::c_char;
|
||||||
use std::ptr;
|
use std::ptr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread;
|
|
||||||
use tokio::sync::{mpsc, oneshot, Mutex};
|
use tokio::sync::{mpsc, oneshot, Mutex};
|
||||||
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
|
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
|
||||||
use xpc_connection_sys as xpc_sys;
|
use xpc_connection_sys as xpc_sys;
|
||||||
@@ -20,6 +19,8 @@ struct XpcConn(pub xpc_sys::xpc_connection_t);
|
|||||||
unsafe impl Send for XpcConn {}
|
unsafe impl Send for XpcConn {}
|
||||||
unsafe impl Sync for XpcConn {}
|
unsafe impl Sync for XpcConn {}
|
||||||
|
|
||||||
|
type Subscribers = Arc<std::sync::Mutex<Vec<XpcConn>>>;
|
||||||
|
|
||||||
/// XPC IPC agent that forwards daemon events and signals over libxpc.
|
/// XPC IPC agent that forwards daemon events and signals over libxpc.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct XpcAgent {
|
pub struct XpcAgent {
|
||||||
@@ -37,9 +38,6 @@ impl XpcAgent {
|
|||||||
pub async fn run(self) {
|
pub async fn run(self) {
|
||||||
use block::ConcreteBlock;
|
use block::ConcreteBlock;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::Mutex as StdMutex;
|
|
||||||
|
|
||||||
log::info!(target: LOG_TARGET, "XPCAgent running");
|
|
||||||
|
|
||||||
// Construct the Mach service name without a trailing NUL for CString.
|
// Construct the Mach service name without a trailing NUL for CString.
|
||||||
let service_name = SERVICE_NAME.trim_end_matches('\0');
|
let service_name = SERVICE_NAME.trim_end_matches('\0');
|
||||||
@@ -67,7 +65,7 @@ impl XpcAgent {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Shared list of connected clients for signal fanout
|
// Shared list of connected clients for signal fanout
|
||||||
let connections: Arc<StdMutex<Vec<XpcConn>>> = Arc::new(StdMutex::new(Vec::new()));
|
let connections: Subscribers = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||||
// Forward daemon signals to all connected clients
|
// Forward daemon signals to all connected clients
|
||||||
{
|
{
|
||||||
let receiver_arc = self.signal_receiver.clone();
|
let receiver_arc = self.signal_receiver.clone();
|
||||||
@@ -79,13 +77,13 @@ impl XpcAgent {
|
|||||||
.take()
|
.take()
|
||||||
.expect("Signal receiver already taken");
|
.expect("Signal receiver already taken");
|
||||||
while let Some(signal) = receiver.recv().await {
|
while let Some(signal) = receiver.recv().await {
|
||||||
log::info!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
|
log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
|
||||||
let msg = signal_to_message(signal);
|
let msg = signal_to_message(signal);
|
||||||
let xobj = unsafe { message_to_xpc_object(msg) };
|
let xobj = message_to_xpc_object(msg);
|
||||||
let list = conns.lock().unwrap();
|
let list = conns.lock().unwrap();
|
||||||
log::info!(target: LOG_TARGET, "Active XPC clients: {}", list.len());
|
log::trace!(target: LOG_TARGET, "Active XPC clients: {}", list.len());
|
||||||
for c in list.iter() {
|
for c in list.iter() {
|
||||||
log::info!(target: LOG_TARGET, "Sending signal to client");
|
log::trace!(target: LOG_TARGET, "Sending signal to client");
|
||||||
unsafe { xpc_sys::xpc_connection_send_message(c.0, xobj) };
|
unsafe { xpc_sys::xpc_connection_send_message(c.0, xobj) };
|
||||||
}
|
}
|
||||||
unsafe { xpc_sys::xpc_release(xobj) };
|
unsafe { xpc_sys::xpc_release(xobj) };
|
||||||
@@ -111,7 +109,7 @@ impl XpcAgent {
|
|||||||
// Treat incoming events as connections; ignore others
|
// Treat incoming events as connections; ignore others
|
||||||
// We detect connections by trying to set a per-connection handler.
|
// We detect connections by trying to set a per-connection handler.
|
||||||
let client = event as xpc_sys::xpc_connection_t;
|
let client = event as xpc_sys::xpc_connection_t;
|
||||||
log::info!(target: LOG_TARGET, "New XPC connection accepted");
|
log::trace!(target: LOG_TARGET, "New XPC connection accepted");
|
||||||
// Do not register for signals until the client explicitly subscribes
|
// Do not register for signals until the client explicitly subscribes
|
||||||
|
|
||||||
// Per-connection handler
|
// Per-connection handler
|
||||||
@@ -119,51 +117,51 @@ impl XpcAgent {
|
|||||||
let rt_conn = rt_accept.clone();
|
let rt_conn = rt_accept.clone();
|
||||||
let conns_for_handler = conns_accept.clone();
|
let conns_for_handler = conns_accept.clone();
|
||||||
let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| {
|
let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| {
|
||||||
unsafe {
|
// Convert to higher-level Message for type matching
|
||||||
// Convert to higher-level Message for type matching
|
match xpc_object_to_message(msg) {
|
||||||
match xpc_object_to_message(msg) {
|
Message::Dictionary(map) => {
|
||||||
Message::Dictionary(map) => {
|
// Trace inbound method
|
||||||
// Trace inbound method
|
let method = dict_get_str(&map, "method").or_else(|| dict_get_str(&map, "type")).unwrap_or_else(|| "<unknown>".to_string());
|
||||||
let method = dict_get_str(&map, "method").or_else(|| dict_get_str(&map, "type")).unwrap_or_else(|| "<unknown>".to_string());
|
log::trace!(target: LOG_TARGET, "XPC request received: {}", method);
|
||||||
log::info!(target: LOG_TARGET, "XPC request received: {}", method);
|
let response = rt_conn.block_on(dispatch(&agent_conn, &conns_for_handler, client, &map));
|
||||||
let response = rt_conn.block_on(dispatch(&agent_conn, &conns_for_handler, client, &map));
|
let reply = unsafe { xpc_sys::xpc_dictionary_create_reply(msg) };
|
||||||
let reply = xpc_sys::xpc_dictionary_create_reply(msg);
|
if !reply.is_null() {
|
||||||
if !reply.is_null() {
|
let payload = message_to_xpc_object(response);
|
||||||
let payload = message_to_xpc_object(response);
|
let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| {
|
||||||
let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| {
|
unsafe { xpc_sys::xpc_dictionary_set_value(reply, key, value); }
|
||||||
xpc_sys::xpc_dictionary_set_value(reply, key, value);
|
})
|
||||||
})
|
.copy();
|
||||||
.copy();
|
unsafe {
|
||||||
xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _);
|
xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _);
|
||||||
xpc_sys::xpc_connection_send_message(client, reply);
|
xpc_sys::xpc_connection_send_message(client, reply);
|
||||||
xpc_sys::xpc_release(payload);
|
xpc_sys::xpc_release(payload);
|
||||||
xpc_sys::xpc_release(reply);
|
xpc_sys::xpc_release(reply);
|
||||||
log::info!(target: LOG_TARGET, "XPC reply sent for method: {}", method);
|
|
||||||
} else {
|
|
||||||
log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);
|
|
||||||
}
|
}
|
||||||
|
log::trace!(target: LOG_TARGET, "XPC reply sent for method: {}", method);
|
||||||
|
} else {
|
||||||
|
log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);
|
||||||
}
|
}
|
||||||
Message::Error(e) => {
|
|
||||||
match e {
|
|
||||||
MessageError::ConnectionInvalid => {
|
|
||||||
// Normal for one-shot RPC connections; keep logs quiet
|
|
||||||
let mut list = conns_for_handler.lock().unwrap();
|
|
||||||
let before = list.len();
|
|
||||||
list.retain(|c| c.0 != client);
|
|
||||||
let after = list.len();
|
|
||||||
if after < before {
|
|
||||||
log::info!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after);
|
|
||||||
} else {
|
|
||||||
log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
other => {
|
|
||||||
log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
}
|
||||||
|
Message::Error(e) => {
|
||||||
|
match e {
|
||||||
|
MessageError::ConnectionInvalid => {
|
||||||
|
// Normal for one-shot RPC connections; keep logs quiet
|
||||||
|
let mut list = conns_for_handler.lock().unwrap();
|
||||||
|
let before = list.len();
|
||||||
|
list.retain(|c| c.0 != client);
|
||||||
|
let after = list.len();
|
||||||
|
if after < before {
|
||||||
|
log::trace!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after);
|
||||||
|
} else {
|
||||||
|
log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.copy();
|
.copy();
|
||||||
@@ -616,7 +614,7 @@ async fn dispatch(
|
|||||||
// Avoid duplicates
|
// Avoid duplicates
|
||||||
if !list.iter().any(|c| c.0 == current_client) {
|
if !list.iter().any(|c| c.0 == current_client) {
|
||||||
list.push(XpcConn(current_client));
|
list.push(XpcConn(current_client));
|
||||||
log::info!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len());
|
log::trace!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len());
|
||||||
}
|
}
|
||||||
make_ok_reply()
|
make_ok_reply()
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user