Private
Public Access
1
0

Merge branch 'wip/macos-xpc'

* wip/macos-xpc: (23 commits)
  auth: try switching to platform agnostic auth store
  daemon: fix crash when misconfigured
  xpc: better file descriptor handling
  xpc: adds OpenAttachmentFd
  xpc: full attachment data
  sync policy: only ignore empty bodies if there are no attachments
  xpc: include attachment guids
  cargo fmt
  xpc: Some cleanup
  xpc: refactor -- separate rpc impl and xpc glue
  xpc: refactor, less chatty logging
  xpc: Use reply port when replying to RPC messages
  cargo fmt
  xpc: implement signals
  xpc: implement rest of methods in kpcli except signals.
  cargo fmt
  xpc: Better type unpacking
  xpc: implement GetConversations
  xpc: kpcli: clean up client interface
  xpc: generic interface for dispatching methods
  ...
This commit is contained in:
2025-08-25 01:01:56 -07:00
34 changed files with 1948 additions and 179 deletions

View File

@@ -12,7 +12,7 @@ chrono = "0.4.38"
directories = "6.0.0"
env_logger = "0.11.6"
futures-util = "0.3.31"
keyring = { version = "3.6.2", features = ["sync-secret-service"] }
keyring = { version = "3.6.3", features = ["apple-native", "sync-secret-service"] }
kordophone = { path = "../kordophone" }
kordophone-db = { path = "../kordophone-db" }
log = "0.4.25"
@@ -23,18 +23,25 @@ tokio-condvar = "0.3.0"
uuid = "1.16.0"
once_cell = "1.19.0"
[target.'cfg(target_os = "linux")'.dependencies]
# D-Bus dependencies only on Linux
[target.'cfg(target_os = "linux")'.dependencies]
dbus = { version = "0.9.7", features = ["futures"] }
dbus-crossroads = "0.5.2"
dbus-tokio = "0.7.6"
dbus-tree = "0.9.2"
[target.'cfg(target_os = "linux")'.build-dependencies]
# D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = "0.10.0"
dbus-crossroads = "0.5.1"
# XPC (libxpc) interface for macOS IPC
[target.'cfg(target_os = "macos")'.dependencies]
block = "0.1.6"
futures = "0.3.31"
xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" }
xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" }
serde = { version = "1.0", features = ["derive"] }
[package.metadata.generate-rpm]
assets = [
@@ -42,3 +49,4 @@ assets = [
{ source = "../target/release/kpcli", dest = "/usr/bin/kpcli", mode = "755" },
{ source = "include/net.buzzert.kordophonecd.service", dest = "/usr/share/dbus-1/services/net.buzzert.kordophonecd.service", mode = "644" },
]

View File

@@ -13,3 +13,25 @@ cargo build --release
strip -s target/release/kordophoned
cargo generate-rpm
```
## Running on macOS
Before any client can talk to the kordophone daemon on macOS, the XPC service needs to be manually registered with launchd.
- Register using `launchctl load net.buzzert.kordophonecd.plist`
Plans are to embed this into the app executable, which would then not need to be manually registered (only via the following Swift code):
```swift
try? SMAppService.agent(plistName: "net.buzzert.kordophonecd.plist").register()
```
and the following in Info.plist:
```xml
<key>Label</key><string>net.buzzert.kordophonecd</string>
<key>BundleProgram</key><string>Contents/MacOS/kordophoned</string>
<key>MachServices</key><dict><key>net.buzzert.kordophonecd</key><true/></dict>
<key>KeepAlive</key><true/>
```

View File

@@ -20,8 +20,8 @@ fn main() {
let xml = std::fs::read_to_string(KORDOPHONE_XML).expect("Error reading server dbus interface");
let output = dbus_codegen::generate(&xml, &opts)
.expect("Error generating server dbus interface");
let output =
dbus_codegen::generate(&xml, &opts).expect("Error generating server dbus interface");
std::fs::write(out_path, output).expect("Error writing server dbus code");

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>net.buzzert.kordophonecd</string>
<key>ProgramArguments</key>
<array>
<string>/Users/buzzert/src/kordophone/kordophone-rs/target/debug/kordophoned</string>
</array>
<key>EnvironmentVariables</key>
<dict>
<key>RUST_LOG</key>
<string>info</string>
</dict>
<key>MachServices</key>
<dict>
<key>net.buzzert.kordophonecd</key>
<true/>
</dict>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>/tmp/kordophoned.out.log</string>
<key>StandardErrorPath</key>
<string>/tmp/kordophoned.err.log</string>
</dict>
</plist>

View File

@@ -21,10 +21,7 @@ impl DatabaseAuthenticationStore {
#[async_trait]
impl AuthenticationStore for DatabaseAuthenticationStore {
#[cfg(target_os = "linux")]
async fn get_credentials(&mut self) -> Option<Credentials> {
use keyring::secret_service::SsCredential;
self.database
.lock()
.await
@@ -38,15 +35,14 @@ impl AuthenticationStore for DatabaseAuthenticationStore {
match username {
Some(username) => {
let credential = SsCredential::new_with_target(
None,
"net.buzzert.kordophonecd",
&username,
)
.unwrap();
let password: Result<String> =
Entry::new_with_credential(Box::new(credential)).get_password();
let credential_res = Entry::new("net.buzzert.kordophonecd", &username);
let password: Result<String> = match credential_res {
Ok(credential) => credential.get_password(),
Err(e) => {
log::error!("error creating keyring credential: {}", e);
return None;
}
};
match password {
Ok(password) => Some(Credentials { username, password }),
@@ -62,11 +58,6 @@ impl AuthenticationStore for DatabaseAuthenticationStore {
.await
}
#[cfg(not(target_os = "linux"))]
async fn get_credentials(&mut self) -> Option<Credentials> {
None
}
async fn get_token(&mut self) -> Option<String> {
self.database
.lock()

View File

@@ -1,10 +1,10 @@
use super::ContactResolverBackend;
use dbus::blocking::Connection;
use dbus::arg::{RefArg, Variant};
use dbus::blocking::Connection;
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use std::time::Duration;
use std::sync::Mutex;
use std::time::Duration;
#[derive(Clone)]
pub struct EDSContactResolverBackend;
@@ -40,8 +40,13 @@ static ADDRESS_BOOK_HANDLE: OnceCell<Mutex<AddressBookHandle>> = OnceCell::new()
/// Check whether a given well-known name currently has an owner on the bus.
fn name_has_owner(conn: &Connection, name: &str) -> bool {
let proxy = conn.with_proxy("org.freedesktop.DBus", "/org/freedesktop/DBus", Duration::from_secs(2));
let result: Result<(bool,), _> = proxy.method_call("org.freedesktop.DBus", "NameHasOwner", (name.to_string(),));
let proxy = conn.with_proxy(
"org.freedesktop.DBus",
"/org/freedesktop/DBus",
Duration::from_secs(2),
);
let result: Result<(bool,), _> =
proxy.method_call("org.freedesktop.DBus", "NameHasOwner", (name.to_string(),));
result.map(|(b,)| b).unwrap_or(false)
}
@@ -99,10 +104,7 @@ fn ensure_address_book_uid(conn: &Connection) -> anyhow::Result<String> {
// The GetManagedObjects reply is the usual ObjectManager map.
let (managed_objects,): (
HashMap<
dbus::Path<'static>,
HashMap<String, HashMap<String, Variant<Box<dyn RefArg>>>>,
>,
HashMap<dbus::Path<'static>, HashMap<String, HashMap<String, Variant<Box<dyn RefArg>>>>>,
) = source_manager_proxy.method_call(
"org.freedesktop.DBus.ObjectManager",
"GetManagedObjects",
@@ -153,10 +155,7 @@ fn data_contains_address_book_backend(data: &str) -> bool {
/// Open the Evolution address book referenced by `source_uid` and return the
/// pair `(object_path, bus_name)` that identifies the newly created D-Bus
/// proxy.
fn open_address_book(
conn: &Connection,
source_uid: &str,
) -> anyhow::Result<(String, String)> {
fn open_address_book(conn: &Connection, source_uid: &str) -> anyhow::Result<(String, String)> {
let factory_proxy = conn.with_proxy(
"org.gnome.evolution.dataserver.AddressBook10",
"/org/gnome/evolution/dataserver/AddressBookFactory",
@@ -177,11 +176,8 @@ fn open_address_book(
/// calls the `Open` method once per process. We ignore any error here
/// because the backend might already be open.
fn ensure_address_book_open(proxy: &dbus::blocking::Proxy<&Connection>) {
let _: Result<(), _> = proxy.method_call(
"org.gnome.evolution.dataserver.AddressBook",
"Open",
(),
);
let _: Result<(), _> =
proxy.method_call("org.gnome.evolution.dataserver.AddressBook", "Open", ());
}
impl ContactResolverBackend for EDSContactResolverBackend {
@@ -295,4 +291,4 @@ impl Default for EDSContactResolverBackend {
fn default() -> Self {
Self
}
}
}

View File

@@ -13,4 +13,4 @@ impl ContactResolverBackend for GenericContactResolverBackend {
fn get_contact_display_name(&self, contact_id: &Self::ContactID) -> Option<String> {
None
}
}
}

View File

@@ -15,7 +15,9 @@ pub struct EDSContactResolverBackend;
#[cfg(not(target_os = "linux"))]
impl Default for EDSContactResolverBackend {
fn default() -> Self { EDSContactResolverBackend }
fn default() -> Self {
EDSContactResolverBackend
}
}
#[cfg(not(target_os = "linux"))]
@@ -56,7 +58,11 @@ where
T: Default,
{
pub fn new(backend: T) -> Self {
Self { backend, display_name_cache: HashMap::new(), contact_id_cache: HashMap::new() }
Self {
backend,
display_name_cache: HashMap::new(),
contact_id_cache: HashMap::new(),
}
}
pub fn resolve_contact_id(&mut self, address: &str) -> Option<AnyContactID> {
@@ -66,7 +72,8 @@ where
let id = self.backend.resolve_contact_id(address).map(|id| id.into());
if let Some(ref id) = id {
self.contact_id_cache.insert(address.to_string(), id.clone());
self.contact_id_cache
.insert(address.to_string(), id.clone());
}
id
@@ -80,7 +87,8 @@ where
let backend_contact_id: T::ContactID = T::ContactID::from((*contact_id).clone());
let display_name = self.backend.get_contact_display_name(&backend_contact_id);
if let Some(ref display_name) = display_name {
self.display_name_cache.insert(contact_id.to_string(), display_name.clone());
self.display_name_cache
.insert(contact_id.to_string(), display_name.clone());
}
display_name

View File

@@ -522,12 +522,18 @@ impl Daemon {
.await?
{
for p in &saved.participants {
if let DbParticipant::Remote { handle, contact_id: None } = p {
if let DbParticipant::Remote {
handle,
contact_id: None,
} = p
{
log::trace!(target: target::SYNC, "Resolving contact id for participant: {}", handle);
if let Some(contact) = contact_resolver.resolve_contact_id(handle) {
log::trace!(target: target::SYNC, "Resolved contact id for participant: {}", contact);
let _ = database
.with_repository(|r| r.update_participant_contact(&handle, &contact))
.with_repository(|r| {
r.update_participant_contact(&handle, &contact)
})
.await;
} else {
log::trace!(target: target::SYNC, "No contact id found for participant: {}", handle);
@@ -620,7 +626,10 @@ impl Daemon {
// the typing indicator or stuff like that. In the future, we need to move to ChatItems instead of Messages.
let insertable_messages: Vec<kordophone::model::Message> = messages
.into_iter()
.filter(|m| !m.text.is_empty() && !m.text.trim().is_empty())
.filter(|m| {
(!m.text.is_empty() && !m.text.trim().is_empty())
|| !m.file_transfer_guids.is_empty()
})
.collect();
let db_messages: Vec<kordophone_db::models::Message> = insertable_messages
@@ -663,11 +672,11 @@ impl Daemon {
signal_sender: &Sender<Signal>,
) -> Result<()> {
log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
let updated = database.with_repository(|r| r.merge_conversation_metadata(conversation)).await?;
let updated = database
.with_repository(|r| r.merge_conversation_metadata(conversation))
.await?;
if updated {
signal_sender
.send(Signal::ConversationsUpdated)
.await?;
signal_sender.send(Signal::ConversationsUpdated).await?;
}
Ok(())
@@ -692,7 +701,13 @@ impl Daemon {
.ok_or(DaemonError::ClientNotConfigured)?;
let client = HTTPAPIClient::new(
server_url.parse().unwrap(),
match server_url.parse() {
Ok(url) => url,
Err(_) => {
log::error!(target: target::DAEMON, "Invalid server URL: {}", server_url);
return Err(DaemonError::ClientNotConfigured.into());
}
},
DatabaseAuthenticationStore::new(database.clone()),
);

View File

@@ -177,7 +177,10 @@ impl From<Participant> for DbParticipant {
fn from(participant: Participant) -> Self {
match participant {
Participant::Me => DbParticipant::Me,
Participant::Remote { handle, contact_id } => DbParticipant::Remote { handle, contact_id: contact_id.clone() },
Participant::Remote { handle, contact_id } => DbParticipant::Remote {
handle,
contact_id: contact_id.clone(),
},
}
}
}
}

View File

@@ -65,8 +65,9 @@ impl UpdateMonitor {
UpdateEventData::ConversationChanged(conversation) => {
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
// Explicitly update the unread count, we assume this is fresh from the notification.
let db_conversation: kordophone_db::models::Conversation = conversation.clone().into();
// Explicitly update the unread count, we assume this is fresh from the notification.
let db_conversation: kordophone_db::models::Conversation =
conversation.clone().into();
self.send_event(|r| Event::UpdateConversationMetadata(db_conversation, r))
.await
.unwrap_or_else(|e| {

View File

@@ -1,15 +1,15 @@
use dbus::arg;
use dbus_tree::MethodErr;
use std::{future::Future, thread};
use std::sync::Arc;
use std::{future::Future, thread};
use tokio::sync::{mpsc, oneshot, Mutex};
use crate::daemon::{
use kordophoned::daemon::{
contact_resolver::{ContactResolver, DefaultContactResolverBackend},
events::{Event, Reply},
settings::Settings,
signals::Signal,
DaemonResult,
contact_resolver::{ContactResolver, DefaultContactResolverBackend},
};
use kordophone_db::models::participant::Participant;
@@ -37,7 +37,8 @@ impl DBusAgent {
pub async fn run(self) {
// Establish a session bus connection.
let (resource, connection) = connection::new_session_sync().expect("Failed to connect to session bus");
let (resource, connection) =
connection::new_session_sync().expect("Failed to connect to session bus");
// Ensure the D-Bus resource is polled.
tokio::spawn(async move {
@@ -79,7 +80,10 @@ impl DBusAgent {
Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated");
registry
.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {})
.send_signal(
interface::OBJECT_PATH,
DbusSignals::ConversationsUpdated {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
@@ -118,7 +122,8 @@ impl DBusAgent {
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid, attachment_guid
upload_guid,
attachment_guid
);
registry
.send_signal(
@@ -154,7 +159,10 @@ impl DBusAgent {
std::future::pending::<()>().await;
}
pub async fn send_event<T>(&self, make_event: impl FnOnce(Reply<T>) -> Event) -> DaemonResult<T> {
pub async fn send_event<T>(
&self,
make_event: impl FnOnce(Reply<T>) -> Event,
) -> DaemonResult<T> {
let (reply_tx, reply_rx) = oneshot::channel();
self.event_sink
.send(make_event(reply_tx))
@@ -172,26 +180,29 @@ impl DBusAgent {
.unwrap()
.map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e)))
}
fn resolve_participant_display_name(&mut self, participant: &Participant) -> String {
match participant {
// Me (we should use a special string here...)
Participant::Me => "(Me)".to_string(),
// Remote participant with a resolved contact_id
Participant::Remote { handle, contact_id: Some(contact_id), .. } => {
self.contact_resolver.get_contact_display_name(contact_id).unwrap_or_else(|| handle.clone())
}
Participant::Remote {
handle,
contact_id: Some(contact_id),
..
} => self
.contact_resolver
.get_contact_display_name(contact_id)
.unwrap_or_else(|| handle.clone()),
// Remote participant without a resolved contact_id
Participant::Remote { handle, .. } => {
handle.clone()
}
Participant::Remote { handle, .. } => handle.clone(),
}
}
}
//
//
// D-Bus repository interface implementation
//
@@ -203,9 +214,12 @@ impl DbusRepository for DBusAgent {
self.send_event_sync(Event::GetVersion)
}
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<arg::PropMap>, MethodErr> {
self
.send_event_sync(|r| Event::GetAllConversations(limit, offset, r))
fn get_conversations(
&mut self,
limit: i32,
offset: i32,
) -> Result<Vec<arg::PropMap>, MethodErr> {
self.send_event_sync(|r| Event::GetAllConversations(limit, offset, r))
.map(|conversations| {
conversations
.into_iter()
@@ -243,7 +257,6 @@ impl DbusRepository for DBusAgent {
})
}
fn sync_conversation_list(&mut self) -> Result<(), MethodErr> {
self.send_event_sync(Event::SyncConversationList)
}
@@ -260,15 +273,18 @@ impl DbusRepository for DBusAgent {
self.send_event_sync(|r| Event::MarkConversationAsRead(conversation_id, r))
}
fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result<Vec<arg::PropMap>, MethodErr> {
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: String,
) -> Result<Vec<arg::PropMap>, MethodErr> {
let last_message_id_opt = if last_message_id.is_empty() {
None
} else {
Some(last_message_id)
};
self
.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
.map(|messages| {
messages
.into_iter()
@@ -286,7 +302,9 @@ impl DbusRepository for DBusAgent {
);
map.insert(
"sender".into(),
arg::Variant(Box::new(self.resolve_participant_display_name(&msg.sender.into()))),
arg::Variant(Box::new(
self.resolve_participant_display_name(&msg.sender.into()),
)),
);
// Attachments array
@@ -312,7 +330,9 @@ impl DbusRepository for DBusAgent {
);
attachment_map.insert(
"preview_path".into(),
arg::Variant(Box::new(preview_path.to_string_lossy().to_string())),
arg::Variant(Box::new(
preview_path.to_string_lossy().to_string(),
)),
);
attachment_map.insert(
"downloaded".into(),
@@ -374,27 +394,34 @@ impl DbusRepository for DBusAgent {
text: String,
attachment_guids: Vec<String>,
) -> Result<String, MethodErr> {
self
.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
self.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
.map(|uuid| uuid.to_string())
}
fn get_attachment_info(&mut self, attachment_id: String) -> Result<(String, String, bool, bool), MethodErr> {
self.send_event_sync(|r| Event::GetAttachment(attachment_id, r)).map(|attachment| {
let path = attachment.get_path_for_preview(false);
let downloaded = attachment.is_downloaded(false);
let preview_path = attachment.get_path_for_preview(true);
let preview_downloaded = attachment.is_downloaded(true);
(
path.to_string_lossy().to_string(),
preview_path.to_string_lossy().to_string(),
downloaded,
preview_downloaded,
)
})
fn get_attachment_info(
&mut self,
attachment_id: String,
) -> Result<(String, String, bool, bool), MethodErr> {
self.send_event_sync(|r| Event::GetAttachment(attachment_id, r))
.map(|attachment| {
let path = attachment.get_path_for_preview(false);
let downloaded = attachment.is_downloaded(false);
let preview_path = attachment.get_path_for_preview(true);
let preview_downloaded = attachment.is_downloaded(true);
(
path.to_string_lossy().to_string(),
preview_path.to_string_lossy().to_string(),
downloaded,
preview_downloaded,
)
})
}
fn download_attachment(&mut self, attachment_id: String, preview: bool) -> Result<(), MethodErr> {
fn download_attachment(
&mut self,
attachment_id: String,
preview: bool,
) -> Result<(), MethodErr> {
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
}
@@ -482,4 +509,4 @@ where
.join()
})
.expect("Error joining runtime thread")
}
}

View File

@@ -1,5 +1,5 @@
pub mod endpoint;
pub mod agent;
pub mod endpoint;
pub mod interface {
#![allow(unused)]

1
kordophoned/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod daemon;

View File

@@ -1,12 +1,13 @@
mod daemon;
#[cfg(target_os = "linux")]
mod dbus;
#[cfg(target_os = "macos")]
mod xpc;
use log::LevelFilter;
use std::future;
use daemon::Daemon;
use kordophoned::daemon::Daemon;
fn initialize_logging() {
// Weird: is this the best way to do this?
@@ -33,7 +34,17 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
#[cfg(target_os = "macos")]
async fn start_ipc_agent(daemon: &mut Daemon) {
// TODO: Implement macOS IPC agent.
// Start the macOS XPC agent (events in, signals out) on a dedicated thread.
let agent =
xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
std::thread::spawn(move || {
// Use a single-threaded Tokio runtime for the XPC agent.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Unable to create tokio runtime for XPC agent");
rt.block_on(agent.run());
});
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]

View File

@@ -0,0 +1,186 @@
use crate::xpc::interface::SERVICE_NAME;
use kordophoned::daemon::{events::Event, signals::Signal, DaemonResult};
use std::ffi::CString;
use std::os::raw::c_char;
use std::ptr;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
use xpc_connection_sys as xpc_sys;
pub(super) static LOG_TARGET: &str = "xpc";
/// Wrapper for raw XPC connection pointer to declare cross-thread usage.
/// Safety: libxpc connections are reference-counted and may be used to send from other threads.
#[derive(Copy, Clone)]
pub(crate) struct XpcConn(pub xpc_sys::xpc_connection_t);
unsafe impl Send for XpcConn {}
unsafe impl Sync for XpcConn {}
type Subscribers = Arc<std::sync::Mutex<Vec<XpcConn>>>;
#[derive(Clone)]
pub struct XpcAgent {
event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>,
}
impl XpcAgent {
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
Self {
event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
}
}
pub async fn run(self) {
use block::ConcreteBlock;
use std::ops::Deref;
// Construct the Mach service name without a trailing NUL for CString.
let service_name = SERVICE_NAME.trim_end_matches('\0');
let mach_port_name = match CString::new(service_name) {
Ok(c) => c,
Err(e) => {
log::error!(target: LOG_TARGET, "Invalid XPC service name: {e}");
return;
}
};
log::info!(
target: LOG_TARGET,
"Waiting for XPC connections on {}",
service_name
);
let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => Arc::new(rt),
Err(e) => {
log::error!(target: LOG_TARGET, "Failed to create Tokio runtime: {}", e);
return;
}
};
let connections: Subscribers = Arc::new(std::sync::Mutex::new(Vec::new()));
{
let receiver_arc = self.signal_receiver.clone();
let conns = connections.clone();
rt.spawn(async move {
let mut receiver = receiver_arc
.lock()
.await
.take()
.expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await {
log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
let msg = super::util::signal_to_message(signal);
let xobj = message_to_xpc_object(msg);
let list = conns.lock().unwrap();
log::trace!(target: LOG_TARGET, "Active XPC clients: {}", list.len());
for c in list.iter() {
log::trace!(target: LOG_TARGET, "Sending signal to client");
unsafe { xpc_sys::xpc_connection_send_message(c.0, xobj) };
}
unsafe { xpc_sys::xpc_release(xobj) };
}
});
}
let service = unsafe {
xpc_sys::xpc_connection_create_mach_service(
mach_port_name.as_ptr(),
ptr::null_mut(),
xpc_sys::XPC_CONNECTION_MACH_SERVICE_LISTENER as u64,
)
};
let agent = self.clone();
let rt_accept = rt.clone();
let conns_accept = connections.clone();
let service_handler = ConcreteBlock::new(move |event: xpc_sys::xpc_object_t| {
unsafe {
let client = event as xpc_sys::xpc_connection_t;
log::trace!(target: LOG_TARGET, "New XPC connection accepted");
let agent_conn = agent.clone();
let rt_conn = rt_accept.clone();
let conns_for_handler = conns_accept.clone();
let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| {
match xpc_object_to_message(msg) {
Message::Dictionary(map) => {
let method = super::util::dict_get_str(&map, "method").or_else(|| super::util::dict_get_str(&map, "type")).unwrap_or_else(|| "<unknown>".to_string());
log::trace!(target: LOG_TARGET, "XPC request received: {}", method);
let response = rt_conn.block_on(super::rpc::dispatch(&agent_conn, &conns_for_handler, client, &map));
let reply = xpc_sys::xpc_dictionary_create_reply(msg);
if !reply.is_null() {
let payload = message_to_xpc_object(response);
let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| {
xpc_sys::xpc_dictionary_set_value(reply, key, value);
})
.copy();
xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _);
xpc_sys::xpc_connection_send_message(client, reply);
xpc_sys::xpc_release(payload);
xpc_sys::xpc_release(reply);
log::trace!(target: LOG_TARGET, "XPC reply sent for method: {}", method);
} else {
log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);
}
}
Message::Error(e) => {
match e {
MessageError::ConnectionInvalid => {
let mut list = conns_for_handler.lock().unwrap();
let before = list.len();
list.retain(|c| c.0 != client);
let after = list.len();
if after < before {
log::trace!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after);
} else {
log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)");
}
}
other => {
log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other);
}
}
}
_ => {}
}
})
.copy();
xpc_sys::xpc_connection_set_event_handler(
client,
conn_handler.deref() as *const _ as *mut _,
);
xpc_sys::xpc_connection_resume(client);
}
})
.copy();
unsafe {
xpc_sys::xpc_connection_set_event_handler(
service,
service_handler.deref() as *const _ as *mut _,
);
xpc_sys::xpc_connection_resume(service);
}
futures_util::future::pending::<()>().await;
}
pub async fn send_event<T>(
&self,
make_event: impl FnOnce(kordophoned::daemon::events::Reply<T>) -> Event,
) -> DaemonResult<T> {
let (tx, rx) = oneshot::channel();
self.event_sink
.send(make_event(tx))
.await
.map_err(|_| "Failed to send event")?;
rx.await.map_err(|_| "Failed to receive reply".into())
}
}

View File

@@ -0,0 +1,5 @@
#![cfg(target_os = "macos")]
//! XPC interface definitions for macOS IPC
/// Mach service name for the XPC interface (must include trailing NUL).
pub const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0";

View File

@@ -0,0 +1,4 @@
pub mod agent;
pub mod interface;
pub mod rpc;
pub mod util;

452
kordophoned/src/xpc/rpc.rs Normal file
View File

@@ -0,0 +1,452 @@
use super::agent::{XpcAgent, XpcConn, LOG_TARGET};
use kordophoned::daemon::events::Event;
use kordophoned::daemon::settings::Settings;
use std::collections::HashMap;
use std::ffi::CString;
use xpc_connection::Message;
use xpc_connection_sys as xpc_sys;
use super::util::*;
pub async fn dispatch(
agent: &XpcAgent,
subscribers: &std::sync::Mutex<Vec<XpcConn>>,
current_client: xpc_sys::xpc_connection_t,
root: &HashMap<CString, Message>,
) -> Message {
let request_id = dict_get_str(root, "request_id");
let method = match dict_get_str(root, "method").or_else(|| dict_get_str(root, "type")) {
Some(m) => m,
None => {
return attach_request_id(
make_error_reply("InvalidRequest", "Missing method/type"),
request_id,
)
}
};
let _arguments = get_dictionary_field(root, "arguments");
let mut response = match method.as_str() {
// GetVersion
"GetVersion" => match agent.send_event(Event::GetVersion).await {
Ok(version) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "GetVersionResponse");
dict_put_str(&mut reply, "version", &version);
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
},
// GetConversations
"GetConversations" => {
let mut limit: i32 = 100;
let mut offset: i32 = 0;
if let Some(args) = get_dictionary_field(root, "arguments") {
if let Some(v) = dict_get_i64_from_str(args, "limit") {
limit = v as i32;
}
if let Some(v) = dict_get_i64_from_str(args, "offset") {
offset = v as i32;
}
}
match agent
.send_event(|r| Event::GetAllConversations(limit, offset, r))
.await
{
Ok(conversations) => {
let mut items: Vec<Message> = Vec::with_capacity(conversations.len());
for conv in conversations {
let mut m: XpcMap = HashMap::new();
dict_put_str(&mut m, "guid", &conv.guid);
dict_put_str(
&mut m,
"display_name",
&conv.display_name.unwrap_or_default(),
);
dict_put_i64_as_str(&mut m, "unread_count", conv.unread_count as i64);
dict_put_str(
&mut m,
"last_message_preview",
&conv.last_message_preview.unwrap_or_default(),
);
let participant_names: Vec<String> = conv
.participants
.into_iter()
.map(|p| p.display_name())
.collect();
m.insert(cstr("participants"), array_from_strs(participant_names));
dict_put_i64_as_str(&mut m, "date", conv.date.and_utc().timestamp());
items.push(Message::Dictionary(m));
}
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "GetConversationsResponse");
reply.insert(cstr("conversations"), Message::Array(items));
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// Sync ops
"SyncConversationList" => match agent.send_event(Event::SyncConversationList).await {
Ok(()) => make_ok_reply(),
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
},
"SyncAllConversations" => match agent.send_event(Event::SyncAllConversations).await {
Ok(()) => make_ok_reply(),
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
},
"SyncConversation" => {
let conversation_id = match get_dictionary_field(root, "arguments")
.and_then(|m| dict_get_str(m, "conversation_id"))
{
Some(id) => id,
None => return make_error_reply("InvalidRequest", "Missing conversation_id"),
};
match agent
.send_event(|r| Event::SyncConversation(conversation_id, r))
.await
{
Ok(()) => make_ok_reply(),
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// Mark as read
"MarkConversationAsRead" => {
let conversation_id = match get_dictionary_field(root, "arguments")
.and_then(|m| dict_get_str(m, "conversation_id"))
{
Some(id) => id,
None => return make_error_reply("InvalidRequest", "Missing conversation_id"),
};
match agent
.send_event(|r| Event::MarkConversationAsRead(conversation_id, r))
.await
{
Ok(()) => make_ok_reply(),
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// GetMessages
"GetMessages" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return make_error_reply("InvalidRequest", "Missing arguments"),
};
let conversation_id = match dict_get_str(args, "conversation_id") {
Some(id) => id,
None => return make_error_reply("InvalidRequest", "Missing conversation_id"),
};
let last_message_id = dict_get_str(args, "last_message_id");
match agent
.send_event(|r| Event::GetMessages(conversation_id, last_message_id, r))
.await
{
Ok(messages) => {
let mut items: Vec<Message> = Vec::with_capacity(messages.len());
for msg in messages {
let mut m: XpcMap = HashMap::new();
dict_put_str(&mut m, "id", &msg.id);
dict_put_str(&mut m, "text", &msg.text.replace('\u{FFFC}', ""));
dict_put_i64_as_str(&mut m, "date", msg.date.and_utc().timestamp());
dict_put_str(&mut m, "sender", &msg.sender.display_name());
// Include attachment GUIDs for the client to resolve/download
let attachment_guids: Vec<String> = msg
.attachments
.iter()
.map(|a| a.guid.clone())
.collect();
m.insert(cstr("attachment_guids"), array_from_strs(attachment_guids));
// Full attachments array with metadata (mirrors DBus fields)
let mut attachments_items: Vec<Message> = Vec::new();
for attachment in msg.attachments.iter() {
let mut a: XpcMap = HashMap::new();
// Basic identifiers
dict_put_str(&mut a, "guid", &attachment.guid);
// Paths and download status
let path = attachment.get_path_for_preview(false);
let preview_path = attachment.get_path_for_preview(true);
let downloaded = attachment.is_downloaded(false);
let preview_downloaded = attachment.is_downloaded(true);
dict_put_str(&mut a, "path", &path.to_string_lossy());
dict_put_str(&mut a, "preview_path", &preview_path.to_string_lossy());
dict_put_str(&mut a, "downloaded", &downloaded.to_string());
dict_put_str(
&mut a,
"preview_downloaded",
&preview_downloaded.to_string(),
);
// Metadata (optional)
if let Some(metadata) = &attachment.metadata {
let mut metadata_map: XpcMap = HashMap::new();
if let Some(attribution_info) = &metadata.attribution_info {
let mut attribution_map: XpcMap = HashMap::new();
if let Some(width) = attribution_info.width {
dict_put_i64_as_str(&mut attribution_map, "width", width as i64);
}
if let Some(height) = attribution_info.height {
dict_put_i64_as_str(&mut attribution_map, "height", height as i64);
}
metadata_map.insert(cstr("attribution_info"), Message::Dictionary(attribution_map));
}
if !metadata_map.is_empty() {
a.insert(cstr("metadata"), Message::Dictionary(metadata_map));
}
}
attachments_items.push(Message::Dictionary(a));
}
m.insert(cstr("attachments"), Message::Array(attachments_items));
items.push(Message::Dictionary(m));
}
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "GetMessagesResponse");
reply.insert(cstr("messages"), Message::Array(items));
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// Delete all
"DeleteAllConversations" => match agent.send_event(Event::DeleteAllConversations).await {
Ok(()) => make_ok_reply(),
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
},
// SendMessage
"SendMessage" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return make_error_reply("InvalidRequest", "Missing arguments"),
};
let conversation_id = match dict_get_str(args, "conversation_id") {
Some(v) => v,
None => return make_error_reply("InvalidRequest", "Missing conversation_id"),
};
let text = dict_get_str(args, "text").unwrap_or_default();
let attachment_guids: Vec<String> = match args.get(&cstr("attachment_guids")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect(),
_ => Vec::new(),
};
match agent
.send_event(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
.await
{
Ok(uuid) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "SendMessageResponse");
dict_put_str(&mut reply, "uuid", &uuid.to_string());
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// GetAttachmentInfo
"GetAttachmentInfo" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return make_error_reply("InvalidRequest", "Missing arguments"),
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return make_error_reply("InvalidRequest", "Missing attachment_id"),
};
match agent
.send_event(|r| Event::GetAttachment(attachment_id, r))
.await
{
Ok(attachment) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "GetAttachmentInfoResponse");
dict_put_str(
&mut reply,
"path",
&attachment.get_path_for_preview(false).to_string_lossy(),
);
dict_put_str(
&mut reply,
"preview_path",
&attachment.get_path_for_preview(true).to_string_lossy(),
);
dict_put_str(
&mut reply,
"downloaded",
&attachment.is_downloaded(false).to_string(),
);
dict_put_str(
&mut reply,
"preview_downloaded",
&attachment.is_downloaded(true).to_string(),
);
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// OpenAttachmentFd (return file descriptor in reply)
"OpenAttachmentFd" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return make_error_reply("InvalidRequest", "Missing arguments"),
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return make_error_reply("InvalidRequest", "Missing attachment_id"),
};
let preview = dict_get_str(args, "preview")
.map(|s| s == "true")
.unwrap_or(false);
match agent
.send_event(|r| Event::GetAttachment(attachment_id, r))
.await
{
Ok(attachment) => {
use std::os::fd::AsRawFd;
let path = attachment.get_path_for_preview(preview);
match std::fs::OpenOptions::new().read(true).open(&path) {
Ok(file) => {
let fd = file.as_raw_fd();
// Keep file alive until after conversion to XPC
std::mem::forget(file);
// Return file descriptor in reply
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "OpenAttachmentFdResponse");
reply.insert(cstr("fd"), Message::Fd(fd));
Message::Dictionary(reply)
}
Err(e) => make_error_reply("OpenFailed", &format!("{}", e)),
}
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// DownloadAttachment
"DownloadAttachment" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return make_error_reply("InvalidRequest", "Missing arguments"),
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return make_error_reply("InvalidRequest", "Missing attachment_id"),
};
let preview = dict_get_str(args, "preview")
.map(|s| s == "true")
.unwrap_or(false);
match agent
.send_event(|r| Event::DownloadAttachment(attachment_id, preview, r))
.await
{
Ok(()) => make_ok_reply(),
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// UploadAttachment
"UploadAttachment" => {
use std::path::PathBuf;
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return make_error_reply("InvalidRequest", "Missing arguments"),
};
let path = match dict_get_str(args, "path") {
Some(v) => v,
None => return make_error_reply("InvalidRequest", "Missing path"),
};
match agent
.send_event(|r| Event::UploadAttachment(PathBuf::from(path), r))
.await
{
Ok(upload_guid) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "UploadAttachmentResponse");
dict_put_str(&mut reply, "upload_guid", &upload_guid);
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// Settings
"GetAllSettings" => match agent.send_event(Event::GetAllSettings).await {
Ok(settings) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "GetAllSettingsResponse");
dict_put_str(
&mut reply,
"server_url",
&settings.server_url.unwrap_or_default(),
);
dict_put_str(
&mut reply,
"username",
&settings.username.unwrap_or_default(),
);
Message::Dictionary(reply)
}
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
},
"UpdateSettings" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return make_error_reply("InvalidRequest", "Missing arguments"),
};
let server_url = dict_get_str(args, "server_url");
let username = dict_get_str(args, "username");
let settings = Settings {
server_url,
username,
token: None,
};
match agent
.send_event(|r| Event::UpdateSettings(settings, r))
.await
{
Ok(()) => make_ok_reply(),
Err(e) => make_error_reply("DaemonError", &format!("{}", e)),
}
}
// Subscribe
"SubscribeSignals" => {
let mut list = subscribers.lock().unwrap();
if !list.iter().any(|c| c.0 == current_client) {
list.push(XpcConn(current_client));
log::trace!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len());
}
make_ok_reply()
}
// Unknown method fallback
other => make_error_reply("UnknownMethod", other),
};
response = attach_request_id(response, request_id);
response
}

100
kordophoned/src/xpc/util.rs Normal file
View File

@@ -0,0 +1,100 @@
use kordophoned::daemon::signals::Signal;
use std::collections::HashMap;
use std::ffi::CString;
use xpc_connection::Message;
pub type XpcMap = HashMap<CString, Message>;
pub fn cstr(s: &str) -> CString {
CString::new(s).unwrap_or_else(|_| CString::new("").unwrap())
}
pub fn get_dictionary_field<'a>(
map: &'a HashMap<CString, Message>,
key: &str,
) -> Option<&'a HashMap<CString, Message>> {
let k = CString::new(key).ok()?;
map.get(&k).and_then(|v| match v {
Message::Dictionary(d) => Some(d),
_ => None,
})
}
pub fn dict_get_str(map: &HashMap<CString, Message>, key: &str) -> Option<String> {
let k = CString::new(key).ok()?;
match map.get(&k) {
Some(Message::String(v)) => Some(v.to_string_lossy().into_owned()),
_ => None,
}
}
pub fn dict_get_i64_from_str(map: &HashMap<CString, Message>, key: &str) -> Option<i64> {
dict_get_str(map, key).and_then(|s| s.parse::<i64>().ok())
}
pub fn dict_put_str(map: &mut XpcMap, key: &str, value: impl AsRef<str>) {
map.insert(cstr(key), Message::String(cstr(value.as_ref())));
}
pub fn dict_put_i64_as_str(map: &mut XpcMap, key: &str, value: i64) {
dict_put_str(map, key, value.to_string());
}
pub fn array_from_strs(values: impl IntoIterator<Item = String>) -> Message {
let arr = values
.into_iter()
.map(|s| Message::String(cstr(&s)))
.collect();
Message::Array(arr)
}
pub fn make_ok_reply() -> Message {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "Ok");
Message::Dictionary(reply)
}
pub fn make_error_reply(code: &str, message: &str) -> Message {
let mut reply: HashMap<CString, Message> = HashMap::new();
reply.insert(cstr("type"), Message::String(cstr("Error")));
reply.insert(cstr("error"), Message::String(cstr(code)));
reply.insert(cstr("message"), Message::String(cstr(message)));
Message::Dictionary(reply)
}
pub fn attach_request_id(mut message: Message, request_id: Option<String>) -> Message {
if let (Some(id), Message::Dictionary(ref mut m)) = (request_id, &mut message) {
dict_put_str(m, "request_id", &id);
}
message
}
pub fn signal_to_message(signal: Signal) -> Message {
let mut root: XpcMap = HashMap::new();
let mut args: XpcMap = HashMap::new();
match signal {
Signal::ConversationsUpdated => {
dict_put_str(&mut root, "name", "ConversationsUpdated");
}
Signal::MessagesUpdated(conversation_id) => {
dict_put_str(&mut root, "name", "MessagesUpdated");
dict_put_str(&mut args, "conversation_id", &conversation_id);
}
Signal::AttachmentDownloaded(attachment_id) => {
dict_put_str(&mut root, "name", "AttachmentDownloadCompleted");
dict_put_str(&mut args, "attachment_id", &attachment_id);
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
dict_put_str(&mut root, "name", "AttachmentUploadCompleted");
dict_put_str(&mut args, "upload_guid", &upload_guid);
dict_put_str(&mut args, "attachment_guid", &attachment_guid);
}
Signal::UpdateStreamReconnected => {
dict_put_str(&mut root, "name", "UpdateStreamReconnected");
}
}
if !args.is_empty() {
root.insert(cstr("arguments"), Message::Dictionary(args));
}
Message::Dictionary(root)
}