Private
Public Access
1
0

xpc: generic interface for dispatching methods

This commit is contained in:
2025-08-23 19:24:42 -07:00
parent e51fa3abeb
commit 8ff95f4bf9
4 changed files with 81 additions and 88 deletions

View File

@@ -5,6 +5,7 @@ use std::collections::HashMap;
use std::ffi::CString;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};
use std::thread;
use xpc_connection::{Message, MessageError, XpcClient, XpcListener};
@@ -26,7 +27,7 @@ impl XpcAgent {
}
}
/// Run the XPC agent and host the XPC service. Implements `GetVersion`.
/// Run the XPC agent and host the XPC service. Implements generic dispatch.
pub async fn run(self) {
log::info!(target: LOG_TARGET, "XPCAgent running");
@@ -49,7 +50,20 @@ impl XpcAgent {
let mut listener = XpcListener::listen(&mach_port_name);
while let Some(client) = listener.next().await {
tokio::spawn(handle_client(client));
let agent = self.clone();
thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::error!(target: LOG_TARGET, "Failed to build runtime for client: {}", e);
return;
}
};
rt.block_on(handle_client(agent, client));
});
}
log::info!(target: LOG_TARGET, "XPC listener shutting down");
@@ -69,7 +83,62 @@ impl XpcAgent {
}
}
async fn handle_client(mut client: XpcClient) {
fn cstr(s: &str) -> CString { CString::new(s).unwrap_or_else(|_| CString::new("").unwrap()) }
fn get_string_field(map: &HashMap<CString, Message>, key: &str) -> Option<String> {
let k = CString::new(key).ok()?;
map.get(&k).and_then(|v| match v {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
}
fn get_dictionary_field<'a>(map: &'a HashMap<CString, Message>, key: &str) -> Option<&'a HashMap<CString, Message>> {
let k = CString::new(key).ok()?;
map.get(&k).and_then(|v| match v {
Message::Dictionary(d) => Some(d),
_ => None,
})
}
fn make_error_reply(code: &str, message: &str) -> Message {
let mut reply: HashMap<CString, Message> = HashMap::new();
reply.insert(cstr("type"), Message::String(cstr("Error")));
reply.insert(cstr("error"), Message::String(cstr(code)));
reply.insert(cstr("message"), Message::String(cstr(message)));
Message::Dictionary(reply)
}
async fn dispatch(agent: &XpcAgent, root: &HashMap<CString, Message>) -> Message {
// Standardized request: { method: String, arguments: Dictionary? }
let method = match get_string_field(root, "method").or_else(|| get_string_field(root, "type")) {
Some(m) => m,
None => return make_error_reply("InvalidRequest", "Missing method/type"),
};
let _arguments = get_dictionary_field(root, "arguments");
match method.as_str() {
// Example implemented method: GetVersion
"GetVersion" => {
match agent.send_event(Event::GetVersion).await {
Ok(version) => {
let mut reply: HashMap<CString, Message> = HashMap::new();
reply.insert(cstr("type"), Message::String(cstr("GetVersionResponse")));
reply.insert(cstr("version"), Message::String(cstr(&version)));
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// Unknown method fallback
other => make_error_reply("UnknownMethod", other),
}
}
async fn handle_client(agent: XpcAgent, mut client: XpcClient) {
log::info!(target: LOG_TARGET, "New XPC connection");
while let Some(message) = client.next().await {
@@ -78,62 +147,8 @@ async fn handle_client(mut client: XpcClient) {
log::warn!(target: LOG_TARGET, "XPC connection interrupted");
}
Message::Dictionary(map) => {
// Try keys "method" or "type" to identify the call.
let method_key = CString::new("method").unwrap();
let type_key = CString::new("type").unwrap();
let maybe_method = map
.get(&method_key)
.or_else(|| map.get(&type_key))
.and_then(|v| match v {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
});
match maybe_method.as_deref() {
Some("GetVersion") => {
let mut reply: HashMap<CString, Message> = HashMap::new();
reply.insert(
CString::new("type").unwrap(),
Message::String(CString::new("GetVersionResponse").unwrap()),
);
reply.insert(
CString::new("version").unwrap(),
Message::String(CString::new(env!("CARGO_PKG_VERSION")).unwrap()),
);
client.send_message(Message::Dictionary(reply));
}
Some(other) => {
log::warn!(target: LOG_TARGET, "Unknown XPC method: {}", other);
let mut reply: HashMap<CString, Message> = HashMap::new();
reply.insert(
CString::new("type").unwrap(),
Message::String(CString::new("Error").unwrap()),
);
reply.insert(
CString::new("error").unwrap(),
Message::String(CString::new("UnknownMethod").unwrap()),
);
reply.insert(
CString::new("message").unwrap(),
Message::String(CString::new(other).unwrap_or_else(|_| CString::new("").unwrap())),
);
client.send_message(Message::Dictionary(reply));
}
None => {
log::warn!(target: LOG_TARGET, "XPC message missing method/type");
let mut reply: HashMap<CString, Message> = HashMap::new();
reply.insert(
CString::new("type").unwrap(),
Message::String(CString::new("Error").unwrap()),
);
reply.insert(
CString::new("error").unwrap(),
Message::String(CString::new("InvalidRequest").unwrap()),
);
client.send_message(Message::Dictionary(reply));
}
}
let response = dispatch(&agent, &map).await;
client.send_message(response);
}
other => {
// For now just echo any non-dictionary messages (useful for testing).