From fc69c387c5062fad270260ef4698214962449d5d Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 14 Dec 2025 18:06:14 -0800 Subject: [PATCH] kptui: organize client code into kordophoned-client --- core/Cargo.lock | 20 +- core/Cargo.toml | 1 + core/README.md | 1 + core/kordophoned-client/Cargo.toml | 23 + core/{kptui => kordophoned-client}/build.rs | 0 core/kordophoned-client/src/lib.rs | 5 + core/kordophoned-client/src/platform/linux.rs | 189 ++++++ core/kordophoned-client/src/platform/macos.rs | 233 +++++++ core/kordophoned-client/src/platform/mod.rs | 24 + core/kordophoned-client/src/worker.rs | 133 ++++ core/kptui/Cargo.toml | 16 +- core/kptui/src/daemon/mod.rs | 581 ------------------ core/kptui/src/main.rs | 44 +- 13 files changed, 661 insertions(+), 609 deletions(-) create mode 100644 core/kordophoned-client/Cargo.toml rename core/{kptui => kordophoned-client}/build.rs (100%) create mode 100644 core/kordophoned-client/src/lib.rs create mode 100644 core/kordophoned-client/src/platform/linux.rs create mode 100644 core/kordophoned-client/src/platform/macos.rs create mode 100644 core/kordophoned-client/src/platform/mod.rs create mode 100644 core/kordophoned-client/src/worker.rs delete mode 100644 core/kptui/src/daemon/mod.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index d002331..3cf6e97 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1315,6 +1315,19 @@ dependencies = [ "xpc-connection-sys", ] +[[package]] +name = "kordophoned-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "block", + "dbus", + "dbus-codegen", + "log", + "xpc-connection", + "xpc-connection-sys", +] + [[package]] name = "kpcli" version = "0.1.0" @@ -1347,16 +1360,11 @@ name = "kptui" version = "0.1.0" dependencies = [ "anyhow", - "block", "crossterm", - "dbus", - "dbus-codegen", - "log", + "kordophoned-client", "ratatui", "time", "unicode-width 0.2.0", - "xpc-connection", - "xpc-connection-sys", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index ba7fb01..0030d23 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -3,6 +3,7 @@ members = [ "kordophone", "kordophone-db", "kordophoned", + "kordophoned-client", "kpcli", "kptui", "utilities", diff --git a/core/README.md b/core/README.md index d58b431..8027456 100644 --- a/core/README.md +++ b/core/README.md @@ -9,6 +9,7 @@ Workspace members: - `kordophoned/` — Client daemon providing local caching and IPC - Linux: D‑Bus - macOS: XPC (see notes below) +- `kordophoned-client/` — Cross-platform client library for talking to `kordophoned` (D-Bus/XPC). - `kpcli/` — Command‑line interface for interacting with the API, DB, and daemon. - `kptui/` — Terminal UI client (Ratatui) for reading and replying to chats via the daemon. - `utilities/` — Small helper tools (e.g., testing utilities). diff --git a/core/kordophoned-client/Cargo.toml b/core/kordophoned-client/Cargo.toml new file mode 100644 index 0000000..b5e8e83 --- /dev/null +++ b/core/kordophoned-client/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "kordophoned-client" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.93" +log = "0.4.22" + +# D-Bus dependencies only on Linux +[target.'cfg(target_os = "linux")'.dependencies] +dbus = "0.9.7" + +# D-Bus codegen only on Linux +[target.'cfg(target_os = "linux")'.build-dependencies] +dbus-codegen = "0.10.0" + +# XPC (libxpc) interface only on macOS +[target.'cfg(target_os = "macos")'.dependencies] +block = "0.1.6" +xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" } +xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" } + diff --git a/core/kptui/build.rs b/core/kordophoned-client/build.rs similarity index 100% rename from core/kptui/build.rs rename to core/kordophoned-client/build.rs diff --git a/core/kordophoned-client/src/lib.rs b/core/kordophoned-client/src/lib.rs new file mode 100644 index 0000000..6e97842 --- /dev/null +++ b/core/kordophoned-client/src/lib.rs @@ -0,0 +1,5 @@ +mod platform; +mod worker; + +pub use worker::{spawn_worker, ChatMessage, ConversationSummary, Event, Request}; + diff --git a/core/kordophoned-client/src/platform/linux.rs b/core/kordophoned-client/src/platform/linux.rs new file mode 100644 index 0000000..b45f6c9 --- /dev/null +++ b/core/kordophoned-client/src/platform/linux.rs @@ -0,0 +1,189 @@ +#![cfg(target_os = "linux")] + +use crate::worker::{ChatMessage, ConversationSummary, DaemonClient, Event}; +use anyhow::Result; +use dbus::arg::{PropMap, RefArg}; +use dbus::blocking::{Connection, Proxy}; +use dbus::channel::Token; +use std::sync::mpsc::Sender; +use std::time::Duration; + +const DBUS_NAME: &str = "net.buzzert.kordophonecd"; +const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon"; + +#[allow(unused)] +mod dbus_interface { + #![allow(unused)] + include!(concat!(env!("OUT_DIR"), "/kordophone-client.rs")); +} +use dbus_interface::NetBuzzertKordophoneRepository as KordophoneRepository; + +pub(crate) struct DBusClient { + conn: Connection, + signal_tokens: Vec, +} + +impl DBusClient { + pub(crate) fn new() -> Result { + Ok(Self { + conn: Connection::new_session()?, + signal_tokens: Vec::new(), + }) + } + + fn proxy(&self) -> Proxy<&Connection> { + self.conn + .with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_millis(5000)) + } +} + +fn get_string(map: &PropMap, key: &str) -> String { + map.get(key) + .and_then(|v| v.0.as_str()) + .unwrap_or_default() + .to_string() +} + +fn get_i64(map: &PropMap, key: &str) -> i64 { + map.get(key).and_then(|v| v.0.as_i64()).unwrap_or(0) +} + +fn get_u32(map: &PropMap, key: &str) -> u32 { + get_i64(map, key).try_into().unwrap_or(0) +} + +fn get_vec_string(map: &PropMap, key: &str) -> Vec { + map.get(key) + .and_then(|v| v.0.as_iter()) + .map(|iter| { + iter.filter_map(|item| item.as_str().map(|s| s.to_string())) + .collect::>() + }) + .unwrap_or_default() +} + +impl DaemonClient for DBusClient { + fn get_conversations(&mut self, limit: i32, offset: i32) -> Result> { + let mut items = KordophoneRepository::get_conversations(&self.proxy(), limit, offset)?; + let mut conversations = items + .drain(..) + .map(|conv| { + let id = get_string(&conv, "guid"); + let display_name = get_string(&conv, "display_name"); + let participants = get_vec_string(&conv, "participants"); + let title = if !display_name.trim().is_empty() { + display_name + } else if participants.is_empty() { + "".to_string() + } else { + participants.join(", ") + }; + + ConversationSummary { + id, + title, + preview: get_string(&conv, "last_message_preview").replace('\n', " "), + unread_count: get_u32(&conv, "unread_count"), + date_unix: get_i64(&conv, "date"), + } + }) + .collect::>(); + + conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix)); + Ok(conversations) + } + + fn get_messages( + &mut self, + conversation_id: String, + last_message_id: Option, + ) -> Result> { + let messages = KordophoneRepository::get_messages( + &self.proxy(), + &conversation_id, + &last_message_id.unwrap_or_default(), + )?; + + Ok(messages + .into_iter() + .map(|msg| ChatMessage { + sender: get_string(&msg, "sender"), + text: get_string(&msg, "text"), + date_unix: get_i64(&msg, "date"), + }) + .collect()) + } + + fn send_message(&mut self, conversation_id: String, text: String) -> Result> { + let attachment_guids: Vec<&str> = vec![]; + let outgoing_id = KordophoneRepository::send_message( + &self.proxy(), + &conversation_id, + &text, + attachment_guids, + )?; + Ok(Some(outgoing_id)) + } + + fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> { + KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id) + .map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {e}")) + } + + fn sync_conversation(&mut self, conversation_id: String) -> Result<()> { + KordophoneRepository::sync_conversation(&self.proxy(), &conversation_id) + .map_err(|e| anyhow::anyhow!("Failed to sync conversation: {e}")) + } + + fn install_signal_handlers(&mut self, event_tx: Sender) -> Result<()> { + let conversations_tx = event_tx.clone(); + let t1 = self + .proxy() + .match_signal( + move |_: dbus_interface::NetBuzzertKordophoneRepositoryConversationsUpdated, + _: &Connection, + _: &dbus::message::Message| { + let _ = conversations_tx.send(Event::ConversationsUpdated); + true + }, + ) + .map_err(|e| anyhow::anyhow!("Failed to match ConversationsUpdated: {e}"))?; + + let messages_tx = event_tx.clone(); + let t2 = self + .proxy() + .match_signal( + move |s: dbus_interface::NetBuzzertKordophoneRepositoryMessagesUpdated, + _: &Connection, + _: &dbus::message::Message| { + let _ = messages_tx.send(Event::MessagesUpdated { + conversation_id: s.conversation_id, + }); + true + }, + ) + .map_err(|e| anyhow::anyhow!("Failed to match MessagesUpdated: {e}"))?; + + let reconnected_tx = event_tx; + let t3 = self + .proxy() + .match_signal( + move |_: dbus_interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected, + _: &Connection, + _: &dbus::message::Message| { + let _ = reconnected_tx.send(Event::UpdateStreamReconnected); + true + }, + ) + .map_err(|e| anyhow::anyhow!("Failed to match UpdateStreamReconnected: {e}"))?; + + self.signal_tokens.extend([t1, t2, t3]); + Ok(()) + } + + fn poll(&mut self, timeout: Duration) -> Result<()> { + self.conn.process(timeout)?; + Ok(()) + } +} + diff --git a/core/kordophoned-client/src/platform/macos.rs b/core/kordophoned-client/src/platform/macos.rs new file mode 100644 index 0000000..94c1de8 --- /dev/null +++ b/core/kordophoned-client/src/platform/macos.rs @@ -0,0 +1,233 @@ +#![cfg(target_os = "macos")] + +use crate::worker::{ChatMessage, ConversationSummary, DaemonClient}; +use anyhow::Result; +use std::collections::HashMap; +use std::ffi::{CStr, CString}; + +use xpc_connection::Message; + +const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0"; + +struct XpcTransport { + connection: xpc_connection_sys::xpc_connection_t, +} + +impl XpcTransport { + fn connect(name: impl AsRef) -> Self { + use xpc_connection_sys::xpc_connection_create_mach_service; + use xpc_connection_sys::xpc_connection_resume; + + let name = name.as_ref(); + let connection = + unsafe { xpc_connection_create_mach_service(name.as_ptr(), std::ptr::null_mut(), 0) }; + + unsafe { xpc_connection_resume(connection) }; + + Self { connection } + } + + fn send_with_reply(&self, message: Message) -> Message { + use xpc_connection::message_to_xpc_object; + use xpc_connection::xpc_object_to_message; + use xpc_connection_sys::{xpc_connection_send_message_with_reply_sync, xpc_release}; + + unsafe { + let xobj = message_to_xpc_object(message); + let reply = xpc_connection_send_message_with_reply_sync(self.connection, xobj); + xpc_release(xobj); + let msg = xpc_object_to_message(reply); + if !reply.is_null() { + xpc_release(reply); + } + msg + } + } +} + +impl Drop for XpcTransport { + fn drop(&mut self) { + use xpc_connection_sys::xpc_object_t; + use xpc_connection_sys::xpc_release; + unsafe { xpc_release(self.connection as xpc_object_t) }; + } +} + +pub(crate) struct XpcClient { + transport: XpcTransport, +} + +impl XpcClient { + pub(crate) fn new() -> Result { + let mach_port_name = CString::new(SERVICE_NAME).unwrap(); + Ok(Self { + transport: XpcTransport::connect(&mach_port_name), + }) + } + + fn key(s: &str) -> CString { + CString::new(s).unwrap() + } + + fn request(name: &str, arguments: Option>) -> Message { + let mut root: HashMap = HashMap::new(); + root.insert(Self::key("name"), Message::String(Self::key(name))); + if let Some(args) = arguments { + root.insert(Self::key("arguments"), Message::Dictionary(args)); + } + Message::Dictionary(root) + } + + fn get_string(map: &HashMap, key: &str) -> Option { + match map.get(&Self::key(key)) { + Some(Message::String(s)) => Some(s.to_string_lossy().into_owned()), + _ => None, + } + } + + fn get_i64_from_str(map: &HashMap, key: &str) -> i64 { + Self::get_string(map, key) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0) + } +} + +impl DaemonClient for XpcClient { + fn get_conversations(&mut self, limit: i32, offset: i32) -> Result> { + let mut args = HashMap::new(); + args.insert(Self::key("limit"), Message::String(Self::key(&limit.to_string()))); + args.insert(Self::key("offset"), Message::String(Self::key(&offset.to_string()))); + + let reply = self + .transport + .send_with_reply(Self::request("GetConversations", Some(args))); + + let Message::Dictionary(map) = reply else { + anyhow::bail!("Unexpected conversations response"); + }; + + let Some(Message::Array(items)) = map.get(&Self::key("conversations")) else { + anyhow::bail!("Missing conversations in response"); + }; + + let mut conversations = Vec::new(); + for item in items { + let Message::Dictionary(conv) = item else { continue }; + let id = Self::get_string(conv, "guid").unwrap_or_default(); + let display_name = Self::get_string(conv, "display_name").unwrap_or_default(); + let preview = Self::get_string(conv, "last_message_preview").unwrap_or_default(); + let unread_count = Self::get_i64_from_str(conv, "unread_count") as u32; + let date_unix = Self::get_i64_from_str(conv, "date"); + + let participants = match conv.get(&Self::key("participants")) { + Some(Message::Array(arr)) => arr + .iter() + .filter_map(|m| match m { + Message::String(s) => Some(s.to_string_lossy().into_owned()), + _ => None, + }) + .collect::>(), + _ => Vec::new(), + }; + + let title = if !display_name.trim().is_empty() { + display_name + } else if participants.is_empty() { + "".to_string() + } else { + participants.join(", ") + }; + + conversations.push(ConversationSummary { + id, + title, + preview: preview.replace('\n', " "), + unread_count, + date_unix, + }); + } + + conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix)); + Ok(conversations) + } + + fn get_messages( + &mut self, + conversation_id: String, + last_message_id: Option, + ) -> Result> { + let mut args = HashMap::new(); + args.insert( + Self::key("conversation_id"), + Message::String(Self::key(&conversation_id)), + ); + if let Some(last) = last_message_id { + args.insert(Self::key("last_message_id"), Message::String(Self::key(&last))); + } + + let reply = self + .transport + .send_with_reply(Self::request("GetMessages", Some(args))); + let Message::Dictionary(map) = reply else { + anyhow::bail!("Unexpected messages response"); + }; + + let Some(Message::Array(items)) = map.get(&Self::key("messages")) else { + anyhow::bail!("Missing messages in response"); + }; + + let mut messages = Vec::new(); + for item in items { + let Message::Dictionary(msg) = item else { continue }; + messages.push(ChatMessage { + sender: Self::get_string(msg, "sender").unwrap_or_default(), + text: Self::get_string(msg, "text").unwrap_or_default(), + date_unix: Self::get_i64_from_str(msg, "date"), + }); + } + Ok(messages) + } + + fn send_message(&mut self, conversation_id: String, text: String) -> Result> { + let mut args = HashMap::new(); + args.insert( + Self::key("conversation_id"), + Message::String(Self::key(&conversation_id)), + ); + args.insert(Self::key("text"), Message::String(Self::key(&text))); + + let reply = self + .transport + .send_with_reply(Self::request("SendMessage", Some(args))); + let Message::Dictionary(map) = reply else { + anyhow::bail!("Unexpected send response"); + }; + + Ok(Self::get_string(&map, "uuid")) + } + + fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> { + let mut args = HashMap::new(); + args.insert( + Self::key("conversation_id"), + Message::String(Self::key(&conversation_id)), + ); + let _ = self + .transport + .send_with_reply(Self::request("MarkConversationAsRead", Some(args))); + Ok(()) + } + + fn sync_conversation(&mut self, conversation_id: String) -> Result<()> { + let mut args = HashMap::new(); + args.insert( + Self::key("conversation_id"), + Message::String(Self::key(&conversation_id)), + ); + let _ = self + .transport + .send_with_reply(Self::request("SyncConversation", Some(args))); + Ok(()) + } +} + diff --git a/core/kordophoned-client/src/platform/mod.rs b/core/kordophoned-client/src/platform/mod.rs new file mode 100644 index 0000000..68e7ab7 --- /dev/null +++ b/core/kordophoned-client/src/platform/mod.rs @@ -0,0 +1,24 @@ +use crate::worker::DaemonClient; +use anyhow::Result; + +#[cfg(target_os = "linux")] +mod linux; + +#[cfg(target_os = "macos")] +mod macos; + +pub(crate) fn new_daemon_client() -> Result> { + #[cfg(target_os = "linux")] + { + Ok(Box::new(linux::DBusClient::new()?)) + } + #[cfg(target_os = "macos")] + { + Ok(Box::new(macos::XpcClient::new()?)) + } + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + { + anyhow::bail!("Unsupported platform") + } +} + diff --git a/core/kordophoned-client/src/worker.rs b/core/kordophoned-client/src/worker.rs new file mode 100644 index 0000000..2f31a2f --- /dev/null +++ b/core/kordophoned-client/src/worker.rs @@ -0,0 +1,133 @@ +use crate::platform; +use anyhow::Result; +use std::sync::mpsc; +use std::time::Duration; + +#[derive(Clone, Debug)] +pub struct ConversationSummary { + pub id: String, + pub title: String, + pub preview: String, + pub unread_count: u32, + pub date_unix: i64, +} + +#[derive(Clone, Debug)] +pub struct ChatMessage { + pub sender: String, + pub text: String, + pub date_unix: i64, +} + +pub enum Request { + RefreshConversations, + RefreshMessages { conversation_id: String }, + SendMessage { conversation_id: String, text: String }, + MarkRead { conversation_id: String }, + SyncConversation { conversation_id: String }, +} + +pub enum Event { + Conversations(Vec), + Messages { + conversation_id: String, + messages: Vec, + }, + MessageSent { + conversation_id: String, + outgoing_id: Option, + }, + MarkedRead, + ConversationSyncTriggered { conversation_id: String }, + ConversationsUpdated, + MessagesUpdated { conversation_id: String }, + UpdateStreamReconnected, + Error(String), +} + +pub fn spawn_worker( + request_rx: std::sync::mpsc::Receiver, + event_tx: std::sync::mpsc::Sender, +) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || { + let mut client = match platform::new_daemon_client() { + Ok(c) => c, + Err(e) => { + let _ = event_tx.send(Event::Error(format!("Failed to connect to daemon: {e}"))); + return; + } + }; + + if let Err(e) = client.install_signal_handlers(event_tx.clone()) { + let _ = event_tx.send(Event::Error(format!("Failed to install daemon signals: {e}"))); + } + + loop { + match request_rx.recv_timeout(Duration::from_millis(100)) { + Ok(req) => { + let res = match req { + Request::RefreshConversations => client + .get_conversations(200, 0) + .map(Event::Conversations), + Request::RefreshMessages { conversation_id } => client + .get_messages(conversation_id.clone(), None) + .map(|messages| Event::Messages { + conversation_id, + messages, + }), + Request::SendMessage { + conversation_id, + text, + } => client + .send_message(conversation_id.clone(), text) + .map(|outgoing_id| Event::MessageSent { + conversation_id, + outgoing_id, + }), + Request::MarkRead { conversation_id } => client + .mark_conversation_as_read(conversation_id.clone()) + .map(|_| Event::MarkedRead), + Request::SyncConversation { conversation_id } => client + .sync_conversation(conversation_id.clone()) + .map(|_| Event::ConversationSyncTriggered { conversation_id }), + }; + + match res { + Ok(evt) => { + let _ = event_tx.send(evt); + } + Err(e) => { + let _ = event_tx.send(Event::Error(format!("{e}"))); + } + } + } + Err(mpsc::RecvTimeoutError::Timeout) => {} + Err(mpsc::RecvTimeoutError::Disconnected) => break, + } + + if let Err(e) = client.poll(Duration::from_millis(0)) { + let _ = event_tx.send(Event::Error(format!("Daemon polling error: {e}"))); + } + } + }) +} + +pub(crate) trait DaemonClient { + fn get_conversations(&mut self, limit: i32, offset: i32) -> Result>; + fn get_messages( + &mut self, + conversation_id: String, + last_message_id: Option, + ) -> Result>; + fn send_message(&mut self, conversation_id: String, text: String) -> Result>; + fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>; + fn sync_conversation(&mut self, conversation_id: String) -> Result<()>; + fn install_signal_handlers(&mut self, _event_tx: mpsc::Sender) -> Result<()> { + Ok(()) + } + fn poll(&mut self, timeout: Duration) -> Result<()> { + std::thread::sleep(timeout); + Ok(()) + } +} + diff --git a/core/kptui/Cargo.toml b/core/kptui/Cargo.toml index e15b685..c89a8dd 100644 --- a/core/kptui/Cargo.toml +++ b/core/kptui/Cargo.toml @@ -6,21 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0.93" crossterm = "0.28.1" -log = "0.4.22" +kordophoned-client = { path = "../kordophoned-client" } ratatui = "0.29.0" time = { version = "0.3.37", features = ["formatting"] } unicode-width = "0.2.0" - -# D-Bus dependencies only on Linux -[target.'cfg(target_os = "linux")'.dependencies] -dbus = "0.9.7" - -# D-Bus codegen only on Linux -[target.'cfg(target_os = "linux")'.build-dependencies] -dbus-codegen = "0.10.0" - -# XPC (libxpc) interface only on macOS -[target.'cfg(target_os = "macos")'.dependencies] -block = "0.1.6" -xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" } -xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" } diff --git a/core/kptui/src/daemon/mod.rs b/core/kptui/src/daemon/mod.rs deleted file mode 100644 index ea0e9cd..0000000 --- a/core/kptui/src/daemon/mod.rs +++ /dev/null @@ -1,581 +0,0 @@ -use anyhow::Result; -use std::time::Duration; - -#[derive(Clone, Debug)] -pub struct ConversationSummary { - pub id: String, - pub title: String, - pub preview: String, - pub unread_count: u32, - pub date_unix: i64, -} - -#[derive(Clone, Debug)] -pub struct ChatMessage { - pub sender: String, - pub text: String, - pub date_unix: i64, -} - -pub enum Request { - RefreshConversations, - RefreshMessages { conversation_id: String }, - SendMessage { conversation_id: String, text: String }, - MarkRead { conversation_id: String }, - SyncConversation { conversation_id: String }, -} - -pub enum Event { - Conversations(Vec), - Messages { - conversation_id: String, - messages: Vec, - }, - MessageSent { - conversation_id: String, - outgoing_id: Option, - }, - MarkedRead, - ConversationSyncTriggered { conversation_id: String }, - ConversationsUpdated, - MessagesUpdated { conversation_id: String }, - UpdateStreamReconnected, - Error(String), -} - -pub fn spawn_worker( - request_rx: std::sync::mpsc::Receiver, - event_tx: std::sync::mpsc::Sender, -) -> std::thread::JoinHandle<()> { - std::thread::spawn(move || { - let mut client = match new_daemon_client() { - Ok(c) => c, - Err(e) => { - let _ = event_tx.send(Event::Error(format!("Failed to connect to daemon: {e}"))); - return; - } - }; - - if let Err(e) = client.install_signal_handlers(event_tx.clone()) { - let _ = event_tx.send(Event::Error(format!("Failed to install daemon signals: {e}"))); - } - - loop { - match request_rx.recv_timeout(Duration::from_millis(100)) { - Ok(req) => { - let res = match req { - Request::RefreshConversations => client - .get_conversations(200, 0) - .map(Event::Conversations), - Request::RefreshMessages { conversation_id } => client - .get_messages(conversation_id.clone(), None) - .map(|messages| Event::Messages { - conversation_id, - messages, - }), - Request::SendMessage { - conversation_id, - text, - } => client - .send_message(conversation_id.clone(), text) - .map(|outgoing_id| Event::MessageSent { - conversation_id, - outgoing_id, - }), - Request::MarkRead { conversation_id } => client - .mark_conversation_as_read(conversation_id.clone()) - .map(|_| Event::MarkedRead), - Request::SyncConversation { conversation_id } => client - .sync_conversation(conversation_id.clone()) - .map(|_| Event::ConversationSyncTriggered { conversation_id }), - }; - - match res { - Ok(evt) => { - let _ = event_tx.send(evt); - } - Err(e) => { - let _ = event_tx.send(Event::Error(format!("{e}"))); - } - } - } - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} - Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, - } - - if let Err(e) = client.poll(Duration::from_millis(0)) { - let _ = event_tx.send(Event::Error(format!("Daemon polling error: {e}"))); - } - } - }) -} - -trait DaemonClient { - fn get_conversations(&mut self, limit: i32, offset: i32) -> Result>; - fn get_messages( - &mut self, - conversation_id: String, - last_message_id: Option, - ) -> Result>; - fn send_message( - &mut self, - conversation_id: String, - text: String, - ) -> Result>; - fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>; - fn sync_conversation(&mut self, conversation_id: String) -> Result<()>; - fn install_signal_handlers(&mut self, _event_tx: std::sync::mpsc::Sender) -> Result<()> { - Ok(()) - } - fn poll(&mut self, timeout: Duration) -> Result<()> { - std::thread::sleep(timeout); - Ok(()) - } -} - -fn new_daemon_client() -> Result> { - #[cfg(target_os = "linux")] - { - Ok(Box::new(linux::DBusClient::new()?)) - } - #[cfg(target_os = "macos")] - { - Ok(Box::new(macos::XpcClient::new()?)) - } - #[cfg(not(any(target_os = "linux", target_os = "macos")))] - { - anyhow::bail!("Unsupported platform") - } -} - -#[cfg(target_os = "linux")] -mod linux { - use super::{ChatMessage, ConversationSummary, DaemonClient, Event}; - use anyhow::Result; - use dbus::arg::{PropMap, RefArg}; - use dbus::blocking::{Connection, Proxy}; - use dbus::channel::Token; - use std::sync::mpsc::Sender; - use std::time::Duration; - - const DBUS_NAME: &str = "net.buzzert.kordophonecd"; - const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon"; - - #[allow(unused)] - mod dbus_interface { - #![allow(unused)] - include!(concat!(env!("OUT_DIR"), "/kordophone-client.rs")); - } - use dbus_interface::NetBuzzertKordophoneRepository as KordophoneRepository; - - pub struct DBusClient { - conn: Connection, - signal_tokens: Vec, - } - - impl DBusClient { - pub fn new() -> Result { - Ok(Self { - conn: Connection::new_session()?, - signal_tokens: Vec::new(), - }) - } - - fn proxy(&self) -> Proxy<&Connection> { - self.conn - .with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_millis(5000)) - } - } - - fn get_string(map: &PropMap, key: &str) -> String { - map.get(key) - .and_then(|v| v.0.as_str()) - .unwrap_or_default() - .to_string() - } - - fn get_i64(map: &PropMap, key: &str) -> i64 { - map.get(key).and_then(|v| v.0.as_i64()).unwrap_or(0) - } - - fn get_u32(map: &PropMap, key: &str) -> u32 { - get_i64(map, key).try_into().unwrap_or(0) - } - - fn get_vec_string(map: &PropMap, key: &str) -> Vec { - map.get(key) - .and_then(|v| v.0.as_iter()) - .map(|iter| { - iter.filter_map(|item| item.as_str().map(|s| s.to_string())) - .collect::>() - }) - .unwrap_or_default() - } - - impl DaemonClient for DBusClient { - fn get_conversations(&mut self, limit: i32, offset: i32) -> Result> { - let mut items = KordophoneRepository::get_conversations(&self.proxy(), limit, offset)?; - let mut conversations = items - .drain(..) - .map(|conv| { - let id = get_string(&conv, "guid"); - let display_name = get_string(&conv, "display_name"); - let participants = get_vec_string(&conv, "participants"); - let title = if !display_name.trim().is_empty() { - display_name - } else if participants.is_empty() { - "".to_string() - } else { - participants.join(", ") - }; - - ConversationSummary { - id, - title, - preview: get_string(&conv, "last_message_preview").replace('\n', " "), - unread_count: get_u32(&conv, "unread_count"), - date_unix: get_i64(&conv, "date"), - } - }) - .collect::>(); - - conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix)); - Ok(conversations) - } - - fn get_messages( - &mut self, - conversation_id: String, - last_message_id: Option, - ) -> Result> { - let messages = KordophoneRepository::get_messages( - &self.proxy(), - &conversation_id, - &last_message_id.unwrap_or_default(), - )?; - - Ok(messages - .into_iter() - .map(|msg| ChatMessage { - sender: get_string(&msg, "sender"), - text: get_string(&msg, "text"), - date_unix: get_i64(&msg, "date"), - }) - .collect()) - } - - fn send_message( - &mut self, - conversation_id: String, - text: String, - ) -> Result> { - let attachment_guids: Vec<&str> = vec![]; - let outgoing_id = KordophoneRepository::send_message( - &self.proxy(), - &conversation_id, - &text, - attachment_guids, - )?; - Ok(Some(outgoing_id)) - } - - fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> { - KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id) - .map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {e}")) - } - - fn sync_conversation(&mut self, conversation_id: String) -> Result<()> { - KordophoneRepository::sync_conversation(&self.proxy(), &conversation_id) - .map_err(|e| anyhow::anyhow!("Failed to sync conversation: {e}")) - } - - fn install_signal_handlers(&mut self, event_tx: Sender) -> Result<()> { - let conversations_tx = event_tx.clone(); - let t1 = self - .proxy() - .match_signal( - move |_: dbus_interface::NetBuzzertKordophoneRepositoryConversationsUpdated, - _: &Connection, - _: &dbus::message::Message| { - let _ = conversations_tx.send(Event::ConversationsUpdated); - true - }, - ) - .map_err(|e| anyhow::anyhow!("Failed to match ConversationsUpdated: {e}"))?; - - let messages_tx = event_tx.clone(); - let t2 = self - .proxy() - .match_signal( - move |s: dbus_interface::NetBuzzertKordophoneRepositoryMessagesUpdated, - _: &Connection, - _: &dbus::message::Message| { - let _ = messages_tx.send(Event::MessagesUpdated { - conversation_id: s.conversation_id, - }); - true - }, - ) - .map_err(|e| anyhow::anyhow!("Failed to match MessagesUpdated: {e}"))?; - - let reconnected_tx = event_tx; - let t3 = self - .proxy() - .match_signal( - move |_: dbus_interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected, - _: &Connection, - _: &dbus::message::Message| { - let _ = reconnected_tx.send(Event::UpdateStreamReconnected); - true - }, - ) - .map_err(|e| anyhow::anyhow!("Failed to match UpdateStreamReconnected: {e}"))?; - - self.signal_tokens.extend([t1, t2, t3]); - Ok(()) - } - - fn poll(&mut self, timeout: Duration) -> Result<()> { - self.conn.process(timeout)?; - Ok(()) - } - } -} - -#[cfg(target_os = "macos")] -mod macos { - use super::{ChatMessage, ConversationSummary, DaemonClient}; - use anyhow::Result; - use std::collections::HashMap; - use std::ffi::{CStr, CString}; - - use xpc_connection::Message; - - const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0"; - - struct XpcTransport { - connection: xpc_connection_sys::xpc_connection_t, - } - - impl XpcTransport { - fn connect(name: impl AsRef) -> Self { - use xpc_connection_sys::xpc_connection_create_mach_service; - use xpc_connection_sys::xpc_connection_resume; - - let name = name.as_ref(); - let connection = - unsafe { xpc_connection_create_mach_service(name.as_ptr(), std::ptr::null_mut(), 0) }; - - unsafe { xpc_connection_resume(connection) }; - - Self { connection } - } - - fn send_with_reply(&self, message: Message) -> Message { - use xpc_connection::message_to_xpc_object; - use xpc_connection::xpc_object_to_message; - use xpc_connection_sys::{xpc_connection_send_message_with_reply_sync, xpc_release}; - - unsafe { - let xobj = message_to_xpc_object(message); - let reply = xpc_connection_send_message_with_reply_sync(self.connection, xobj); - xpc_release(xobj); - let msg = xpc_object_to_message(reply); - if !reply.is_null() { - xpc_release(reply); - } - msg - } - } - } - - impl Drop for XpcTransport { - fn drop(&mut self) { - use xpc_connection_sys::xpc_object_t; - use xpc_connection_sys::xpc_release; - unsafe { xpc_release(self.connection as xpc_object_t) }; - } - } - - pub struct XpcClient { - transport: XpcTransport, - } - - impl XpcClient { - pub fn new() -> Result { - let mach_port_name = CString::new(SERVICE_NAME).unwrap(); - Ok(Self { - transport: XpcTransport::connect(&mach_port_name), - }) - } - - fn key(s: &str) -> CString { - CString::new(s).unwrap() - } - - fn request(name: &str, arguments: Option>) -> Message { - let mut root: HashMap = HashMap::new(); - root.insert(Self::key("name"), Message::String(Self::key(name))); - if let Some(args) = arguments { - root.insert(Self::key("arguments"), Message::Dictionary(args)); - } - Message::Dictionary(root) - } - - fn get_string(map: &HashMap, key: &str) -> Option { - match map.get(&Self::key(key)) { - Some(Message::String(s)) => Some(s.to_string_lossy().into_owned()), - _ => None, - } - } - - fn get_i64_from_str(map: &HashMap, key: &str) -> i64 { - Self::get_string(map, key) - .and_then(|s| s.parse::().ok()) - .unwrap_or(0) - } - } - - impl DaemonClient for XpcClient { - fn get_conversations(&mut self, limit: i32, offset: i32) -> Result> { - let mut args = HashMap::new(); - args.insert(Self::key("limit"), Message::String(Self::key(&limit.to_string()))); - args.insert(Self::key("offset"), Message::String(Self::key(&offset.to_string()))); - - let reply = self - .transport - .send_with_reply(Self::request("GetConversations", Some(args))); - - let Message::Dictionary(map) = reply else { - anyhow::bail!("Unexpected conversations response"); - }; - - let Some(Message::Array(items)) = map.get(&Self::key("conversations")) else { - anyhow::bail!("Missing conversations in response"); - }; - - let mut conversations = Vec::new(); - for item in items { - let Message::Dictionary(conv) = item else { continue }; - let id = Self::get_string(conv, "guid").unwrap_or_default(); - let display_name = Self::get_string(conv, "display_name").unwrap_or_default(); - let preview = Self::get_string(conv, "last_message_preview").unwrap_or_default(); - let unread_count = Self::get_i64_from_str(conv, "unread_count") as u32; - let date_unix = Self::get_i64_from_str(conv, "date"); - - let participants = match conv.get(&Self::key("participants")) { - Some(Message::Array(arr)) => arr - .iter() - .filter_map(|m| match m { - Message::String(s) => Some(s.to_string_lossy().into_owned()), - _ => None, - }) - .collect::>(), - _ => Vec::new(), - }; - - let title = if !display_name.trim().is_empty() { - display_name - } else if participants.is_empty() { - "".to_string() - } else { - participants.join(", ") - }; - - conversations.push(ConversationSummary { - id, - title, - preview: preview.replace('\n', " "), - unread_count, - date_unix, - }); - } - - conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix)); - Ok(conversations) - } - - fn get_messages( - &mut self, - conversation_id: String, - last_message_id: Option, - ) -> Result> { - let mut args = HashMap::new(); - args.insert( - Self::key("conversation_id"), - Message::String(Self::key(&conversation_id)), - ); - if let Some(last) = last_message_id { - args.insert(Self::key("last_message_id"), Message::String(Self::key(&last))); - } - - let reply = self - .transport - .send_with_reply(Self::request("GetMessages", Some(args))); - let Message::Dictionary(map) = reply else { - anyhow::bail!("Unexpected messages response"); - }; - - let Some(Message::Array(items)) = map.get(&Self::key("messages")) else { - anyhow::bail!("Missing messages in response"); - }; - - let mut messages = Vec::new(); - for item in items { - let Message::Dictionary(msg) = item else { continue }; - messages.push(ChatMessage { - sender: Self::get_string(msg, "sender").unwrap_or_default(), - text: Self::get_string(msg, "text").unwrap_or_default(), - date_unix: Self::get_i64_from_str(msg, "date"), - }); - } - Ok(messages) - } - - fn send_message( - &mut self, - conversation_id: String, - text: String, - ) -> Result> { - let mut args = HashMap::new(); - args.insert( - Self::key("conversation_id"), - Message::String(Self::key(&conversation_id)), - ); - args.insert(Self::key("text"), Message::String(Self::key(&text))); - - let reply = self - .transport - .send_with_reply(Self::request("SendMessage", Some(args))); - let Message::Dictionary(map) = reply else { - anyhow::bail!("Unexpected send response"); - }; - - Ok(Self::get_string(&map, "uuid")) - } - - fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> { - let mut args = HashMap::new(); - args.insert( - Self::key("conversation_id"), - Message::String(Self::key(&conversation_id)), - ); - let _ = self - .transport - .send_with_reply(Self::request("MarkConversationAsRead", Some(args))); - Ok(()) - } - - fn sync_conversation(&mut self, conversation_id: String) -> Result<()> { - let mut args = HashMap::new(); - args.insert( - Self::key("conversation_id"), - Message::String(Self::key(&conversation_id)), - ); - let _ = self - .transport - .send_with_reply(Self::request("SyncConversation", Some(args))); - Ok(()) - } - } -} diff --git a/core/kptui/src/main.rs b/core/kptui/src/main.rs index 1bd8ac4..a9e424e 100644 --- a/core/kptui/src/main.rs +++ b/core/kptui/src/main.rs @@ -1,4 +1,4 @@ -mod daemon; +use kordophoned_client as daemon; use anyhow::Result; use crossterm::event::{Event as CEvent, KeyCode, KeyEventKind, KeyModifiers}; @@ -25,6 +25,7 @@ enum Focus { struct AppState { conversations: Vec, selected_idx: usize, + selected_conversation_id: Option, messages: Vec, active_conversation_id: Option, active_conversation_title: String, @@ -42,6 +43,7 @@ impl AppState { Self { conversations: Vec::new(), selected_idx: 0, + selected_conversation_id: None, messages: Vec::new(), active_conversation_id: None, active_conversation_title: String::new(), @@ -58,23 +60,34 @@ impl AppState { fn select_next(&mut self) { if self.conversations.is_empty() { self.selected_idx = 0; + self.selected_conversation_id = None; return; } self.selected_idx = (self.selected_idx + 1).min(self.conversations.len() - 1); + self.selected_conversation_id = self + .conversations + .get(self.selected_idx) + .map(|c| c.id.clone()); } fn select_prev(&mut self) { if self.conversations.is_empty() { self.selected_idx = 0; + self.selected_conversation_id = None; return; } self.selected_idx = self.selected_idx.saturating_sub(1); + self.selected_conversation_id = self + .conversations + .get(self.selected_idx) + .map(|c| c.id.clone()); } fn open_selected_conversation(&mut self) { if let Some(conv) = self.conversations.get(self.selected_idx) { self.active_conversation_id = Some(conv.id.clone()); self.active_conversation_title = conv.title.clone(); + self.selected_conversation_id = Some(conv.id.clone()); self.messages.clear(); self.transcript_scroll = 0; self.pinned_to_bottom = true; @@ -331,14 +344,31 @@ fn run_app(terminal: &mut ratatui::Terminal { + let keep_selected_id = app + .selected_conversation_id + .clone() + .or_else(|| app.active_conversation_id.clone()); + app.refresh_conversations_in_flight = false; app.status.clear(); app.conversations = convs; - if app.selected_idx >= app.conversations.len() { - app.selected_idx = app.conversations.len().saturating_sub(1); - } - if app.active_conversation_id.is_none() && !app.conversations.is_empty() { + if app.conversations.is_empty() { + app.selected_idx = 0; + app.selected_conversation_id = None; + } else if let Some(id) = keep_selected_id { + if let Some(idx) = app.conversations.iter().position(|c| c.id == id) { + app.selected_idx = idx; + app.selected_conversation_id = Some(id); + } else { + app.selected_idx = 0; + app.selected_conversation_id = + Some(app.conversations[0].id.clone()); + } + } else { app.selected_idx = app.selected_idx.min(app.conversations.len() - 1); + app.selected_conversation_id = Some( + app.conversations[app.selected_idx].id.clone(), + ); } } daemon::Event::Messages { @@ -477,7 +507,7 @@ fn run_app(terminal: &mut ratatui::Terminal app.focus = Focus::Input, + KeyCode::Char('i') if app.focus != Focus::Input => app.focus = Focus::Input, _ => { handle_chat_keys(&mut app, &request_tx, key.code, max_scroll); } @@ -490,7 +520,7 @@ fn run_app(terminal: &mut ratatui::Terminal app.focus = Focus::Navigation, - KeyCode::Char('i') => app.focus = Focus::Input, + KeyCode::Char('i') if app.focus != Focus::Input => app.focus = Focus::Input, KeyCode::Up => { if app.focus == Focus::Navigation { app.select_prev()