Private
Public Access
1
0

kptui: organize client code into kordophoned-client

This commit is contained in:
2025-12-14 18:06:14 -08:00
parent 68bb94aa0b
commit fc69c387c5
13 changed files with 661 additions and 609 deletions

View File

@@ -0,0 +1,5 @@
mod platform;
mod worker;
pub use worker::{spawn_worker, ChatMessage, ConversationSummary, Event, Request};

View File

@@ -0,0 +1,189 @@
#![cfg(target_os = "linux")]
use crate::worker::{ChatMessage, ConversationSummary, DaemonClient, Event};
use anyhow::Result;
use dbus::arg::{PropMap, RefArg};
use dbus::blocking::{Connection, Proxy};
use dbus::channel::Token;
use std::sync::mpsc::Sender;
use std::time::Duration;
const DBUS_NAME: &str = "net.buzzert.kordophonecd";
const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon";
#[allow(unused)]
mod dbus_interface {
#![allow(unused)]
include!(concat!(env!("OUT_DIR"), "/kordophone-client.rs"));
}
use dbus_interface::NetBuzzertKordophoneRepository as KordophoneRepository;
pub(crate) struct DBusClient {
conn: Connection,
signal_tokens: Vec<Token>,
}
impl DBusClient {
pub(crate) fn new() -> Result<Self> {
Ok(Self {
conn: Connection::new_session()?,
signal_tokens: Vec::new(),
})
}
fn proxy(&self) -> Proxy<&Connection> {
self.conn
.with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_millis(5000))
}
}
fn get_string(map: &PropMap, key: &str) -> String {
map.get(key)
.and_then(|v| v.0.as_str())
.unwrap_or_default()
.to_string()
}
fn get_i64(map: &PropMap, key: &str) -> i64 {
map.get(key).and_then(|v| v.0.as_i64()).unwrap_or(0)
}
fn get_u32(map: &PropMap, key: &str) -> u32 {
get_i64(map, key).try_into().unwrap_or(0)
}
fn get_vec_string(map: &PropMap, key: &str) -> Vec<String> {
map.get(key)
.and_then(|v| v.0.as_iter())
.map(|iter| {
iter.filter_map(|item| item.as_str().map(|s| s.to_string()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
}
impl DaemonClient for DBusClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>> {
let mut items = KordophoneRepository::get_conversations(&self.proxy(), limit, offset)?;
let mut conversations = items
.drain(..)
.map(|conv| {
let id = get_string(&conv, "guid");
let display_name = get_string(&conv, "display_name");
let participants = get_vec_string(&conv, "participants");
let title = if !display_name.trim().is_empty() {
display_name
} else if participants.is_empty() {
"<unknown>".to_string()
} else {
participants.join(", ")
};
ConversationSummary {
id,
title,
preview: get_string(&conv, "last_message_preview").replace('\n', " "),
unread_count: get_u32(&conv, "unread_count"),
date_unix: get_i64(&conv, "date"),
}
})
.collect::<Vec<_>>();
conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix));
Ok(conversations)
}
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>> {
let messages = KordophoneRepository::get_messages(
&self.proxy(),
&conversation_id,
&last_message_id.unwrap_or_default(),
)?;
Ok(messages
.into_iter()
.map(|msg| ChatMessage {
sender: get_string(&msg, "sender"),
text: get_string(&msg, "text"),
date_unix: get_i64(&msg, "date"),
})
.collect())
}
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let attachment_guids: Vec<&str> = vec![];
let outgoing_id = KordophoneRepository::send_message(
&self.proxy(),
&conversation_id,
&text,
attachment_guids,
)?;
Ok(Some(outgoing_id))
}
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {e}"))
}
fn sync_conversation(&mut self, conversation_id: String) -> Result<()> {
KordophoneRepository::sync_conversation(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to sync conversation: {e}"))
}
fn install_signal_handlers(&mut self, event_tx: Sender<Event>) -> Result<()> {
let conversations_tx = event_tx.clone();
let t1 = self
.proxy()
.match_signal(
move |_: dbus_interface::NetBuzzertKordophoneRepositoryConversationsUpdated,
_: &Connection,
_: &dbus::message::Message| {
let _ = conversations_tx.send(Event::ConversationsUpdated);
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match ConversationsUpdated: {e}"))?;
let messages_tx = event_tx.clone();
let t2 = self
.proxy()
.match_signal(
move |s: dbus_interface::NetBuzzertKordophoneRepositoryMessagesUpdated,
_: &Connection,
_: &dbus::message::Message| {
let _ = messages_tx.send(Event::MessagesUpdated {
conversation_id: s.conversation_id,
});
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match MessagesUpdated: {e}"))?;
let reconnected_tx = event_tx;
let t3 = self
.proxy()
.match_signal(
move |_: dbus_interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected,
_: &Connection,
_: &dbus::message::Message| {
let _ = reconnected_tx.send(Event::UpdateStreamReconnected);
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match UpdateStreamReconnected: {e}"))?;
self.signal_tokens.extend([t1, t2, t3]);
Ok(())
}
fn poll(&mut self, timeout: Duration) -> Result<()> {
self.conn.process(timeout)?;
Ok(())
}
}

View File

@@ -0,0 +1,233 @@
#![cfg(target_os = "macos")]
use crate::worker::{ChatMessage, ConversationSummary, DaemonClient};
use anyhow::Result;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use xpc_connection::Message;
const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0";
struct XpcTransport {
connection: xpc_connection_sys::xpc_connection_t,
}
impl XpcTransport {
fn connect(name: impl AsRef<CStr>) -> Self {
use xpc_connection_sys::xpc_connection_create_mach_service;
use xpc_connection_sys::xpc_connection_resume;
let name = name.as_ref();
let connection =
unsafe { xpc_connection_create_mach_service(name.as_ptr(), std::ptr::null_mut(), 0) };
unsafe { xpc_connection_resume(connection) };
Self { connection }
}
fn send_with_reply(&self, message: Message) -> Message {
use xpc_connection::message_to_xpc_object;
use xpc_connection::xpc_object_to_message;
use xpc_connection_sys::{xpc_connection_send_message_with_reply_sync, xpc_release};
unsafe {
let xobj = message_to_xpc_object(message);
let reply = xpc_connection_send_message_with_reply_sync(self.connection, xobj);
xpc_release(xobj);
let msg = xpc_object_to_message(reply);
if !reply.is_null() {
xpc_release(reply);
}
msg
}
}
}
impl Drop for XpcTransport {
fn drop(&mut self) {
use xpc_connection_sys::xpc_object_t;
use xpc_connection_sys::xpc_release;
unsafe { xpc_release(self.connection as xpc_object_t) };
}
}
pub(crate) struct XpcClient {
transport: XpcTransport,
}
impl XpcClient {
pub(crate) fn new() -> Result<Self> {
let mach_port_name = CString::new(SERVICE_NAME).unwrap();
Ok(Self {
transport: XpcTransport::connect(&mach_port_name),
})
}
fn key(s: &str) -> CString {
CString::new(s).unwrap()
}
fn request(name: &str, arguments: Option<HashMap<CString, Message>>) -> Message {
let mut root: HashMap<CString, Message> = HashMap::new();
root.insert(Self::key("name"), Message::String(Self::key(name)));
if let Some(args) = arguments {
root.insert(Self::key("arguments"), Message::Dictionary(args));
}
Message::Dictionary(root)
}
fn get_string(map: &HashMap<CString, Message>, key: &str) -> Option<String> {
match map.get(&Self::key(key)) {
Some(Message::String(s)) => Some(s.to_string_lossy().into_owned()),
_ => None,
}
}
fn get_i64_from_str(map: &HashMap<CString, Message>, key: &str) -> i64 {
Self::get_string(map, key)
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0)
}
}
impl DaemonClient for XpcClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>> {
let mut args = HashMap::new();
args.insert(Self::key("limit"), Message::String(Self::key(&limit.to_string())));
args.insert(Self::key("offset"), Message::String(Self::key(&offset.to_string())));
let reply = self
.transport
.send_with_reply(Self::request("GetConversations", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected conversations response");
};
let Some(Message::Array(items)) = map.get(&Self::key("conversations")) else {
anyhow::bail!("Missing conversations in response");
};
let mut conversations = Vec::new();
for item in items {
let Message::Dictionary(conv) = item else { continue };
let id = Self::get_string(conv, "guid").unwrap_or_default();
let display_name = Self::get_string(conv, "display_name").unwrap_or_default();
let preview = Self::get_string(conv, "last_message_preview").unwrap_or_default();
let unread_count = Self::get_i64_from_str(conv, "unread_count") as u32;
let date_unix = Self::get_i64_from_str(conv, "date");
let participants = match conv.get(&Self::key("participants")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect::<Vec<_>>(),
_ => Vec::new(),
};
let title = if !display_name.trim().is_empty() {
display_name
} else if participants.is_empty() {
"<unknown>".to_string()
} else {
participants.join(", ")
};
conversations.push(ConversationSummary {
id,
title,
preview: preview.replace('\n', " "),
unread_count,
date_unix,
});
}
conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix));
Ok(conversations)
}
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
if let Some(last) = last_message_id {
args.insert(Self::key("last_message_id"), Message::String(Self::key(&last)));
}
let reply = self
.transport
.send_with_reply(Self::request("GetMessages", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected messages response");
};
let Some(Message::Array(items)) = map.get(&Self::key("messages")) else {
anyhow::bail!("Missing messages in response");
};
let mut messages = Vec::new();
for item in items {
let Message::Dictionary(msg) = item else { continue };
messages.push(ChatMessage {
sender: Self::get_string(msg, "sender").unwrap_or_default(),
text: Self::get_string(msg, "text").unwrap_or_default(),
date_unix: Self::get_i64_from_str(msg, "date"),
});
}
Ok(messages)
}
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
args.insert(Self::key("text"), Message::String(Self::key(&text)));
let reply = self
.transport
.send_with_reply(Self::request("SendMessage", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected send response");
};
Ok(Self::get_string(&map, "uuid"))
}
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
let _ = self
.transport
.send_with_reply(Self::request("MarkConversationAsRead", Some(args)));
Ok(())
}
fn sync_conversation(&mut self, conversation_id: String) -> Result<()> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
let _ = self
.transport
.send_with_reply(Self::request("SyncConversation", Some(args)));
Ok(())
}
}

View File

@@ -0,0 +1,24 @@
use crate::worker::DaemonClient;
use anyhow::Result;
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "macos")]
mod macos;
pub(crate) fn new_daemon_client() -> Result<Box<dyn DaemonClient>> {
#[cfg(target_os = "linux")]
{
Ok(Box::new(linux::DBusClient::new()?))
}
#[cfg(target_os = "macos")]
{
Ok(Box::new(macos::XpcClient::new()?))
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
anyhow::bail!("Unsupported platform")
}
}

