Private
Public Access
1
0

xpc: Use reply port when replying to RPC messages

This commit is contained in:
2025-08-24 15:28:33 -07:00
parent 06b27c041a
commit a93a773071
5 changed files with 205 additions and 212 deletions

View File

@@ -1,16 +1,25 @@
use crate::xpc::interface::SERVICE_NAME;
use futures_util::StreamExt;
use kordophoned::daemon::settings::Settings;
use kordophoned::daemon::{events::Event, signals::Signal, DaemonResult};
use std::collections::HashMap;
use std::ffi::CString;
use std::os::raw::c_char;
use std::ptr;
use std::sync::Arc;
use std::thread;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use xpc_connection::{Message, MessageError, XpcClient, XpcListener};
use tokio::sync::{mpsc, oneshot, Mutex};
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
use xpc_connection_sys as xpc_sys;
static LOG_TARGET: &str = "xpc";
/// Wrapper for raw XPC connection pointer to declare cross-thread usage.
/// Safety: libxpc connections are reference-counted and may be used to send from other threads.
#[derive(Copy, Clone)]
struct XpcConn(pub xpc_sys::xpc_connection_t);
unsafe impl Send for XpcConn {}
unsafe impl Sync for XpcConn {}
/// XPC IPC agent that forwards daemon events and signals over libxpc.
#[derive(Clone)]
pub struct XpcAgent {
@@ -21,14 +30,15 @@ pub struct XpcAgent {
impl XpcAgent {
/// Create a new XPC agent with an event sink and signal receiver.
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
Self {
event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
}
Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))) }
}
/// Run the XPC agent and host the XPC service. Implements generic dispatch.
pub async fn run(self) {
use block::ConcreteBlock;
use std::ops::Deref;
use std::sync::Mutex as StdMutex;
log::info!(target: LOG_TARGET, "XPCAgent running");
// Construct the Mach service name without a trailing NUL for CString.
@@ -47,47 +57,137 @@ impl XpcAgent {
service_name
);
// Broadcast channel for signals to all connected clients
let (signal_tx, _signal_rx) = broadcast::channel::<Signal>(64);
// Multi-thread runtime to drive async dispatch from XPC event handlers.
let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => Arc::new(rt),
Err(e) => {
log::error!(target: LOG_TARGET, "Failed to create Tokio runtime: {}", e);
return;
}
};
// Spawn a single distributor task that forwards daemon signals to broadcast
// Shared list of connected clients for signal fanout
let connections: Arc<StdMutex<Vec<XpcConn>>> = Arc::new(StdMutex::new(Vec::new()));
// Forward daemon signals to all connected clients
{
let receiver_arc = self.signal_receiver.clone();
let signal_tx_clone = signal_tx.clone();
tokio::spawn(async move {
let conns = connections.clone();
rt.spawn(async move {
let mut receiver = receiver_arc
.lock()
.await
.take()
.expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await {
let _ = signal_tx_clone.send(signal);
log::info!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
let msg = signal_to_message(signal);
let xobj = unsafe { message_to_xpc_object(msg) };
let list = conns.lock().unwrap();
log::info!(target: LOG_TARGET, "Active XPC clients: {}", list.len());
for c in list.iter() {
log::info!(target: LOG_TARGET, "Sending signal to client");
unsafe { xpc_sys::xpc_connection_send_message(c.0, xobj) };
}
unsafe { xpc_sys::xpc_release(xobj) };
}
});
}
let mut listener = XpcListener::listen(&mach_port_name);
// Create the XPC Mach service listener.
let service = unsafe {
xpc_sys::xpc_connection_create_mach_service(
mach_port_name.as_ptr(),
ptr::null_mut(),
xpc_sys::XPC_CONNECTION_MACH_SERVICE_LISTENER as u64,
)
};
while let Some(client) = listener.next().await {
let agent = self.clone();
let signal_rx = signal_tx.subscribe();
thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::error!(target: LOG_TARGET, "Failed to build runtime for client: {}", e);
return;
}
};
rt.block_on(handle_client(agent, client, signal_rx));
});
// Event handler for the service: accepts new client connections.
let agent = self.clone();
let rt_accept = rt.clone();
let conns_accept = connections.clone();
let service_handler = ConcreteBlock::new(move |event: xpc_sys::xpc_object_t| {
unsafe {
// Treat incoming events as connections; ignore others
// We detect connections by trying to set a per-connection handler.
let client = event as xpc_sys::xpc_connection_t;
log::info!(target: LOG_TARGET, "New XPC connection accepted");
// Do not register for signals until the client explicitly subscribes
// Per-connection handler
let agent_conn = agent.clone();
let rt_conn = rt_accept.clone();
let conns_for_handler = conns_accept.clone();
let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| {
unsafe {
// Convert to higher-level Message for type matching
match xpc_object_to_message(msg) {
Message::Dictionary(map) => {
// Trace inbound method
let method = dict_get_str(&map, "method").or_else(|| dict_get_str(&map, "type")).unwrap_or_else(|| "<unknown>".to_string());
log::info!(target: LOG_TARGET, "XPC request received: {}", method);
let response = rt_conn.block_on(dispatch(&agent_conn, &conns_for_handler, client, &map));
let reply = xpc_sys::xpc_dictionary_create_reply(msg);
if !reply.is_null() {
let payload = message_to_xpc_object(response);
let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| {
xpc_sys::xpc_dictionary_set_value(reply, key, value);
})
.copy();
xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _);
xpc_sys::xpc_connection_send_message(client, reply);
xpc_sys::xpc_release(payload);
xpc_sys::xpc_release(reply);
log::info!(target: LOG_TARGET, "XPC reply sent for method: {}", method);
} else {
log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);
}
}
Message::Error(e) => {
match e {
MessageError::ConnectionInvalid => {
// Normal for one-shot RPC connections; keep logs quiet
let mut list = conns_for_handler.lock().unwrap();
let before = list.len();
list.retain(|c| c.0 != client);
let after = list.len();
if after < before {
log::info!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after);
} else {
log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)");
}
}
other => {
log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other);
}
}
}
_ => {}
}
}
})
.copy();
xpc_sys::xpc_connection_set_event_handler(
client,
conn_handler.deref() as *const _ as *mut _,
);
xpc_sys::xpc_connection_resume(client);
}
}
)
.copy();
unsafe {
xpc_sys::xpc_connection_set_event_handler(
service,
service_handler.deref() as *const _ as *mut _,
);
xpc_sys::xpc_connection_resume(service);
}
log::info!(target: LOG_TARGET, "XPC listener shutting down");
// Keep this future alive forever.
futures_util::future::pending::<()>().await;
}
/// Send an event to the daemon and await its reply.
@@ -128,6 +228,8 @@ fn get_dictionary_field<'a>(
}
fn make_error_reply(code: &str, message: &str) -> Message {
log::error!(target: LOG_TARGET, "XPC error: {code}: {message}");
let mut reply: HashMap<CString, Message> = HashMap::new();
reply.insert(cstr("type"), Message::String(cstr("Error")));
reply.insert(cstr("error"), Message::String(cstr(code)));
@@ -172,16 +274,31 @@ fn make_ok_reply() -> Message {
Message::Dictionary(reply)
}
async fn dispatch(agent: &XpcAgent, root: &HashMap<CString, Message>) -> Message {
// Standardized request: { method: String, arguments: Dictionary? }
/// Attach an optional request_id to a dictionary reply message.
fn attach_request_id(mut message: Message, request_id: Option<String>) -> Message {
if let (Some(id), Message::Dictionary(ref mut m)) = (request_id, &mut message) {
dict_put_str(m, "request_id", &id);
}
message
}
async fn dispatch(
agent: &XpcAgent,
subscribers: &std::sync::Mutex<Vec<XpcConn>>,
current_client: xpc_sys::xpc_connection_t,
root: &HashMap<CString, Message>,
) -> Message {
// Standardized request: { method: String, arguments: Dictionary?, request_id: String? }
let request_id = dict_get_str(root, "request_id");
let method = match dict_get_str(root, "method").or_else(|| dict_get_str(root, "type")) {
Some(m) => m,
None => return make_error_reply("InvalidRequest", "Missing method/type"),
None => return attach_request_id(make_error_reply("InvalidRequest", "Missing method/type"), request_id),
};
let _arguments = get_dictionary_field(root, "arguments");
match method.as_str() {
let mut response = match method.as_str() {
// Example implemented method: GetVersion
"GetVersion" => match agent.send_event(Event::GetVersion).await {
Ok(version) => {
@@ -493,12 +610,24 @@ async fn dispatch(agent: &XpcAgent, root: &HashMap<CString, Message>) -> Message
}
}
// No-op used by clients to ensure the connection is established and subscribed
"SubscribeSignals" => make_ok_reply(),
// Subscribe and return immediately
"SubscribeSignals" => {
let mut list = subscribers.lock().unwrap();
// Avoid duplicates
if !list.iter().any(|c| c.0 == current_client) {
list.push(XpcConn(current_client));
log::info!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len());
}
make_ok_reply()
},
// Unknown method fallback
other => make_error_reply("UnknownMethod", other),
}
};
// Echo request_id back (if present) so clients can correlate replies
response = attach_request_id(response, request_id);
response
}
fn signal_to_message(signal: Signal) -> Message {
@@ -531,45 +660,5 @@ fn signal_to_message(signal: Signal) -> Message {
Message::Dictionary(root)
}
async fn handle_client(
agent: XpcAgent,
mut client: XpcClient,
mut signal_rx: broadcast::Receiver<Signal>,
) {
log::info!(target: LOG_TARGET, "New XPC connection");
// legacy async client handler removed in reply-port implementation
loop {
tokio::select! {
maybe_msg = client.next() => {
match maybe_msg {
Some(Message::Error(MessageError::ConnectionInterrupted)) => {
log::warn!(target: LOG_TARGET, "XPC connection interrupted");
}
Some(Message::Dictionary(map)) => {
let response = dispatch(&agent, &map).await;
client.send_message(response);
}
Some(other) => {
log::info!(target: LOG_TARGET, "Echoing message: {:?}", other);
client.send_message(other);
}
None => break,
}
}
recv = signal_rx.recv() => {
match recv {
Ok(signal) => {
let msg = signal_to_message(signal);
client.send_message(msg);
}
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_)) => {
log::warn!(target: LOG_TARGET, "Lagged behind on signals; dropping some events for this client");
}
}
}
}
}
log::info!(target: LOG_TARGET, "XPC connection closed");
}