diff --git a/Cargo.lock b/Cargo.lock
index f90bab8..1bd2904 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -745,16 +745,6 @@ dependencies = [
"futures-util",
]
-[[package]]
-name = "futures-async-runtime-preview"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33c03035be1dae627b7e05c6984acb1f2086043fde5249ae51604f1ff20ed037"
-dependencies = [
- "futures-core-preview",
- "futures-stable-preview",
-]
-
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -765,30 +755,12 @@ dependencies = [
"futures-sink",
]
-[[package]]
-name = "futures-channel-preview"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d6f8aec6b0eb1d281843ec666fba2b71a49610181e3078fbef7a8cbed481821e"
-dependencies = [
- "futures-core-preview",
-]
-
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
-[[package]]
-name = "futures-core-preview"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "098785413db44e5dbf3b1fc23c24039a9091bea5acb3eb0d293f386f18aff97d"
-dependencies = [
- "either",
-]
-
[[package]]
name = "futures-executor"
version = "0.3.31"
@@ -800,35 +772,12 @@ dependencies = [
"futures-util",
]
-[[package]]
-name = "futures-executor-preview"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28ff61425699ca85de5c63c1f135278403518c3398bd15cf4b6fd1d21c9846e4"
-dependencies = [
- "futures-channel-preview",
- "futures-core-preview",
- "futures-util-preview",
- "lazy_static",
- "num_cpus",
-]
-
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
-[[package]]
-name = "futures-io-preview"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aaa769a6ac904912c1557b4dcf85b93db2bc9ba57c349f9ce43870e49d67f8e1"
-dependencies = [
- "futures-core-preview",
- "iovec",
-]
-
[[package]]
name = "futures-macro"
version = "0.3.31"
@@ -840,49 +789,12 @@ dependencies = [
"syn",
]
-[[package]]
-name = "futures-preview"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4d575096a4e2cf458f309b5b7bce5c8aaad8e874b8d77f0aa26c08d7ac18f74"
-dependencies = [
- "futures-async-runtime-preview",
- "futures-channel-preview",
- "futures-core-preview",
- "futures-executor-preview",
- "futures-io-preview",
- "futures-sink-preview",
- "futures-stable-preview",
- "futures-util-preview",
-]
-
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
-[[package]]
-name = "futures-sink-preview"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5dc4cdc628b934f18a11ba070d589655f68cfec031a16381b0e7784ff0e9cc18"
-dependencies = [
- "either",
- "futures-channel-preview",
- "futures-core-preview",
-]
-
-[[package]]
-name = "futures-stable-preview"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a6ba960b8bbbc14a9a741cc8ad9c26aff44538ea14be021db905b43f33854da"
-dependencies = [
- "futures-core-preview",
- "futures-executor-preview",
-]
-
[[package]]
name = "futures-task"
version = "0.3.31"
@@ -907,19 +819,6 @@ dependencies = [
"slab",
]
-[[package]]
-name = "futures-util-preview"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4b29aa737dba9e2e47a5dcd4d58ec7c7c2d5f78e8460f609f857bcf04163235e"
-dependencies = [
- "either",
- "futures-channel-preview",
- "futures-core-preview",
- "futures-io-preview",
- "futures-sink-preview",
-]
-
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -1144,15 +1043,6 @@ dependencies = [
"hashbrown",
]
-[[package]]
-name = "iovec"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
-dependencies = [
- "libc",
-]
-
[[package]]
name = "is-terminal"
version = "0.4.16"
@@ -1270,6 +1160,7 @@ version = "1.0.0"
dependencies = [
"anyhow",
"async-trait",
+ "block",
"chrono",
"dbus",
"dbus-codegen",
@@ -1278,7 +1169,7 @@ dependencies = [
"dbus-tree",
"directories",
"env_logger 0.11.8",
- "futures-preview",
+ "futures",
"futures-util",
"keyring",
"kordophone",
@@ -1556,16 +1447,6 @@ dependencies = [
"autocfg",
]
-[[package]]
-name = "num_cpus"
-version = "1.17.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
-dependencies = [
- "hermit-abi 0.5.0",
- "libc",
-]
-
[[package]]
name = "object"
version = "0.32.2"
diff --git a/kordophoned/Cargo.toml b/kordophoned/Cargo.toml
index 22b005f..dedfe31 100644
--- a/kordophoned/Cargo.toml
+++ b/kordophoned/Cargo.toml
@@ -35,7 +35,8 @@ dbus-crossroads = "0.5.1"
# XPC (libxpc) interface for macOS IPC
[target.'cfg(target_os = "macos")'.dependencies]
-futures-preview = "=0.2.2"
+block = "0.1.6"
+futures = "0.3.31"
xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" }
xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" }
serde = { version = "1.0", features = ["derive"] }
diff --git a/kordophoned/include/net.buzzert.kordophonecd.plist b/kordophoned/include/net.buzzert.kordophonecd.plist
index 976049b..df9c1a1 100644
--- a/kordophoned/include/net.buzzert.kordophonecd.plist
+++ b/kordophoned/include/net.buzzert.kordophonecd.plist
@@ -7,7 +7,7 @@
ProgramArguments
- /Users/buzzert/src/kordophone-rs/target/debug/kordophoned
+ /Users/buzzert/src/kordophone/kordophone-rs/target/debug/kordophoned
EnvironmentVariables
diff --git a/kordophoned/src/xpc/agent.rs b/kordophoned/src/xpc/agent.rs
index cd7b0b3..49079bb 100644
--- a/kordophoned/src/xpc/agent.rs
+++ b/kordophoned/src/xpc/agent.rs
@@ -1,16 +1,25 @@
use crate::xpc::interface::SERVICE_NAME;
-use futures_util::StreamExt;
use kordophoned::daemon::settings::Settings;
use kordophoned::daemon::{events::Event, signals::Signal, DaemonResult};
use std::collections::HashMap;
use std::ffi::CString;
+use std::os::raw::c_char;
+use std::ptr;
use std::sync::Arc;
use std::thread;
-use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
-use xpc_connection::{Message, MessageError, XpcClient, XpcListener};
+use tokio::sync::{mpsc, oneshot, Mutex};
+use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
+use xpc_connection_sys as xpc_sys;
static LOG_TARGET: &str = "xpc";
+/// Wrapper for raw XPC connection pointer to declare cross-thread usage.
+/// Safety: libxpc connections are reference-counted and may be used to send from other threads.
+#[derive(Copy, Clone)]
+struct XpcConn(pub xpc_sys::xpc_connection_t);
+unsafe impl Send for XpcConn {}
+unsafe impl Sync for XpcConn {}
+
/// XPC IPC agent that forwards daemon events and signals over libxpc.
#[derive(Clone)]
pub struct XpcAgent {
@@ -21,14 +30,15 @@ pub struct XpcAgent {
impl XpcAgent {
/// Create a new XPC agent with an event sink and signal receiver.
pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self {
- Self {
- event_sink,
- signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
- }
+ Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))) }
}
/// Run the XPC agent and host the XPC service. Implements generic dispatch.
pub async fn run(self) {
+ use block::ConcreteBlock;
+ use std::ops::Deref;
+ use std::sync::Mutex as StdMutex;
+
log::info!(target: LOG_TARGET, "XPCAgent running");
// Construct the Mach service name without a trailing NUL for CString.
@@ -47,47 +57,137 @@ impl XpcAgent {
service_name
);
- // Broadcast channel for signals to all connected clients
- let (signal_tx, _signal_rx) = broadcast::channel::(64);
+ // Multi-thread runtime to drive async dispatch from XPC event handlers.
+ let rt = match tokio::runtime::Runtime::new() {
+ Ok(rt) => Arc::new(rt),
+ Err(e) => {
+ log::error!(target: LOG_TARGET, "Failed to create Tokio runtime: {}", e);
+ return;
+ }
+ };
- // Spawn a single distributor task that forwards daemon signals to broadcast
+ // Shared list of connected clients for signal fanout
+ let connections: Arc>> = Arc::new(StdMutex::new(Vec::new()));
+ // Forward daemon signals to all connected clients
{
let receiver_arc = self.signal_receiver.clone();
- let signal_tx_clone = signal_tx.clone();
- tokio::spawn(async move {
+ let conns = connections.clone();
+ rt.spawn(async move {
let mut receiver = receiver_arc
.lock()
.await
.take()
.expect("Signal receiver already taken");
-
while let Some(signal) = receiver.recv().await {
- let _ = signal_tx_clone.send(signal);
+ log::info!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
+ let msg = signal_to_message(signal);
+ let xobj = unsafe { message_to_xpc_object(msg) };
+ let list = conns.lock().unwrap();
+ log::info!(target: LOG_TARGET, "Active XPC clients: {}", list.len());
+ for c in list.iter() {
+ log::info!(target: LOG_TARGET, "Sending signal to client");
+ unsafe { xpc_sys::xpc_connection_send_message(c.0, xobj) };
+ }
+ unsafe { xpc_sys::xpc_release(xobj) };
}
});
}
- let mut listener = XpcListener::listen(&mach_port_name);
+ // Create the XPC Mach service listener.
+ let service = unsafe {
+ xpc_sys::xpc_connection_create_mach_service(
+ mach_port_name.as_ptr(),
+ ptr::null_mut(),
+ xpc_sys::XPC_CONNECTION_MACH_SERVICE_LISTENER as u64,
+ )
+ };
- while let Some(client) = listener.next().await {
- let agent = self.clone();
- let signal_rx = signal_tx.subscribe();
- thread::spawn(move || {
- let rt = match tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- {
- Ok(rt) => rt,
- Err(e) => {
- log::error!(target: LOG_TARGET, "Failed to build runtime for client: {}", e);
- return;
- }
- };
- rt.block_on(handle_client(agent, client, signal_rx));
- });
+ // Event handler for the service: accepts new client connections.
+ let agent = self.clone();
+ let rt_accept = rt.clone();
+ let conns_accept = connections.clone();
+ let service_handler = ConcreteBlock::new(move |event: xpc_sys::xpc_object_t| {
+ unsafe {
+ // Treat incoming events as connections; ignore others
+ // We detect connections by trying to set a per-connection handler.
+ let client = event as xpc_sys::xpc_connection_t;
+ log::info!(target: LOG_TARGET, "New XPC connection accepted");
+ // Do not register for signals until the client explicitly subscribes
+
+ // Per-connection handler
+ let agent_conn = agent.clone();
+ let rt_conn = rt_accept.clone();
+ let conns_for_handler = conns_accept.clone();
+ let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| {
+ unsafe {
+ // Convert to higher-level Message for type matching
+ match xpc_object_to_message(msg) {
+ Message::Dictionary(map) => {
+ // Trace inbound method
+ let method = dict_get_str(&map, "method").or_else(|| dict_get_str(&map, "type")).unwrap_or_else(|| "".to_string());
+ log::info!(target: LOG_TARGET, "XPC request received: {}", method);
+ let response = rt_conn.block_on(dispatch(&agent_conn, &conns_for_handler, client, &map));
+ let reply = xpc_sys::xpc_dictionary_create_reply(msg);
+ if !reply.is_null() {
+ let payload = message_to_xpc_object(response);
+ let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| {
+ xpc_sys::xpc_dictionary_set_value(reply, key, value);
+ })
+ .copy();
+ xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _);
+ xpc_sys::xpc_connection_send_message(client, reply);
+ xpc_sys::xpc_release(payload);
+ xpc_sys::xpc_release(reply);
+ log::info!(target: LOG_TARGET, "XPC reply sent for method: {}", method);
+ } else {
+ log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);
+ }
+ }
+ Message::Error(e) => {
+ match e {
+ MessageError::ConnectionInvalid => {
+ // Normal for one-shot RPC connections; keep logs quiet
+ let mut list = conns_for_handler.lock().unwrap();
+ let before = list.len();
+ list.retain(|c| c.0 != client);
+ let after = list.len();
+ if after < before {
+ log::info!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after);
+ } else {
+ log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)");
+ }
+ }
+ other => {
+ log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other);
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+ })
+ .copy();
+
+ xpc_sys::xpc_connection_set_event_handler(
+ client,
+ conn_handler.deref() as *const _ as *mut _,
+ );
+ xpc_sys::xpc_connection_resume(client);
+ }
+ }
+ )
+ .copy();
+
+ unsafe {
+ xpc_sys::xpc_connection_set_event_handler(
+ service,
+ service_handler.deref() as *const _ as *mut _,
+ );
+ xpc_sys::xpc_connection_resume(service);
}
- log::info!(target: LOG_TARGET, "XPC listener shutting down");
+ // Keep this future alive forever.
+ futures_util::future::pending::<()>().await;
}
/// Send an event to the daemon and await its reply.
@@ -128,6 +228,8 @@ fn get_dictionary_field<'a>(
}
fn make_error_reply(code: &str, message: &str) -> Message {
+ log::error!(target: LOG_TARGET, "XPC error: {code}: {message}");
+
let mut reply: HashMap = HashMap::new();
reply.insert(cstr("type"), Message::String(cstr("Error")));
reply.insert(cstr("error"), Message::String(cstr(code)));
@@ -172,16 +274,31 @@ fn make_ok_reply() -> Message {
Message::Dictionary(reply)
}
-async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message {
- // Standardized request: { method: String, arguments: Dictionary? }
+/// Attach an optional request_id to a dictionary reply message.
+fn attach_request_id(mut message: Message, request_id: Option) -> Message {
+ if let (Some(id), Message::Dictionary(ref mut m)) = (request_id, &mut message) {
+ dict_put_str(m, "request_id", &id);
+ }
+ message
+}
+
+async fn dispatch(
+ agent: &XpcAgent,
+ subscribers: &std::sync::Mutex>,
+ current_client: xpc_sys::xpc_connection_t,
+ root: &HashMap,
+) -> Message {
+ // Standardized request: { method: String, arguments: Dictionary?, request_id: String? }
+ let request_id = dict_get_str(root, "request_id");
+
let method = match dict_get_str(root, "method").or_else(|| dict_get_str(root, "type")) {
Some(m) => m,
- None => return make_error_reply("InvalidRequest", "Missing method/type"),
+ None => return attach_request_id(make_error_reply("InvalidRequest", "Missing method/type"), request_id),
};
let _arguments = get_dictionary_field(root, "arguments");
- match method.as_str() {
+ let mut response = match method.as_str() {
// Example implemented method: GetVersion
"GetVersion" => match agent.send_event(Event::GetVersion).await {
Ok(version) => {
@@ -493,12 +610,24 @@ async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message
}
}
- // No-op used by clients to ensure the connection is established and subscribed
- "SubscribeSignals" => make_ok_reply(),
+ // Subscribe and return immediately
+ "SubscribeSignals" => {
+ let mut list = subscribers.lock().unwrap();
+ // Avoid duplicates
+ if !list.iter().any(|c| c.0 == current_client) {
+ list.push(XpcConn(current_client));
+ log::info!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len());
+ }
+ make_ok_reply()
+ },
// Unknown method fallback
other => make_error_reply("UnknownMethod", other),
- }
+ };
+
+ // Echo request_id back (if present) so clients can correlate replies
+ response = attach_request_id(response, request_id);
+ response
}
fn signal_to_message(signal: Signal) -> Message {
@@ -531,45 +660,5 @@ fn signal_to_message(signal: Signal) -> Message {
Message::Dictionary(root)
}
-async fn handle_client(
- agent: XpcAgent,
- mut client: XpcClient,
- mut signal_rx: broadcast::Receiver,
-) {
- log::info!(target: LOG_TARGET, "New XPC connection");
+// legacy async client handler removed in reply-port implementation
- loop {
- tokio::select! {
- maybe_msg = client.next() => {
- match maybe_msg {
- Some(Message::Error(MessageError::ConnectionInterrupted)) => {
- log::warn!(target: LOG_TARGET, "XPC connection interrupted");
- }
- Some(Message::Dictionary(map)) => {
- let response = dispatch(&agent, &map).await;
- client.send_message(response);
- }
- Some(other) => {
- log::info!(target: LOG_TARGET, "Echoing message: {:?}", other);
- client.send_message(other);
- }
- None => break,
- }
- }
- recv = signal_rx.recv() => {
- match recv {
- Ok(signal) => {
- let msg = signal_to_message(signal);
- client.send_message(msg);
- }
- Err(broadcast::error::RecvError::Closed) => break,
- Err(broadcast::error::RecvError::Lagged(_)) => {
- log::warn!(target: LOG_TARGET, "Lagged behind on signals; dropping some events for this client");
- }
- }
- }
- }
- }
-
- log::info!(target: LOG_TARGET, "XPC connection closed");
-}
diff --git a/kpcli/src/daemon/xpc.rs b/kpcli/src/daemon/xpc.rs
index d67d9c2..bada46e 100644
--- a/kpcli/src/daemon/xpc.rs
+++ b/kpcli/src/daemon/xpc.rs
@@ -78,6 +78,23 @@ impl XPCClient {
xpc_release(xpc_object);
}
}
+
+ pub fn send_message_with_reply(&self, message: Message) -> Message {
+ use xpc_connection::message_to_xpc_object;
+ use xpc_connection::xpc_object_to_message;
+ use xpc_connection_sys::{xpc_connection_send_message_with_reply_sync, xpc_release};
+
+ unsafe {
+ let xobj = message_to_xpc_object(message);
+ let reply = xpc_connection_send_message_with_reply_sync(self.connection, xobj);
+ xpc_release(xobj);
+ let msg = xpc_object_to_message(reply);
+ if !reply.is_null() {
+ xpc_release(reply);
+ }
+ msg
+ }
+ }
}
impl Drop for XPCClient {
@@ -147,12 +164,10 @@ impl XpcDaemonInterface {
args: Option>,
) -> anyhow::Result> {
let request = Self::build_request(method, args);
- client.send_message(Message::Dictionary(request));
-
- match client.next().await {
- Some(Message::Dictionary(map)) => Ok(map),
- Some(other) => Err(anyhow::anyhow!("Unexpected XPC reply: {:?}", other)),
- None => Err(anyhow::anyhow!("No reply received from XPC daemon")),
+ let reply = client.send_message_with_reply(Message::Dictionary(request));
+ match reply {
+ Message::Dictionary(map) => Ok(map),
+ other => Err(anyhow::anyhow!("Unexpected XPC reply: {:?}", other)),
}
}
@@ -384,7 +399,8 @@ impl DaemonInterface for XpcDaemonInterface {
let mach_port_name = Self::build_service_name()?;
let mut client = XPCClient::connect(&mach_port_name);
- // Send a subscription/warm-up message so the server loop starts selecting for this client
+ // Subscribe to begin receiving signals on this connection
+ eprintln!("[kpcli] Sending SubscribeSignals");
client.send_message(Message::Dictionary(Self::build_request(
"SubscribeSignals",
None,
@@ -394,6 +410,7 @@ impl DaemonInterface for XpcDaemonInterface {
while let Some(msg) = client.next().await {
match msg {
Message::Dictionary(map) => {
+ eprintln!("[kpcli] Received signal dictionary");
let name_key = Self::key("name");
let args_key = Self::key("arguments");
let name = match map.get(&name_key) {
@@ -476,8 +493,13 @@ impl DaemonInterface for XpcDaemonInterface {
_ => {}
}
}
- Message::Error(xpc_connection::MessageError::ConnectionInvalid) => break,
- _ => {}
+ Message::Error(xpc_connection::MessageError::ConnectionInvalid) => {
+ eprintln!("[kpcli] XPC connection invalid");
+ break
+ }
+ other => {
+ eprintln!("[kpcli] Unexpected XPC message: {:?}", other);
+ }
}
}
Ok(())