View File

@@ -0,0 +1,133 @@
use crate::platform;
use anyhow::Result;
use std::sync::mpsc;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct ConversationSummary {
pub id: String,
pub title: String,
pub preview: String,
pub unread_count: u32,
pub date_unix: i64,
}
#[derive(Clone, Debug)]
pub struct ChatMessage {
pub sender: String,
pub text: String,
pub date_unix: i64,
}
pub enum Request {
RefreshConversations,
RefreshMessages { conversation_id: String },
SendMessage { conversation_id: String, text: String },
MarkRead { conversation_id: String },
SyncConversation { conversation_id: String },
}
pub enum Event {
Conversations(Vec<ConversationSummary>),
Messages {
conversation_id: String,
messages: Vec<ChatMessage>,
},
MessageSent {
conversation_id: String,
outgoing_id: Option<String>,
},
MarkedRead,
ConversationSyncTriggered { conversation_id: String },
ConversationsUpdated,
MessagesUpdated { conversation_id: String },
UpdateStreamReconnected,
Error(String),
}
pub fn spawn_worker(
request_rx: std::sync::mpsc::Receiver<Request>,
event_tx: std::sync::mpsc::Sender<Event>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
let mut client = match platform::new_daemon_client() {
Ok(c) => c,
Err(e) => {
let _ = event_tx.send(Event::Error(format!("Failed to connect to daemon: {e}")));
return;
}
};
if let Err(e) = client.install_signal_handlers(event_tx.clone()) {
let _ = event_tx.send(Event::Error(format!("Failed to install daemon signals: {e}")));
}
loop {
match request_rx.recv_timeout(Duration::from_millis(100)) {
Ok(req) => {
let res = match req {
Request::RefreshConversations => client
.get_conversations(200, 0)
.map(Event::Conversations),
Request::RefreshMessages { conversation_id } => client
.get_messages(conversation_id.clone(), None)
.map(|messages| Event::Messages {
conversation_id,
messages,
}),
Request::SendMessage {
conversation_id,
text,
} => client
.send_message(conversation_id.clone(), text)
.map(|outgoing_id| Event::MessageSent {
conversation_id,
outgoing_id,
}),
Request::MarkRead { conversation_id } => client
.mark_conversation_as_read(conversation_id.clone())
.map(|_| Event::MarkedRead),
Request::SyncConversation { conversation_id } => client
.sync_conversation(conversation_id.clone())
.map(|_| Event::ConversationSyncTriggered { conversation_id }),
};
match res {
Ok(evt) => {
let _ = event_tx.send(evt);
}
Err(e) => {
let _ = event_tx.send(Event::Error(format!("{e}")));
}
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
if let Err(e) = client.poll(Duration::from_millis(0)) {
let _ = event_tx.send(Event::Error(format!("Daemon polling error: {e}")));
}
}
})
}
pub(crate) trait DaemonClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>>;
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>>;
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>>;
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>;
fn sync_conversation(&mut self, conversation_id: String) -> Result<()>;
fn install_signal_handlers(&mut self, _event_tx: mpsc::Sender<Event>) -> Result<()> {
Ok(())
}
fn poll(&mut self, timeout: Duration) -> Result<()> {
std::thread::sleep(timeout);
Ok(())
}
}