Private
Public Access
1
0

first pass at xpc impl

This commit is contained in:
James Magahern
2025-08-01 12:26:17 -07:00
parent 43b668e9a2
commit 911454aafb
29 changed files with 761 additions and 141 deletions

View File

@@ -21,14 +21,21 @@ tokio-condvar = "0.3.0"
uuid = "1.16.0"
once_cell = "1.19.0"
[target.'cfg(target_os = "linux")'.dependencies]
# D-Bus dependencies only on Linux
[target.'cfg(target_os = "linux")'.dependencies]
dbus = { version = "0.9.7", features = ["futures"] }
dbus-crossroads = "0.5.2"
dbus-tokio = "0.7.6"
dbus-tree = "0.9.2"
[target.'cfg(target_os = "linux")'.build-dependencies]
# D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = "0.10.0"
dbus-crossroads = "0.5.1"
# XPC (libxpc) interface for macOS IPC
[target.'cfg(target_os = "macos")'.dependencies]
futures-preview = "=0.2.2"
xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" }
xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" }
serde = { version = "1.0", features = ["derive"] }

View File

@@ -20,8 +20,8 @@ fn main() {
let xml = std::fs::read_to_string(KORDOPHONE_XML).expect("Error reading server dbus interface");
let output = dbus_codegen::generate(&xml, &opts)
.expect("Error generating server dbus interface");
let output =
dbus_codegen::generate(&xml, &opts).expect("Error generating server dbus interface");
std::fs::write(out_path, output).expect("Error writing server dbus code");

View File

@@ -1,10 +1,10 @@
use super::ContactResolverBackend;
use dbus::blocking::Connection;
use dbus::arg::{RefArg, Variant};
use dbus::blocking::Connection;
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use std::time::Duration;
use std::sync::Mutex;
use std::time::Duration;
#[derive(Clone)]
pub struct EDSContactResolverBackend;
@@ -40,8 +40,13 @@ static ADDRESS_BOOK_HANDLE: OnceCell<Mutex<AddressBookHandle>> = OnceCell::new()
/// Check whether a given well-known name currently has an owner on the bus.
fn name_has_owner(conn: &Connection, name: &str) -> bool {
let proxy = conn.with_proxy("org.freedesktop.DBus", "/org/freedesktop/DBus", Duration::from_secs(2));
let result: Result<(bool,), _> = proxy.method_call("org.freedesktop.DBus", "NameHasOwner", (name.to_string(),));
let proxy = conn.with_proxy(
"org.freedesktop.DBus",
"/org/freedesktop/DBus",
Duration::from_secs(2),
);
let result: Result<(bool,), _> =
proxy.method_call("org.freedesktop.DBus", "NameHasOwner", (name.to_string(),));
result.map(|(b,)| b).unwrap_or(false)
}
@@ -99,10 +104,7 @@ fn ensure_address_book_uid(conn: &Connection) -> anyhow::Result<String> {
// The GetManagedObjects reply is the usual ObjectManager map.
let (managed_objects,): (
HashMap<
dbus::Path<'static>,
HashMap<String, HashMap<String, Variant<Box<dyn RefArg>>>>,
>,
HashMap<dbus::Path<'static>, HashMap<String, HashMap<String, Variant<Box<dyn RefArg>>>>>,
) = source_manager_proxy.method_call(
"org.freedesktop.DBus.ObjectManager",
"GetManagedObjects",
@@ -153,10 +155,7 @@ fn data_contains_address_book_backend(data: &str) -> bool {
/// Open the Evolution address book referenced by `source_uid` and return the
/// pair `(object_path, bus_name)` that identifies the newly created D-Bus
/// proxy.
fn open_address_book(
conn: &Connection,
source_uid: &str,
) -> anyhow::Result<(String, String)> {
fn open_address_book(conn: &Connection, source_uid: &str) -> anyhow::Result<(String, String)> {
let factory_proxy = conn.with_proxy(
"org.gnome.evolution.dataserver.AddressBook10",
"/org/gnome/evolution/dataserver/AddressBookFactory",
@@ -177,11 +176,8 @@ fn open_address_book(
/// calls the `Open` method once per process. We ignore any error here
/// because the backend might already be open.
fn ensure_address_book_open(proxy: &dbus::blocking::Proxy<&Connection>) {
let _: Result<(), _> = proxy.method_call(
"org.gnome.evolution.dataserver.AddressBook",
"Open",
(),
);
let _: Result<(), _> =
proxy.method_call("org.gnome.evolution.dataserver.AddressBook", "Open", ());
}
impl ContactResolverBackend for EDSContactResolverBackend {
@@ -295,4 +291,4 @@ impl Default for EDSContactResolverBackend {
fn default() -> Self {
Self
}
}
}

View File

@@ -13,4 +13,4 @@ impl ContactResolverBackend for GenericContactResolverBackend {
fn get_contact_display_name(&self, contact_id: &Self::ContactID) -> Option<String> {
None
}
}
}

View File

@@ -15,7 +15,9 @@ pub struct EDSContactResolverBackend;
#[cfg(not(target_os = "linux"))]
impl Default for EDSContactResolverBackend {
fn default() -> Self { EDSContactResolverBackend }
fn default() -> Self {
EDSContactResolverBackend
}
}
#[cfg(not(target_os = "linux"))]
@@ -56,7 +58,11 @@ where
T: Default,
{
pub fn new(backend: T) -> Self {
Self { backend, display_name_cache: HashMap::new(), contact_id_cache: HashMap::new() }
Self {
backend,
display_name_cache: HashMap::new(),
contact_id_cache: HashMap::new(),
}
}
pub fn resolve_contact_id(&mut self, address: &str) -> Option<AnyContactID> {
@@ -66,7 +72,8 @@ where
let id = self.backend.resolve_contact_id(address).map(|id| id.into());
if let Some(ref id) = id {
self.contact_id_cache.insert(address.to_string(), id.clone());
self.contact_id_cache
.insert(address.to_string(), id.clone());
}
id
@@ -80,7 +87,8 @@ where
let backend_contact_id: T::ContactID = T::ContactID::from((*contact_id).clone());
let display_name = self.backend.get_contact_display_name(&backend_contact_id);
if let Some(ref display_name) = display_name {
self.display_name_cache.insert(contact_id.to_string(), display_name.clone());
self.display_name_cache
.insert(contact_id.to_string(), display_name.clone());
}
display_name

View File

@@ -522,12 +522,18 @@ impl Daemon {
.await?
{
for p in &saved.participants {
if let DbParticipant::Remote { handle, contact_id: None } = p {
if let DbParticipant::Remote {
handle,
contact_id: None,
} = p
{
log::trace!(target: target::SYNC, "Resolving contact id for participant: {}", handle);
if let Some(contact) = contact_resolver.resolve_contact_id(handle) {
log::trace!(target: target::SYNC, "Resolved contact id for participant: {}", contact);
let _ = database
.with_repository(|r| r.update_participant_contact(&handle, &contact))
.with_repository(|r| {
r.update_participant_contact(&handle, &contact)
})
.await;
} else {
log::trace!(target: target::SYNC, "No contact id found for participant: {}", handle);
@@ -663,11 +669,11 @@ impl Daemon {
signal_sender: &Sender<Signal>,
) -> Result<()> {
log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
let updated = database.with_repository(|r| r.merge_conversation_metadata(conversation)).await?;
let updated = database
.with_repository(|r| r.merge_conversation_metadata(conversation))
.await?;
if updated {
signal_sender
.send(Signal::ConversationsUpdated)
.await?;
signal_sender.send(Signal::ConversationsUpdated).await?;
}
Ok(())

View File

@@ -177,7 +177,10 @@ impl From<Participant> for DbParticipant {
fn from(participant: Participant) -> Self {
match participant {
Participant::Me => DbParticipant::Me,
Participant::Remote { handle, contact_id } => DbParticipant::Remote { handle, contact_id: contact_id.clone() },
Participant::Remote { handle, contact_id } => DbParticipant::Remote {
handle,
contact_id: contact_id.clone(),
},
}
}
}
}

View File

@@ -65,8 +65,9 @@ impl UpdateMonitor {
UpdateEventData::ConversationChanged(conversation) => {
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
// Explicitly update the unread count, we assume this is fresh from the notification.
let db_conversation: kordophone_db::models::Conversation = conversation.clone().into();
// Explicitly update the unread count, we assume this is fresh from the notification.
let db_conversation: kordophone_db::models::Conversation =
conversation.clone().into();
self.send_event(|r| Event::UpdateConversationMetadata(db_conversation, r))
.await
.unwrap_or_else(|e| {

View File

@@ -1,15 +1,15 @@
use dbus::arg;
use dbus_tree::MethodErr;
use std::{future::Future, thread};
use std::sync::Arc;
use std::{future::Future, thread};
use tokio::sync::{mpsc, oneshot, Mutex};
use crate::daemon::{
contact_resolver::{ContactResolver, DefaultContactResolverBackend},
events::{Event, Reply},
settings::Settings,
signals::Signal,
DaemonResult,
contact_resolver::{ContactResolver, DefaultContactResolverBackend},
};
use kordophone_db::models::participant::Participant;
@@ -37,7 +37,8 @@ impl DBusAgent {
pub async fn run(self) {
// Establish a session bus connection.
let (resource, connection) = connection::new_session_sync().expect("Failed to connect to session bus");
let (resource, connection) =
connection::new_session_sync().expect("Failed to connect to session bus");
// Ensure the D-Bus resource is polled.
tokio::spawn(async move {
@@ -79,7 +80,10 @@ impl DBusAgent {
Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated");
registry
.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {})
.send_signal(
interface::OBJECT_PATH,
DbusSignals::ConversationsUpdated {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
@@ -118,7 +122,8 @@ impl DBusAgent {
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid, attachment_guid
upload_guid,
attachment_guid
);
registry
.send_signal(
@@ -154,7 +159,10 @@ impl DBusAgent {
std::future::pending::<()>().await;
}
pub async fn send_event<T>(&self, make_event: impl FnOnce(Reply<T>) -> Event) -> DaemonResult<T> {
pub async fn send_event<T>(
&self,
make_event: impl FnOnce(Reply<T>) -> Event,
) -> DaemonResult<T> {
let (reply_tx, reply_rx) = oneshot::channel();
self.event_sink
.send(make_event(reply_tx))
@@ -172,26 +180,29 @@ impl DBusAgent {
.unwrap()
.map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e)))
}
fn resolve_participant_display_name(&mut self, participant: &Participant) -> String {
match participant {
// Me (we should use a special string here...)
Participant::Me => "(Me)".to_string(),
// Remote participant with a resolved contact_id
Participant::Remote { handle, contact_id: Some(contact_id), .. } => {
self.contact_resolver.get_contact_display_name(contact_id).unwrap_or_else(|| handle.clone())
}
Participant::Remote {
handle,
contact_id: Some(contact_id),
..
} => self
.contact_resolver
.get_contact_display_name(contact_id)
.unwrap_or_else(|| handle.clone()),
// Remote participant without a resolved contact_id
Participant::Remote { handle, .. } => {
handle.clone()
}
Participant::Remote { handle, .. } => handle.clone(),
}
}
}
//
//
// D-Bus repository interface implementation
//
@@ -203,9 +214,12 @@ impl DbusRepository for DBusAgent {
self.send_event_sync(Event::GetVersion)
}
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<arg::PropMap>, MethodErr> {
self
.send_event_sync(|r| Event::GetAllConversations(limit, offset, r))
fn get_conversations(
&mut self,
limit: i32,
offset: i32,
) -> Result<Vec<arg::PropMap>, MethodErr> {
self.send_event_sync(|r| Event::GetAllConversations(limit, offset, r))
.map(|conversations| {
conversations
.into_iter()
@@ -243,7 +257,6 @@ impl DbusRepository for DBusAgent {
})
}
fn sync_conversation_list(&mut self) -> Result<(), MethodErr> {
self.send_event_sync(Event::SyncConversationList)
}
@@ -260,15 +273,18 @@ impl DbusRepository for DBusAgent {
self.send_event_sync(|r| Event::MarkConversationAsRead(conversation_id, r))
}
fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result<Vec<arg::PropMap>, MethodErr> {
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: String,
) -> Result<Vec<arg::PropMap>, MethodErr> {
let last_message_id_opt = if last_message_id.is_empty() {
None
} else {
Some(last_message_id)
};
self
.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
.map(|messages| {
messages
.into_iter()
@@ -286,7 +302,9 @@ impl DbusRepository for DBusAgent {
);
map.insert(
"sender".into(),
arg::Variant(Box::new(self.resolve_participant_display_name(&msg.sender.into()))),
arg::Variant(Box::new(
self.resolve_participant_display_name(&msg.sender.into()),
)),
);
// Attachments array
@@ -312,7 +330,9 @@ impl DbusRepository for DBusAgent {
);
attachment_map.insert(
"preview_path".into(),
arg::Variant(Box::new(preview_path.to_string_lossy().to_string())),
arg::Variant(Box::new(
preview_path.to_string_lossy().to_string(),
)),
);
attachment_map.insert(
"downloaded".into(),
@@ -374,27 +394,34 @@ impl DbusRepository for DBusAgent {
text: String,
attachment_guids: Vec<String>,
) -> Result<String, MethodErr> {
self
.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
self.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
.map(|uuid| uuid.to_string())
}
fn get_attachment_info(&mut self, attachment_id: String) -> Result<(String, String, bool, bool), MethodErr> {
self.send_event_sync(|r| Event::GetAttachment(attachment_id, r)).map(|attachment| {
let path = attachment.get_path_for_preview(false);
let downloaded = attachment.is_downloaded(false);
let preview_path = attachment.get_path_for_preview(true);
let preview_downloaded = attachment.is_downloaded(true);
(
path.to_string_lossy().to_string(),
preview_path.to_string_lossy().to_string(),
downloaded,
preview_downloaded,
)
})
fn get_attachment_info(
&mut self,
attachment_id: String,
) -> Result<(String, String, bool, bool), MethodErr> {
self.send_event_sync(|r| Event::GetAttachment(attachment_id, r))
.map(|attachment| {
let path = attachment.get_path_for_preview(false);
let downloaded = attachment.is_downloaded(false);
let preview_path = attachment.get_path_for_preview(true);
let preview_downloaded = attachment.is_downloaded(true);
(
path.to_string_lossy().to_string(),
preview_path.to_string_lossy().to_string(),
downloaded,
preview_downloaded,
)
})
}
fn download_attachment(&mut self, attachment_id: String, preview: bool) -> Result<(), MethodErr> {
fn download_attachment(
&mut self,
attachment_id: String,
preview: bool,
) -> Result<(), MethodErr> {
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
}
@@ -482,4 +509,4 @@ where
.join()
})
.expect("Error joining runtime thread")
}
}

View File

@@ -1,5 +1,5 @@
pub mod endpoint;
pub mod agent;
pub mod endpoint;
pub mod interface {
#![allow(unused)]

View File

@@ -3,6 +3,9 @@ mod daemon;
#[cfg(target_os = "linux")]
mod dbus;
#[cfg(target_os = "macos")]
mod xpc;
use log::LevelFilter;
use std::future;
@@ -33,7 +36,16 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
#[cfg(target_os = "macos")]
async fn start_ipc_agent(daemon: &mut Daemon) {
// TODO: Implement macOS IPC agent.
// Start the macOS XPC agent (events in, signals out) on a dedicated thread.
let agent = xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
std::thread::spawn(move || {
// Use a single-threaded Tokio runtime for the XPC agent.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Unable to create tokio runtime for XPC agent");
rt.block_on(agent.run());
});
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]

View File

@@ -0,0 +1,38 @@
use crate::daemon::{events::Event, signals::Signal, DaemonResult};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};
/// XPC IPC agent that forwards daemon events and signals over libxpc.
#[derive(Clone)]
pub struct XpcAgent {
event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>,
}
impl XpcAgent {
/// Create a new XPC agent with an event sink and signal receiver.
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
Self {
event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
}
}
/// Run the XPC agent: perform a basic GetVersion IPC call to the daemon and print the result.
pub async fn run(self) {
todo!()
}
/// Send an event to the daemon and await its reply.
pub async fn send_event<T>(
&self,
make_event: impl FnOnce(crate::daemon::events::Reply<T>) -> Event,
) -> DaemonResult<T> {
let (tx, rx) = oneshot::channel();
self.event_sink
.send(make_event(tx))
.await
.map_err(|_| "Failed to send event")?;
rx.await.map_err(|_| "Failed to receive reply".into())
}
}

View File

@@ -0,0 +1,27 @@
#![cfg(target_os = "macos")]
//! XPC registry for registering handlers and emitting signals.
/// Registry for XPC message handlers and signal emission.
pub struct XpcRegistry;
impl XpcRegistry {
/// Create a new XPC registry for the service.
pub fn new() -> Self {
XpcRegistry
}
/// Register a handler for incoming messages at a given endpoint.
pub fn register_handler<F>(&self, _name: &str, _handler: F)
where
F: Fn(&[u8]) -> Vec<u8> + Send + Sync + 'static,
{
// TODO: Implement handler registration over libxpc using SERVICE_NAME
let _ = (_name, _handler);
}
/// Send a signal (notification) to connected clients.
pub fn send_signal<T: serde::Serialize>(&self, _signal: &str, _data: &T) {
// TODO: Serialize and send signal over XPC
let _ = (_signal, _data);
}
}

View File

@@ -0,0 +1,8 @@
#![cfg(target_os = "macos")]
//! XPC interface definitions for macOS IPC
/// Mach service name for the XPC interface (must include trailing NUL).
pub const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0";
/// Method names for the XPC interface (must include trailing NUL).
pub const GET_VERSION_METHOD: &str = "GetVersion\0";

View File

@@ -0,0 +1,6 @@
#![cfg(target_os = "macos")]
//! macOS XPC IPC interface modules.
pub mod agent;
pub mod endpoint;
pub mod interface;