Private
Public Access
1
0

3 Commits

60 changed files with 1964 additions and 2701 deletions

1
.gitignore vendored
View File

@@ -1 +0,0 @@
target/

View File

@@ -1,18 +0,0 @@
[target.arm-unknown-linux-gnueabihf]
# Match Raspberry Pi Zero CPU (ARM1176JZF-S).
rustflags = [
"-C", "target-cpu=arm1176jzf-s",
"-L", "native=/usr/lib/arm-linux-gnueabihf",
"-L", "native=/lib/arm-linux-gnueabihf",
"-C", "link-arg=-Wl,-rpath-link,/usr/lib/arm-linux-gnueabihf",
"-C", "link-arg=-Wl,-rpath-link,/lib/arm-linux-gnueabihf",
]
[target.aarch64-unknown-linux-gnu]
linker = "aarch64-linux-gnu-gcc"
rustflags = [
"-L", "native=/usr/lib/aarch64-linux-gnu",
"-L", "native=/lib/aarch64-linux-gnu",
"-C", "link-arg=-Wl,-rpath-link,/usr/lib/aarch64-linux-gnu",
"-C", "link-arg=-Wl,-rpath-link,/lib/aarch64-linux-gnu",
]

1214
core/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,9 +3,7 @@ members = [
"kordophone",
"kordophone-db",
"kordophoned",
"kordophoned-client",
"kpcli",
"kptui",
"utilities",
]
resolver = "2"

View File

@@ -1,37 +0,0 @@
[target.arm-unknown-linux-gnueabihf]
# Raspberry Pi Zero / Zero W (ARM1176JZF-S, ARMv6 + hard-float).
#
# Several workspace crates use native libs via pkg-config (dbus, sqlite, libsecret).
# Install the ARMv6/armhf -dev packages inside the cross image so they are available
# to the target linker.
pre-build = [
"dpkg --add-architecture armhf",
"apt-get update",
"apt-get install -y --no-install-recommends bash pkg-config libc6-dev:armhf libdbus-1-dev:armhf libsystemd-dev:armhf libsqlite3-dev:armhf libsecret-1-dev:armhf",
# `cross` doesn't reliably forward PKG_CONFIG_* env vars into the container, so install a tiny
# wrapper that selects the correct multiarch pkgconfig dir based on `$TARGET`.
"bash -lc 'if [ -x /usr/bin/pkg-config ] && [ ! -x /usr/bin/pkg-config.real ]; then mv /usr/bin/pkg-config /usr/bin/pkg-config.real; fi'",
"bash -lc 'printf \"%b\" \"#!/usr/bin/env bash\\nset -euo pipefail\\nREAL=/usr/bin/pkg-config.real\\ncase \\\"\\${TARGET:-}\\\" in\\n arm-unknown-linux-gnueabihf)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/arm-linux-gnueabihf/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n aarch64-unknown-linux-gnu)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/aarch64-linux-gnu/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n *)\\n ;;\\nesac\\nexec \\\"\\$REAL\\\" \\\"\\$@\\\"\\n\" > /usr/bin/pkg-config && chmod +x /usr/bin/pkg-config'",
# Sanity checks (use wrapper + armhf search path).
"bash -lc 'TARGET=arm-unknown-linux-gnueabihf pkg-config --modversion dbus-1'",
"bash -lc 'TARGET=arm-unknown-linux-gnueabihf pkg-config --modversion sqlite3'",
"bash -lc 'TARGET=arm-unknown-linux-gnueabihf pkg-config --modversion libsecret-1'",
]
[target.aarch64-unknown-linux-gnu]
# Raspberry Pi OS (64-bit) / other aarch64 Linux.
#
# Use a Debian 11 (bullseye) base so the resulting binaries are compatible with
# bullseye's glibc, and to get a system `libsqlite3` new enough for Diesel.
image = "debian:bullseye-slim"
pre-build = [
"dpkg --add-architecture arm64",
"apt-get update",
"apt-get install -y --no-install-recommends ca-certificates bash pkg-config build-essential gcc-aarch64-linux-gnu libc6-dev:arm64 libdbus-1-dev:arm64 libsystemd-dev:arm64 libsqlite3-dev:arm64 libsecret-1-dev:arm64",
# Same wrapper as above (installed once, safe to re-run).
"bash -lc 'if [ -x /usr/bin/pkg-config ] && [ ! -x /usr/bin/pkg-config.real ]; then mv /usr/bin/pkg-config /usr/bin/pkg-config.real; fi'",
"bash -lc 'printf \"%b\" \"#!/usr/bin/env bash\\nset -euo pipefail\\nREAL=/usr/bin/pkg-config.real\\ncase \\\"\\${TARGET:-}\\\" in\\n arm-unknown-linux-gnueabihf)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/arm-linux-gnueabihf/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n aarch64-unknown-linux-gnu)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/aarch64-linux-gnu/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n *)\\n ;;\\nesac\\nexec \\\"\\$REAL\\\" \\\"\\$@\\\"\\n\" > /usr/bin/pkg-config && chmod +x /usr/bin/pkg-config'",
"bash -lc 'TARGET=aarch64-unknown-linux-gnu pkg-config --modversion dbus-1'",
"bash -lc 'TARGET=aarch64-unknown-linux-gnu pkg-config --modversion sqlite3'",
"bash -lc 'TARGET=aarch64-unknown-linux-gnu pkg-config --modversion libsecret-1'",
]

View File

@@ -1,25 +0,0 @@
FROM debian:bookworm
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
curl \
build-essential \
make \
pkg-config \
libssl-dev \
libsqlite3-dev \
libdbus-1-dev \
libsystemd-dev \
dpkg \
&& rm -rf /var/lib/apt/lists/*
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH="/root/.cargo/bin:${PATH}"
RUN cargo install cargo-deb
WORKDIR /workspace
COPY . .
CMD ["make", "deb"]

View File

@@ -12,23 +12,5 @@ rpm:
cargo build --release --workspace
strip -s target/release/kordophoned
strip -s target/release/kpcli
strip -s target/release/kptui
cargo generate-rpm -p kordophoned --auto-req builtin
cargo generate-rpm -p kordophoned
.PHONY: deb
deb:
cargo build --release --workspace
strip -s target/release/kordophoned
strip -s target/release/kpcli
strip -s target/release/kptui
cargo deb -p kordophoned --no-build
.PHONY: pi-zero
pi-zero:
CARGO_TARGET_DIR=target/cross/arm-unknown-linux-gnueabihf \
cross build --release --target arm-unknown-linux-gnueabihf -p kordophoned -p kpcli -p kptui
.PHONY: pi-aarch64
pi-aarch64:
CARGO_TARGET_DIR=target/cross/aarch64-unknown-linux-gnu \
cross build --release --target aarch64-unknown-linux-gnu -p kordophoned -p kpcli -p kptui

View File

@@ -9,9 +9,7 @@ Workspace members:
- `kordophoned/` — Client daemon providing local caching and IPC
- Linux: DBus
- macOS: XPC (see notes below)
- `kordophoned-client/` — Cross-platform client library for talking to `kordophoned` (D-Bus/XPC).
- `kpcli/` — Commandline interface for interacting with the API, DB, and daemon.
- `kptui/` — Terminal UI client (Ratatui) for reading and replying to chats via the daemon.
- `utilities/` — Small helper tools (e.g., testing utilities).
## Build
@@ -29,42 +27,6 @@ cargo build -p kordophone
cargo build -p kordophoned --release
```
## Raspberry Pi Zero (cross build)
Recommended approach is `cross` (https://github.com/cross-rs/cross), which uses a containerized toolchain.
Prereqs:
- Install Docker or Podman
- Install `cross`: `cargo install cross`
Build ARMv6 (Pi Zero / Zero W):
```bash
cd core
make pi-zero
```
Build aarch64 (Pi OS 64-bit / Pi 3+):
```bash
cd core
make pi-aarch64
```
Notes:
- The aarch64 cross build uses a Debian 11 (bullseye) container base image to keep glibc compatible with bullseye.
Artifacts:
- `target/cross/arm-unknown-linux-gnueabihf/arm-unknown-linux-gnueabihf/release/kordophoned`
- `target/cross/arm-unknown-linux-gnueabihf/arm-unknown-linux-gnueabihf/release/kpcli`
- `target/cross/arm-unknown-linux-gnueabihf/arm-unknown-linux-gnueabihf/release/kptui`
- `target/cross/aarch64-unknown-linux-gnu/aarch64-unknown-linux-gnu/release/kordophoned`
- `target/cross/aarch64-unknown-linux-gnu/aarch64-unknown-linux-gnu/release/kpcli`
- `target/cross/aarch64-unknown-linux-gnu/aarch64-unknown-linux-gnu/release/kptui`
## `kordophoned` (Client Daemon)
The daemon maintains a local cache, handles update cycles, and exposes IPC for GUI apps.
@@ -91,16 +53,6 @@ strip -s target/release/kordophoned
cargo generate-rpm
```
### Packaging (DEB example)
`kordophoned` is configured for Debian packaging via `cargo-deb`.
```bash
cargo install cargo-deb
cd core
cargo deb -p kordophoned
```
## `kpcli` (CLI)
Useful for quick testing and interacting with the daemon/cache.
@@ -113,3 +65,4 @@ cargo run -p kpcli -- --help
- TLS/WebSocket: the `kordophone` crate includes `rustls` and installs a crypto provider at process start.
- DB: `kordophone-db` includes Diesel migrations under `kordophone-db/migrations/`.

View File

@@ -264,34 +264,16 @@ impl<'a> Repository<'a> {
.order_by(schema::messages::date.asc())
.load::<MessageRecord>(self.connection)?;
let sender_handles: Vec<String> = message_records
.iter()
.filter_map(|record| record.sender_participant_handle.clone())
.collect();
let participant_map: HashMap<String, Participant> = if sender_handles.is_empty() {
HashMap::new()
} else {
participants
.filter(handle.eq_any(&sender_handles))
.load::<ParticipantRecord>(self.connection)?
.into_iter()
.map(|participant| {
let key = participant.handle.clone();
(key, participant.into())
})
.collect()
};
let mut result = Vec::new();
for message_record in message_records {
let mut message: Message = message_record.clone().into();
// If the message references a sender participant, load the participant info
if let Some(sender_handle) = message_record.sender_participant_handle {
if let Some(participant) = participant_map.get(&sender_handle) {
message.sender = participant.clone();
}
let participant = participants
.find(sender_handle)
.first::<ParticipantRecord>(self.connection)?;
message.sender = participant.into();
}
result.push(message);

View File

@@ -14,7 +14,7 @@ ctor = "0.2.8"
env_logger = "0.11.5"
futures-util = "0.3.31"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = { version = "0.24", default-features = false, features = ["http1", "webpki-tokio"] }
hyper-tls = "0.5.0"
log = { version = "0.4.21", features = [] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91"

View File

@@ -7,7 +7,7 @@ use crate::api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpda
use crate::api::AuthenticationStore;
use bytes::Bytes;
use hyper::{Body, Client, Method, Request, Uri};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper_tls::HttpsConnector;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
@@ -394,8 +394,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
None => "updates".to_string(),
};
let uri = self
.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
let uri = self.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
loop {
log::debug!("Connecting to websocket: {:?}", uri);
@@ -426,18 +425,20 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
log::debug!("Websocket request: {:?}", request);
let mut should_retry = true; // retry once after authenticating.
let mut should_retry = true; // retry once after authenticating.
match connect_async(request).await.map_err(Error::from) {
Ok((socket, response)) => {
log::debug!("Websocket connected: {:?}", response.status());
break Ok(WebsocketEventSocket::new(socket))
break Ok(WebsocketEventSocket::new(socket));
}
Err(e) => match &e {
Error::ClientError(ce) => match ce.as_str() {
"HTTP error: 401 Unauthorized" | "Unauthorized" => {
// Try to authenticate
if let Some(credentials) = &self.auth_store.get_credentials().await {
log::warn!("Websocket connection failed, attempting to authenticate");
log::warn!(
"Websocket connection failed, attempting to authenticate"
);
let new_token = self.authenticate(credentials.clone()).await?;
self.auth_store.set_token(new_token.to_string()).await;
@@ -466,23 +467,19 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
pub fn new(base_url: Uri, auth_store: K) -> HTTPAPIClient<K> {
let https = HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.build();
let https = HttpsConnector::new();
let client = Client::builder().build::<_, Body>(https);
HTTPAPIClient { base_url, auth_store, client }
HTTPAPIClient {
base_url,
auth_store,
client,
}
}
fn uri_for_endpoint(&self, endpoint: &str, scheme: Option<&str>) -> Result<Uri, Error> {
let mut parts = self.base_url.clone().into_parts();
let root_path: PathBuf = parts
.path_and_query
.ok_or(Error::URLError)?
.path()
.into();
let root_path: PathBuf = parts.path_and_query.ok_or(Error::URLError)?.path().into();
let path = root_path.join(endpoint);
let path_str = path.to_str().ok_or(Error::URLError)?;

View File

@@ -1,22 +0,0 @@
[package]
name = "kordophoned-client"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.93"
log = "0.4.22"
# D-Bus dependencies only on Linux
[target.'cfg(target_os = "linux")'.dependencies]
dbus = "0.9.7"
# D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = { version = "0.10.0", default-features = false }
# XPC (libxpc) interface only on macOS
[target.'cfg(target_os = "macos")'.dependencies]
block = "0.1.6"
xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" }
xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" }

View File

@@ -1,26 +0,0 @@
const KORDOPHONE_XML: &str = "../kordophoned/include/net.buzzert.kordophonecd.Server.xml";
#[cfg(not(target_os = "linux"))]
fn main() {
// No D-Bus codegen on non-Linux platforms.
}
#[cfg(target_os = "linux")]
fn main() {
let out_dir = std::env::var("OUT_DIR").unwrap();
let out_path = std::path::Path::new(&out_dir).join("kordophone-client.rs");
let opts = dbus_codegen::GenOpts {
connectiontype: dbus_codegen::ConnectionType::Blocking,
methodtype: None,
..Default::default()
};
let xml = std::fs::read_to_string(KORDOPHONE_XML).expect("Error reading server dbus interface");
let output =
dbus_codegen::generate(&xml, &opts).expect("Error generating client dbus interface");
std::fs::write(out_path, output).expect("Error writing client dbus code");
println!("cargo:rerun-if-changed={}", KORDOPHONE_XML);
}

View File

@@ -1,5 +0,0 @@
mod platform;
mod worker;
pub use worker::{spawn_worker, ChatMessage, ConversationSummary, Event, Request};

View File

@@ -1,189 +0,0 @@
#![cfg(target_os = "linux")]
use crate::worker::{ChatMessage, ConversationSummary, DaemonClient, Event};
use anyhow::Result;
use dbus::arg::{PropMap, RefArg};
use dbus::blocking::{Connection, Proxy};
use dbus::channel::Token;
use std::sync::mpsc::Sender;
use std::time::Duration;
const DBUS_NAME: &str = "net.buzzert.kordophonecd";
const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon";
#[allow(unused)]
mod dbus_interface {
#![allow(unused)]
include!(concat!(env!("OUT_DIR"), "/kordophone-client.rs"));
}
use dbus_interface::NetBuzzertKordophoneRepository as KordophoneRepository;
pub(crate) struct DBusClient {
conn: Connection,
signal_tokens: Vec<Token>,
}
impl DBusClient {
pub(crate) fn new() -> Result<Self> {
Ok(Self {
conn: Connection::new_session()?,
signal_tokens: Vec::new(),
})
}
fn proxy(&self) -> Proxy<&Connection> {
self.conn
.with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_millis(5000))
}
}
fn get_string(map: &PropMap, key: &str) -> String {
map.get(key)
.and_then(|v| v.0.as_str())
.unwrap_or_default()
.to_string()
}
fn get_i64(map: &PropMap, key: &str) -> i64 {
map.get(key).and_then(|v| v.0.as_i64()).unwrap_or(0)
}
fn get_u32(map: &PropMap, key: &str) -> u32 {
get_i64(map, key).try_into().unwrap_or(0)
}
fn get_vec_string(map: &PropMap, key: &str) -> Vec<String> {
map.get(key)
.and_then(|v| v.0.as_iter())
.map(|iter| {
iter.filter_map(|item| item.as_str().map(|s| s.to_string()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
}
impl DaemonClient for DBusClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>> {
let mut items = KordophoneRepository::get_conversations(&self.proxy(), limit, offset)?;
let mut conversations = items
.drain(..)
.map(|conv| {
let id = get_string(&conv, "guid");
let display_name = get_string(&conv, "display_name");
let participants = get_vec_string(&conv, "participants");
let title = if !display_name.trim().is_empty() {
display_name
} else if participants.is_empty() {
"<unknown>".to_string()
} else {
participants.join(", ")
};
ConversationSummary {
id,
title,
preview: get_string(&conv, "last_message_preview").replace('\n', " "),
unread_count: get_u32(&conv, "unread_count"),
date_unix: get_i64(&conv, "date"),
}
})
.collect::<Vec<_>>();
conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix));
Ok(conversations)
}
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>> {
let messages = KordophoneRepository::get_messages(
&self.proxy(),
&conversation_id,
&last_message_id.unwrap_or_default(),
)?;
Ok(messages
.into_iter()
.map(|msg| ChatMessage {
sender: get_string(&msg, "sender"),
text: get_string(&msg, "text"),
date_unix: get_i64(&msg, "date"),
})
.collect())
}
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let attachment_guids: Vec<&str> = vec![];
let outgoing_id = KordophoneRepository::send_message(
&self.proxy(),
&conversation_id,
&text,
attachment_guids,
)?;
Ok(Some(outgoing_id))
}
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {e}"))
}
fn sync_conversation(&mut self, conversation_id: String) -> Result<()> {
KordophoneRepository::sync_conversation(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to sync conversation: {e}"))
}
fn install_signal_handlers(&mut self, event_tx: Sender<Event>) -> Result<()> {
let conversations_tx = event_tx.clone();
let t1 = self
.proxy()
.match_signal(
move |_: dbus_interface::NetBuzzertKordophoneRepositoryConversationsUpdated,
_: &Connection,
_: &dbus::message::Message| {
let _ = conversations_tx.send(Event::ConversationsUpdated);
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match ConversationsUpdated: {e}"))?;
let messages_tx = event_tx.clone();
let t2 = self
.proxy()
.match_signal(
move |s: dbus_interface::NetBuzzertKordophoneRepositoryMessagesUpdated,
_: &Connection,
_: &dbus::message::Message| {
let _ = messages_tx.send(Event::MessagesUpdated {
conversation_id: s.conversation_id,
});
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match MessagesUpdated: {e}"))?;
let reconnected_tx = event_tx;
let t3 = self
.proxy()
.match_signal(
move |_: dbus_interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected,
_: &Connection,
_: &dbus::message::Message| {
let _ = reconnected_tx.send(Event::UpdateStreamReconnected);
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match UpdateStreamReconnected: {e}"))?;
self.signal_tokens.extend([t1, t2, t3]);
Ok(())
}
fn poll(&mut self, timeout: Duration) -> Result<()> {
self.conn.process(timeout)?;
Ok(())
}
}

View File

@@ -1,233 +0,0 @@
#![cfg(target_os = "macos")]
use crate::worker::{ChatMessage, ConversationSummary, DaemonClient};
use anyhow::Result;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use xpc_connection::Message;
const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0";
struct XpcTransport {
connection: xpc_connection_sys::xpc_connection_t,
}
impl XpcTransport {
fn connect(name: impl AsRef<CStr>) -> Self {
use xpc_connection_sys::xpc_connection_create_mach_service;
use xpc_connection_sys::xpc_connection_resume;
let name = name.as_ref();
let connection =
unsafe { xpc_connection_create_mach_service(name.as_ptr(), std::ptr::null_mut(), 0) };
unsafe { xpc_connection_resume(connection) };
Self { connection }
}
fn send_with_reply(&self, message: Message) -> Message {
use xpc_connection::message_to_xpc_object;
use xpc_connection::xpc_object_to_message;
use xpc_connection_sys::{xpc_connection_send_message_with_reply_sync, xpc_release};
unsafe {
let xobj = message_to_xpc_object(message);
let reply = xpc_connection_send_message_with_reply_sync(self.connection, xobj);
xpc_release(xobj);
let msg = xpc_object_to_message(reply);
if !reply.is_null() {
xpc_release(reply);
}
msg
}
}
}
impl Drop for XpcTransport {
fn drop(&mut self) {
use xpc_connection_sys::xpc_object_t;
use xpc_connection_sys::xpc_release;
unsafe { xpc_release(self.connection as xpc_object_t) };
}
}
pub(crate) struct XpcClient {
transport: XpcTransport,
}
impl XpcClient {
pub(crate) fn new() -> Result<Self> {
let mach_port_name = CString::new(SERVICE_NAME).unwrap();
Ok(Self {
transport: XpcTransport::connect(&mach_port_name),
})
}
fn key(s: &str) -> CString {
CString::new(s).unwrap()
}
fn request(name: &str, arguments: Option<HashMap<CString, Message>>) -> Message {
let mut root: HashMap<CString, Message> = HashMap::new();
root.insert(Self::key("name"), Message::String(Self::key(name)));
if let Some(args) = arguments {
root.insert(Self::key("arguments"), Message::Dictionary(args));
}
Message::Dictionary(root)
}
fn get_string(map: &HashMap<CString, Message>, key: &str) -> Option<String> {
match map.get(&Self::key(key)) {
Some(Message::String(s)) => Some(s.to_string_lossy().into_owned()),
_ => None,
}
}
fn get_i64_from_str(map: &HashMap<CString, Message>, key: &str) -> i64 {
Self::get_string(map, key)
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0)
}
}
impl DaemonClient for XpcClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>> {
let mut args = HashMap::new();
args.insert(Self::key("limit"), Message::String(Self::key(&limit.to_string())));
args.insert(Self::key("offset"), Message::String(Self::key(&offset.to_string())));
let reply = self
.transport
.send_with_reply(Self::request("GetConversations", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected conversations response");
};
let Some(Message::Array(items)) = map.get(&Self::key("conversations")) else {
anyhow::bail!("Missing conversations in response");
};
let mut conversations = Vec::new();
for item in items {
let Message::Dictionary(conv) = item else { continue };
let id = Self::get_string(conv, "guid").unwrap_or_default();
let display_name = Self::get_string(conv, "display_name").unwrap_or_default();
let preview = Self::get_string(conv, "last_message_preview").unwrap_or_default();
let unread_count = Self::get_i64_from_str(conv, "unread_count") as u32;
let date_unix = Self::get_i64_from_str(conv, "date");
let participants = match conv.get(&Self::key("participants")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect::<Vec<_>>(),
_ => Vec::new(),
};
let title = if !display_name.trim().is_empty() {
display_name
} else if participants.is_empty() {
"<unknown>".to_string()
} else {
participants.join(", ")
};
conversations.push(ConversationSummary {
id,
title,
preview: preview.replace('\n', " "),
unread_count,
date_unix,
});
}
conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix));
Ok(conversations)
}
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
if let Some(last) = last_message_id {
args.insert(Self::key("last_message_id"), Message::String(Self::key(&last)));
}
let reply = self
.transport
.send_with_reply(Self::request("GetMessages", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected messages response");
};
let Some(Message::Array(items)) = map.get(&Self::key("messages")) else {
anyhow::bail!("Missing messages in response");
};
let mut messages = Vec::new();
for item in items {
let Message::Dictionary(msg) = item else { continue };
messages.push(ChatMessage {
sender: Self::get_string(msg, "sender").unwrap_or_default(),
text: Self::get_string(msg, "text").unwrap_or_default(),
date_unix: Self::get_i64_from_str(msg, "date"),
});
}
Ok(messages)
}
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
args.insert(Self::key("text"), Message::String(Self::key(&text)));
let reply = self
.transport
.send_with_reply(Self::request("SendMessage", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected send response");
};
Ok(Self::get_string(&map, "uuid"))
}
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
let _ = self
.transport
.send_with_reply(Self::request("MarkConversationAsRead", Some(args)));
Ok(())
}
fn sync_conversation(&mut self, conversation_id: String) -> Result<()> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
let _ = self
.transport
.send_with_reply(Self::request("SyncConversation", Some(args)));
Ok(())
}
}

View File

@@ -1,24 +0,0 @@
use crate::worker::DaemonClient;
use anyhow::Result;
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "macos")]
mod macos;
pub(crate) fn new_daemon_client() -> Result<Box<dyn DaemonClient>> {
#[cfg(target_os = "linux")]
{
Ok(Box::new(linux::DBusClient::new()?))
}
#[cfg(target_os = "macos")]
{
Ok(Box::new(macos::XpcClient::new()?))
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
anyhow::bail!("Unsupported platform")
}
}

View File

@@ -1,133 +0,0 @@
use crate::platform;
use anyhow::Result;
use std::sync::mpsc;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct ConversationSummary {
pub id: String,
pub title: String,
pub preview: String,
pub unread_count: u32,
pub date_unix: i64,
}
#[derive(Clone, Debug)]
pub struct ChatMessage {
pub sender: String,
pub text: String,
pub date_unix: i64,
}
pub enum Request {
RefreshConversations,
RefreshMessages { conversation_id: String },
SendMessage { conversation_id: String, text: String },
MarkRead { conversation_id: String },
SyncConversation { conversation_id: String },
}
pub enum Event {
Conversations(Vec<ConversationSummary>),
Messages {
conversation_id: String,
messages: Vec<ChatMessage>,
},
MessageSent {
conversation_id: String,
outgoing_id: Option<String>,
},
MarkedRead,
ConversationSyncTriggered { conversation_id: String },
ConversationsUpdated,
MessagesUpdated { conversation_id: String },
UpdateStreamReconnected,
Error(String),
}
pub fn spawn_worker(
request_rx: std::sync::mpsc::Receiver<Request>,
event_tx: std::sync::mpsc::Sender<Event>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
let mut client = match platform::new_daemon_client() {
Ok(c) => c,
Err(e) => {
let _ = event_tx.send(Event::Error(format!("Failed to connect to daemon: {e}")));
return;
}
};
if let Err(e) = client.install_signal_handlers(event_tx.clone()) {
let _ = event_tx.send(Event::Error(format!("Failed to install daemon signals: {e}")));
}
loop {
match request_rx.recv_timeout(Duration::from_millis(100)) {
Ok(req) => {
let res = match req {
Request::RefreshConversations => client
.get_conversations(200, 0)
.map(Event::Conversations),
Request::RefreshMessages { conversation_id } => client
.get_messages(conversation_id.clone(), None)
.map(|messages| Event::Messages {
conversation_id,
messages,
}),
Request::SendMessage {
conversation_id,
text,
} => client
.send_message(conversation_id.clone(), text)
.map(|outgoing_id| Event::MessageSent {
conversation_id,
outgoing_id,
}),
Request::MarkRead { conversation_id } => client
.mark_conversation_as_read(conversation_id.clone())
.map(|_| Event::MarkedRead),
Request::SyncConversation { conversation_id } => client
.sync_conversation(conversation_id.clone())
.map(|_| Event::ConversationSyncTriggered { conversation_id }),
};
match res {
Ok(evt) => {
let _ = event_tx.send(evt);
}
Err(e) => {
let _ = event_tx.send(Event::Error(format!("{e}")));
}
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
if let Err(e) = client.poll(Duration::from_millis(0)) {
let _ = event_tx.send(Event::Error(format!("Daemon polling error: {e}")));
}
}
})
}
pub(crate) trait DaemonClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>>;
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>>;
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>>;
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>;
fn sync_conversation(&mut self, conversation_id: String) -> Result<()>;
fn install_signal_handlers(&mut self, _event_tx: mpsc::Sender<Event>) -> Result<()> {
Ok(())
}
fn poll(&mut self, timeout: Duration) -> Result<()> {
std::thread::sleep(timeout);
Ok(())
}
}

View File

@@ -23,6 +23,7 @@ tokio-condvar = "0.3.0"
uuid = "1.16.0"
once_cell = "1.19.0"
mime_guess = "2.0"
notify = { package = "notify-rust", version = "4.10.0" }
# D-Bus dependencies only on Linux
[target.'cfg(target_os = "linux")'.dependencies]
@@ -33,7 +34,8 @@ dbus-tree = "0.9.2"
# D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = { version = "0.10.0", default-features = false }
dbus-codegen = "0.10.0"
dbus-crossroads = "0.5.1"
# XPC (libxpc) interface for macOS IPC
[target.'cfg(target_os = "macos")'.dependencies]
@@ -47,18 +49,5 @@ serde = { version = "1.0", features = ["derive"] }
assets = [
{ source = "../target/release/kordophoned", dest = "/usr/libexec/kordophoned", mode = "755" },
{ source = "../target/release/kpcli", dest = "/usr/bin/kpcli", mode = "755" },
{ source = "../target/release/kptui", dest = "/usr/bin/kptui", mode = "755" },
{ source = "include/net.buzzert.kordophonecd.service", dest = "/usr/share/dbus-1/services/net.buzzert.kordophonecd.service", mode = "644" },
]
[package.metadata.deb]
maintainer = "James Magahern <james@magahern.com>"
copyright = "2026, James Magahern <james@magahern.com>"
section = "net"
priority = "optional"
assets = [
["../target/release/kordophoned", "usr/libexec/kordophoned", "755"],
["../target/release/kpcli", "usr/bin/kpcli", "755"],
["../target/release/kptui", "usr/bin/kptui", "755"],
["include/net.buzzert.kordophonecd.service", "usr/share/dbus-1/services/net.buzzert.kordophonecd.service", "644"],
]

View File

@@ -14,17 +14,6 @@ strip -s target/release/kordophoned
cargo generate-rpm
```
# Building DEB
Make sure cargo-deb is installed, `cargo install cargo-deb`.
Then:
```bash
cd core
cargo deb -p kordophoned
```
## Running on macOS
Before any client can talk to the kordophone daemon on macOS, the XPC service needs to be manually registered with launchd.
@@ -45,3 +34,4 @@ and the following in Info.plist:
<key>MachServices</key><dict><key>net.buzzert.kordophonecd</key><true/></dict>
<key>KeepAlive</key><true/>
```

View File

@@ -27,4 +27,3 @@ fn main() {
println!("cargo:rerun-if-changed={}", KORDOPHONE_XML);
}

View File

@@ -73,13 +73,14 @@
'sender' (string): Sender display name
'attachments' (array of dictionaries): List of attachments
'guid' (string): Attachment GUID
'path' (string): Attachment path
'preview_path' (string): Preview attachment path
'downloaded' (boolean): Whether the attachment is downloaded
'preview_downloaded' (boolean): Whether the preview is downloaded
'metadata' (dictionary, optional): Attachment metadata
'attribution_info' (dictionary, optional): Attribution info
'width' (int32, optional): Width
'height' (int32, optional): Height
Use GetAttachmentInfo for full/preview paths."/>
'height' (int32, optional): Height"/>
</arg>
</method>
@@ -102,6 +103,13 @@
"/>
</method>
<method name="TestNotification">
<arg type="s" name="summary" direction="in"/>
<arg type="s" name="body" direction="in"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Displays a test desktop notification with the provided summary and body."/>
</method>
<signal name="MessagesUpdated">
<arg type="s" name="conversation_id" direction="in"/>
<annotation name="org.freedesktop.DBus.DocString"
@@ -128,20 +136,6 @@
"/>
</method>
<method name="OpenAttachmentFd">
<arg type="s" name="attachment_id" direction="in"/>
<arg type="b" name="preview" direction="in"/>
<arg type="h" name="fd" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Opens a read-only file descriptor for an attachment path.
Arguments:
attachment_id: the attachment GUID
preview: whether to open the preview path (true) or full path (false)
Returns:
fd: a Unix file descriptor to read attachment bytes"/>
</method>
<method name="DownloadAttachment">
<arg type="s" name="attachment_id" direction="in"/>
<arg type="b" name="preview" direction="in"/>

View File

@@ -115,57 +115,39 @@ impl AttachmentStore {
base_path: base_path,
metadata: None,
mime_type: None,
cached_full_path: None,
cached_preview_path: None,
};
// Best-effort: if files already exist, cache their exact paths and infer MIME type.
// Best-effort: if a file already exists, try to infer MIME type from extension
let kind = "full";
let stem = attachment
.base_path
.file_name()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_default();
let legacy_full = attachment.base_path.with_extension("full");
if legacy_full.exists() {
attachment.cached_full_path = Some(legacy_full);
}
let legacy_preview = attachment.base_path.with_extension("preview");
if legacy_preview.exists() {
attachment.cached_preview_path = Some(legacy_preview);
}
if attachment.cached_full_path.is_none() || attachment.cached_preview_path.is_none() {
let full_prefix = format!("{}.full.", stem);
let preview_prefix = format!("{}.preview.", stem);
let legacy = attachment.base_path.with_extension(kind);
let existing_path = if legacy.exists() {
Some(legacy)
} else {
let prefix = format!("{}.{}.", stem, kind);
let parent = attachment
.base_path
.parent()
.unwrap_or_else(|| std::path::Path::new("."));
let mut found: Option<PathBuf> = None;
if let Ok(entries) = std::fs::read_dir(parent) {
for entry in entries.flatten() {
let name = entry.file_name().to_string_lossy().to_string();
if !name.ends_with(".download") {
if attachment.cached_full_path.is_none() && name.starts_with(&full_prefix) {
attachment.cached_full_path = Some(entry.path());
continue;
}
if attachment.cached_preview_path.is_none()
&& name.starts_with(&preview_prefix)
{
attachment.cached_preview_path = Some(entry.path());
}
if name.starts_with(&prefix) && !name.ends_with(".download") {
found = Some(entry.path());
break;
}
}
}
}
found
};
if let Some(existing_full) = &attachment.cached_full_path {
if let Some(m) = mime_guess::from_path(existing_full).first_raw() {
if let Some(existing) = existing_path {
if let Some(m) = mime_guess::from_path(&existing).first_raw() {
attachment.mime_type = Some(m.to_string());
}
}
@@ -360,9 +342,6 @@ impl AttachmentStore {
match kind {
AttachmentStoreError::AttachmentAlreadyDownloaded => {
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", &_guid);
let _ = daemon_event_sink
.send(DaemonEvent::AttachmentDownloaded(_guid.clone()))
.await;
}
AttachmentStoreError::DownloadAlreadyInProgress => {
// Already logged a warning where detected
@@ -381,10 +360,6 @@ impl AttachmentStore {
log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid);
} else {
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid);
let _ = self
.daemon_event_sink
.send(DaemonEvent::AttachmentDownloaded(guid))
.await;
}
}

View File

@@ -6,8 +6,6 @@ use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
const LOOKUP_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Clone)]
pub struct EDSContactResolverBackend;
@@ -191,10 +189,11 @@ impl ContactResolverBackend for EDSContactResolverBackend {
None => return None,
};
let address_book_proxy =
handle
.connection
.with_proxy(&handle.bus_name, &handle.object_path, LOOKUP_TIMEOUT);
let address_book_proxy = handle.connection.with_proxy(
&handle.bus_name,
&handle.object_path,
Duration::from_secs(60),
);
ensure_address_book_open(&address_book_proxy);
@@ -256,10 +255,11 @@ impl ContactResolverBackend for EDSContactResolverBackend {
None => return None,
};
let address_book_proxy =
handle
.connection
.with_proxy(&handle.bus_name, &handle.object_path, LOOKUP_TIMEOUT);
let address_book_proxy = handle.connection.with_proxy(
&handle.bus_name,
&handle.object_path,
Duration::from_secs(60),
);
ensure_address_book_open(&address_book_proxy);

View File

@@ -47,8 +47,8 @@ pub type AnyContactID = String;
#[derive(Clone)]
pub struct ContactResolver<T: ContactResolverBackend> {
backend: T,
display_name_cache: HashMap<AnyContactID, Option<String>>,
contact_id_cache: HashMap<String, Option<AnyContactID>>,
display_name_cache: HashMap<AnyContactID, String>,
contact_id_cache: HashMap<String, AnyContactID>,
}
impl<T: ContactResolverBackend> ContactResolver<T>
@@ -67,25 +67,29 @@ where
pub fn resolve_contact_id(&mut self, address: &str) -> Option<AnyContactID> {
if let Some(id) = self.contact_id_cache.get(address) {
return id.clone();
return Some(id.clone());
}
let id = self.backend.resolve_contact_id(address).map(|id| id.into());
self.contact_id_cache
.insert(address.to_string(), id.clone());
if let Some(ref id) = id {
self.contact_id_cache
.insert(address.to_string(), id.clone());
}
id
}
pub fn get_contact_display_name(&mut self, contact_id: &AnyContactID) -> Option<String> {
if let Some(display_name) = self.display_name_cache.get(contact_id) {
return display_name.clone();
return Some(display_name.clone());
}
let backend_contact_id: T::ContactID = T::ContactID::from((*contact_id).clone());
let display_name = self.backend.get_contact_display_name(&backend_contact_id);
self.display_name_cache
.insert(contact_id.to_string(), display_name.clone());
if let Some(ref display_name) = display_name {
self.display_name_cache
.insert(contact_id.to_string(), display_name.clone());
}
display_name
}

View File

@@ -61,6 +61,9 @@ pub enum Event {
/// - reply: The outgoing message ID (not the server-assigned message ID).
SendMessage(String, String, Vec<String>, Reply<Uuid>),
/// Triggers a manual test notification.
TestNotification(String, String, Reply<Result<(), String>>),
/// Notifies the daemon that a message has been sent.
/// Parameters:
/// - message: The message that was sent.

View File

@@ -15,11 +15,13 @@ use std::collections::HashMap;
use std::error::Error;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::sync::{
broadcast,
mpsc::{Receiver, Sender},
Mutex,
};
use uuid::Uuid;
use kordophone_db::{
@@ -42,6 +44,9 @@ mod post_office;
use post_office::Event as PostOfficeEvent;
use post_office::PostOffice;
mod notifier;
use notifier::NotificationService;
mod models;
pub use models::Attachment;
pub use models::Message;
@@ -73,14 +78,11 @@ pub mod target {
pub static DAEMON: &str = "daemon";
}
const GET_MESSAGES_INITIAL_WINDOW: usize = 300;
pub struct Daemon {
pub event_sender: Sender<Event>,
event_receiver: Receiver<Event>,
signal_receiver: Option<Receiver<Signal>>,
signal_sender: Sender<Signal>,
signal_sender: broadcast::Sender<Signal>,
post_office_sink: Sender<PostOfficeEvent>,
post_office_source: Option<Receiver<PostOfficeEvent>>,
@@ -90,6 +92,7 @@ pub struct Daemon {
attachment_store_sink: Option<Sender<AttachmentStoreEvent>>,
update_monitor_command_tx: Option<Sender<UpdateMonitorCommand>>,
notifier: Arc<NotificationService>,
version: String,
database: Arc<Mutex<Database>>,
runtime: tokio::runtime::Runtime,
@@ -106,7 +109,7 @@ impl Daemon {
// Create event channels
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100);
let (signal_sender, _) = tokio::sync::broadcast::channel(100);
let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100);
// Create background task runtime
@@ -117,13 +120,14 @@ impl Daemon {
let database_impl = Database::new(&database_path.to_string_lossy())?;
let database = Arc::new(Mutex::new(database_impl));
let notifier = Arc::new(NotificationService::new());
Ok(Self {
version: env!("CARGO_PKG_VERSION").to_string(),
notifier,
database,
event_receiver,
event_sender,
signal_receiver: Some(signal_receiver),
signal_sender,
post_office_sink,
post_office_source: Some(post_office_source),
@@ -168,6 +172,16 @@ impl Daemon {
attachment_store.run().await;
});
// Notification listener
{
let notifier = self.notifier.clone();
let mut signal_rx = self.signal_sender.subscribe();
let database = self.database.clone();
tokio::spawn(async move {
notifier.listen(signal_rx, database).await;
});
}
while let Some(event) = self.event_receiver.recv().await {
log::debug!(target: target::EVENT, "Received event: {:?}", event);
self.handle_event(event).await;
@@ -188,14 +202,14 @@ impl Daemon {
async fn handle_event(&mut self, event: Event) {
match event {
Event::GetVersion(reply) => {
let _ = reply.send(self.version.clone());
reply.send(self.version.clone()).unwrap();
}
Event::SyncConversationList(reply) => {
self.spawn_conversation_list_sync();
// This is a background operation, so return right away.
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::SyncAllConversations(reply) => {
@@ -210,7 +224,7 @@ impl Daemon {
});
// This is a background operation, so return right away.
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::SyncConversation(conversation_id, reply) => {
@@ -228,7 +242,7 @@ impl Daemon {
}
});
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::MarkConversationAsRead(conversation_id, reply) => {
@@ -240,7 +254,7 @@ impl Daemon {
}
});
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::UpdateConversationMetadata(conversation, reply) => {
@@ -253,7 +267,7 @@ impl Daemon {
}
});
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::UpdateStreamReconnected => {
@@ -263,15 +277,16 @@ impl Daemon {
self.spawn_conversation_list_sync();
// Send signal to the client that the update stream has been reconnected.
self.signal_sender
.send(Signal::UpdateStreamReconnected)
.await
.unwrap();
Self::send_signal(
&self.signal_sender,
Signal::UpdateStreamReconnected,
target::UPDATES,
);
}
Event::GetAllConversations(limit, offset, reply) => {
let conversations = self.get_conversations_limit_offset(limit, offset).await;
let _ = reply.send(conversations);
reply.send(conversations).unwrap();
}
Event::GetAllSettings(reply) => {
@@ -280,7 +295,7 @@ impl Daemon {
Settings::default()
});
let _ = reply.send(settings);
reply.send(settings).unwrap();
}
Event::UpdateSettings(settings, reply) => {
@@ -312,14 +327,12 @@ impl Daemon {
}
}
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::GetMessages(conversation_id, last_message_id, reply) => {
let messages = self.get_messages(conversation_id, last_message_id).await;
if reply.send(messages).is_err() {
log::warn!(target: target::EVENT, "GetMessages reply receiver dropped before send");
}
reply.send(messages).unwrap();
}
Event::DeleteAllConversations(reply) => {
@@ -327,21 +340,17 @@ impl Daemon {
log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e);
});
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::SendMessage(conversation_id, text, attachment_guids, reply) => {
let conversation_id = conversation_id.clone();
let uuid = self
.enqueue_outgoing_message(text, conversation_id.clone(), attachment_guids)
.await;
let _ = reply.send(uuid);
reply.send(uuid).unwrap();
// Send message updated signal, we have a placeholder message we will return.
self.signal_sender
.send(Signal::MessagesUpdated(conversation_id.clone()))
.await
.unwrap();
// Notify clients that messages have changed (e.g., to refresh placeholders).
self.emit_messages_updated(conversation_id);
}
Event::MessageSent(message, outgoing_message, conversation_id) => {
@@ -371,11 +380,20 @@ impl Daemon {
.get_mut(&conversation_id)
.map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));
// Send message updated signal.
self.signal_sender
.send(Signal::MessagesUpdated(conversation_id))
.await
.unwrap();
// Notify clients to refresh the conversation after the final message arrives.
self.emit_messages_updated(conversation_id);
}
Event::TestNotification(summary, body, reply) => {
let result = self
.signal_sender
.send(Signal::Internal(InternalSignal::TestNotification {
summary,
body,
}))
.map(|_| ())
.map_err(|e| e.to_string());
reply.send(result).unwrap();
}
Event::GetAttachment(guid, reply) => {
@@ -400,17 +418,18 @@ impl Daemon {
.await
.unwrap();
let _ = reply.send(());
reply.send(()).unwrap();
}
Event::AttachmentDownloaded(attachment_id) => {
log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id);
// Send signal to the client that the attachment has been downloaded.
self.signal_sender
.send(Signal::AttachmentDownloaded(attachment_id))
.await
.unwrap();
Self::send_signal(
&self.signal_sender,
Signal::AttachmentDownloaded(attachment_id),
target::ATTACHMENTS,
);
}
Event::UploadAttachment(path, reply) => {
@@ -425,17 +444,17 @@ impl Daemon {
Event::AttachmentUploaded(upload_guid, attachment_guid) => {
log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid);
self.signal_sender
.send(Signal::AttachmentUploaded(upload_guid, attachment_guid))
.await
.unwrap();
Self::send_signal(
&self.signal_sender,
Signal::AttachmentUploaded(upload_guid, attachment_guid),
target::ATTACHMENTS,
);
}
}
}
/// Panics if the signal receiver has already been taken.
pub fn obtain_signal_receiver(&mut self) -> Receiver<Signal> {
self.signal_receiver.take().unwrap()
pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
self.signal_sender.subscribe()
}
async fn get_conversations_limit_offset(
@@ -450,12 +469,15 @@ impl Daemon {
.await
}
fn emit_messages_updated(&self, conversation_id: String) {
Self::send_messages_updated(&self.signal_sender, conversation_id);
}
async fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<MessageID>,
_last_message_id: Option<MessageID>,
) -> Vec<Message> {
let started = Instant::now();
// Get outgoing messages for this conversation.
let empty_vec: Vec<OutgoingMessage> = vec![];
let outgoing_messages: &Vec<OutgoingMessage> = self
@@ -477,9 +499,8 @@ impl Daemon {
.await;
// Convert DB messages to daemon model, substituting local_id when an alias exists.
let mut result: Vec<Message> = Vec::with_capacity(
db_messages.len() + outgoing_messages.len(),
);
let mut result: Vec<Message> =
Vec::with_capacity(db_messages.len() + outgoing_messages.len());
for m in db_messages.into_iter() {
let server_id = m.id.clone();
let mut dm: Message = m.into();
@@ -494,27 +515,6 @@ impl Daemon {
result.push(om.into());
}
if let Some(last_id) = last_message_id {
if let Some(last_index) = result.iter().position(|message| message.id == last_id) {
result = result.split_off(last_index + 1);
}
} else if result.len() > GET_MESSAGES_INITIAL_WINDOW {
let dropped = result.len() - GET_MESSAGES_INITIAL_WINDOW;
result = result.split_off(dropped);
log::debug!(
target: target::EVENT,
"GetMessages initial window applied: dropped {} older messages",
dropped
);
}
log::debug!(
target: target::EVENT,
"GetMessages completed in {}ms: {} messages",
started.elapsed().as_millis(),
result.len()
);
result
}
@@ -548,7 +548,7 @@ impl Daemon {
async fn sync_conversation_list(
database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>,
signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> {
log::info!(target: target::SYNC, "Starting list conversation sync");
@@ -600,7 +600,7 @@ impl Daemon {
}
// Send conversations updated signal
signal_sender.send(Signal::ConversationsUpdated).await?;
Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations);
Ok(())
@@ -608,7 +608,7 @@ impl Daemon {
async fn sync_all_conversations_impl(
database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>,
signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> {
log::info!(target: target::SYNC, "Starting full conversation sync");
@@ -636,7 +636,7 @@ impl Daemon {
}
// Send conversations updated signal.
signal_sender.send(Signal::ConversationsUpdated).await?;
Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations);
Ok(())
@@ -644,7 +644,7 @@ impl Daemon {
async fn sync_conversation_impl(
database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>,
signal_sender: &broadcast::Sender<Signal>,
conversation_id: String,
) -> Result<()> {
log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id);
@@ -702,9 +702,7 @@ impl Daemon {
// Send messages updated signal, if we actually inserted any messages.
if num_messages > 0 {
signal_sender
.send(Signal::MessagesUpdated(conversation_id.clone()))
.await?;
Self::send_messages_updated(signal_sender, conversation_id.clone());
}
log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id);
@@ -725,14 +723,14 @@ impl Daemon {
async fn update_conversation_metadata_impl(
database: &mut Arc<Mutex<Database>>,
conversation: Conversation,
signal_sender: &Sender<Signal>,
signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> {
log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
let updated = database
.with_repository(|r| r.merge_conversation_metadata(conversation))
.await?;
if updated {
signal_sender.send(Signal::ConversationsUpdated).await?;
Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::DAEMON);
}
Ok(())
@@ -747,6 +745,40 @@ impl Daemon {
self.database.with_settings(|s| settings.save(s)).await
}
fn send_signal(sender: &broadcast::Sender<Signal>, signal: Signal, log_target: &str) {
if let Err(error) = sender.send(signal) {
log::trace!(
target: log_target,
"Signal delivery skipped (no listeners?): {}",
error
);
}
}
fn send_internal(sender: &broadcast::Sender<Signal>, signal: InternalSignal) {
if let Err(error) = sender.send(Signal::Internal(signal)) {
log::trace!(
target: target::DAEMON,
"Internal signal delivery skipped: {}",
error
);
}
}
fn send_messages_updated(sender: &broadcast::Sender<Signal>, conversation_id: String) {
Self::send_internal(
sender,
InternalSignal::MessagesUpdated(conversation_id.clone()),
);
if let Err(error) = sender.send(Signal::MessagesUpdated(conversation_id)) {
log::warn!(
target: target::DAEMON,
"Failed to send MessagesUpdated signal: {}",
error
);
}
}
async fn get_client_impl(
database: &mut Arc<Mutex<Database>>,
) -> Result<HTTPAPIClient<DatabaseAuthenticationStore>> {
@@ -779,9 +811,11 @@ impl Daemon {
})
.await?;
self.signal_sender
.send(Signal::ConversationsUpdated)
.await?;
Self::send_signal(
&self.signal_sender,
Signal::ConversationsUpdated,
target::SYNC,
);
Ok(())
}

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
#[derive(Debug, Clone)]
pub struct AttachmentMetadata {
@@ -17,8 +17,6 @@ pub struct Attachment {
pub base_path: PathBuf,
pub metadata: Option<AttachmentMetadata>,
pub mime_type: Option<String>,
pub cached_full_path: Option<PathBuf>,
pub cached_preview_path: Option<PathBuf>,
}
impl Attachment {
@@ -46,21 +44,17 @@ impl Attachment {
}
pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf {
if !scratch {
let cached = if preview {
self.cached_preview_path.as_ref()
} else {
self.cached_full_path.as_ref()
};
if let Some(path) = cached {
return path.clone();
}
}
// Determine whether this is a preview or full attachment.
let kind = if preview { "preview" } else { "full" };
// If not a scratch path, and a file already exists on disk with a concrete
// file extension (e.g., guid.full.jpg), return that existing path.
if !scratch {
if let Some(existing) = self.find_existing_path(preview) {
return existing;
}
}
// Fall back to constructing a path using known info. If we know the MIME type,
// prefer an extension guessed from it; otherwise keep legacy naming.
let ext_from_mime = self
@@ -82,15 +76,44 @@ impl Attachment {
}
pub fn is_downloaded(&self, preview: bool) -> bool {
let path = self.get_path_for_preview(preview);
std::fs::exists(&path).expect(
std::fs::exists(&self.get_path_for_preview(preview)).expect(
format!(
"Wasn't able to check for the existence of an attachment file path at {}",
path.display()
&self.get_path_for_preview(preview).display()
)
.as_str(),
)
}
fn find_existing_path(&self, preview: bool) -> Option<PathBuf> {
let kind = if preview { "preview" } else { "full" };
// First, check legacy path without a concrete file extension.
let legacy = self.base_path.with_extension(kind);
if legacy.exists() {
return Some(legacy);
}
// Next, search for a filename like: <guid>.<kind>.<ext>
let file_stem = self
.base_path
.file_name()
.map(|s| s.to_string_lossy().to_string())?;
let prefix = format!("{}.{}.", file_stem, kind);
let parent = self.base_path.parent().unwrap_or_else(|| Path::new("."));
if let Ok(dir) = std::fs::read_dir(parent) {
for entry in dir.flatten() {
let file_name = entry.file_name();
let name = file_name.to_string_lossy();
if name.starts_with(&prefix) && !name.ends_with(".download") {
return Some(entry.path());
}
}
}
None
}
}
impl From<kordophone::model::message::AttachmentMetadata> for AttachmentMetadata {

View File

@@ -0,0 +1,288 @@
use super::contact_resolver::{ContactResolver, DefaultContactResolverBackend};
use super::models::message::Participant;
use super::signals::{InternalSignal, Signal};
use super::{target, Message};
use kordophone_db::{
database::{Database, DatabaseAccess},
models::Conversation,
models::Participant as DbParticipant,
};
use notify::Notification;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};
/// Centralised notification helper used by platform transports (D-Bus, XPC, …).
pub struct NotificationService {
resolver: Mutex<ContactResolver<DefaultContactResolverBackend>>,
}
impl NotificationService {
pub fn new() -> Self {
Self {
resolver: Mutex::new(ContactResolver::new(
DefaultContactResolverBackend::default(),
)),
}
}
pub async fn listen(
self: Arc<Self>,
mut signal_rx: broadcast::Receiver<Signal>,
database: Arc<Mutex<Database>>,
) {
log::trace!(target: target::DAEMON, "NotificationService listener started");
loop {
match signal_rx.recv().await {
Ok(Signal::Internal(InternalSignal::MessagesUpdated(conversation_id))) => {
log::trace!(
target: target::DAEMON,
"NotificationService received MessagesUpdated for {}",
conversation_id
);
self.notify_new_messages(&database, &conversation_id).await;
}
Ok(Signal::Internal(InternalSignal::TestNotification { summary, body })) => {
log::trace!(
target: target::DAEMON,
"NotificationService received TestNotification"
);
if let Err(error) = self.send_manual(&summary, &body) {
log::warn!(
target: target::DAEMON,
"Failed to display test notification: {}",
error
);
}
}
Ok(other) => {
log::trace!(
target: target::DAEMON,
"NotificationService ignoring signal: {:?}",
other
);
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
target: target::DAEMON,
"NotificationService lagged; skipped {} signals",
skipped
);
}
Err(broadcast::error::RecvError::Closed) => {
log::trace!(target: target::DAEMON, "NotificationService listener exiting");
break;
}
}
}
}
/// Checks whether a new user-visible notification should be shown for the
/// given conversation and displays it if appropriate.
pub async fn notify_new_messages(
&self,
database: &Arc<Mutex<Database>>,
conversation_id: &str,
) {
log::trace!(
target: target::DAEMON,
"NotificationService preparing payload for {}",
conversation_id
);
if let Some((summary, body)) = self.prepare_payload(database, conversation_id).await {
log::trace!(
target: target::DAEMON,
"NotificationService displaying notification for {}",
conversation_id
);
if let Err(error) = self.show_notification(&summary, &body) {
log::warn!(
target: target::DAEMON,
"Failed to display notification for conversation {}: {}",
conversation_id,
error
);
}
} else {
log::trace!(
target: target::DAEMON,
"NotificationService skipping notification for {}",
conversation_id
);
}
}
/// Displays a manual test notification.
pub fn send_manual(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> {
log::trace!(
target: target::DAEMON,
"NotificationService sending manual notification"
);
self.show_notification(summary, body)
}
async fn prepare_payload(
&self,
database: &Arc<Mutex<Database>>,
conversation_id: &str,
) -> Option<(String, String)> {
let conversation_opt = database
.lock()
.await
.with_repository(|r| r.get_conversation_by_guid(conversation_id))
.await;
let conversation = match conversation_opt {
Ok(Some(conv)) => conv,
Ok(None) => {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} not found",
conversation_id
);
return None;
}
Err(err) => {
log::warn!(
target: target::DAEMON,
"Notification lookup failed for conversation {}: {}",
conversation_id,
err
);
return None;
}
};
if conversation.unread_count == 0 {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} has no unread messages",
conversation_id
);
return None;
}
let last_message_opt = database
.lock()
.await
.with_repository(|r| r.get_last_message_for_conversation(conversation_id))
.await;
let last_message: Message = match last_message_opt {
Ok(Some(message)) => message.into(),
Ok(None) => {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} has no messages",
conversation_id
);
return None;
}
Err(err) => {
log::warn!(
target: target::DAEMON,
"Notification lookup failed for conversation {}: {}",
conversation_id,
err
);
return None;
}
};
if matches!(last_message.sender, Participant::Me) {
log::trace!(
target: target::DAEMON,
"NotificationService: last message in {} was sent by self",
conversation_id
);
return None;
}
let mut resolver = self.resolver.lock().await;
let summary = self.conversation_display_name(&conversation, &mut resolver);
let sender_display_name =
self.resolve_participant_display_name(&last_message.sender, &mut resolver);
let mut message_text = last_message.text.replace('\u{FFFC}', "");
if message_text.trim().is_empty() {
if !last_message.attachments.is_empty() {
message_text = "Sent an attachment".to_string();
} else {
message_text = "Sent a message".to_string();
}
}
let body = if sender_display_name.is_empty() {
message_text
} else {
format!("{}: {}", sender_display_name, message_text)
};
Some((summary, body))
}
fn conversation_display_name(
&self,
conversation: &Conversation,
resolver: &mut ContactResolver<DefaultContactResolverBackend>,
) -> String {
if let Some(display_name) = &conversation.display_name {
if !display_name.trim().is_empty() {
return display_name.clone();
}
}
let names: Vec<String> = conversation
.participants
.iter()
.filter_map(|participant| match participant {
DbParticipant::Me => None,
DbParticipant::Remote { handle, contact_id } => {
if let Some(contact_id) = contact_id {
Some(
resolver
.get_contact_display_name(contact_id)
.unwrap_or_else(|| handle.clone()),
)
} else {
Some(handle.clone())
}
}
})
.collect();
if names.is_empty() {
"Kordophone".to_string()
} else {
names.join(", ")
}
}
fn resolve_participant_display_name(
&self,
participant: &Participant,
resolver: &mut ContactResolver<DefaultContactResolverBackend>,
) -> String {
match participant {
Participant::Me => "".to_string(),
Participant::Remote { handle, contact_id } => {
if let Some(contact_id) = contact_id {
resolver
.get_contact_display_name(contact_id)
.unwrap_or_else(|| handle.clone())
} else {
handle.clone()
}
}
}
}
fn show_notification(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> {
Notification::new()
.appname("Kordophone")
.summary(summary)
.body(body)
.show()
.map(|_| ())
}
}

View File

@@ -1,3 +1,12 @@
#[derive(Debug, Clone)]
pub enum InternalSignal {
/// Notification that new messages are available for a conversation.
MessagesUpdated(String),
/// Manual test notification request.
TestNotification { summary: String, body: String },
}
#[derive(Debug, Clone)]
pub enum Signal {
/// Emitted when the list of conversations is updated.
@@ -21,4 +30,7 @@ pub enum Signal {
/// Emitted when the update stream is reconnected after a timeout or configuration change.
UpdateStreamReconnected,
/// Internal-only signals consumed by daemon components.
Internal(InternalSignal),
}

View File

@@ -1,11 +1,8 @@
use dbus::arg;
use dbus_tree::MethodErr;
use std::fs::OpenOptions;
use std::os::fd::{FromRawFd, IntoRawFd};
use std::sync::Arc;
use std::time::Instant;
use std::{future::Future, thread};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use kordophoned::daemon::{
contact_resolver::{ContactResolver, DefaultContactResolverBackend},
@@ -25,12 +22,15 @@ use dbus_tokio::connection;
#[derive(Clone)]
pub struct DBusAgent {
event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>,
signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
contact_resolver: ContactResolver<DefaultContactResolverBackend>,
}
impl DBusAgent {
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
pub fn new(
event_sink: mpsc::Sender<Event>,
signal_receiver: broadcast::Receiver<Signal>,
) -> Self {
Self {
event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
@@ -78,80 +78,95 @@ impl DBusAgent {
.take()
.expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await {
match signal {
Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated");
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::ConversationsUpdated {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::MessagesUpdated(conversation_id) => {
log::debug!(
"Sending signal: MessagesUpdated for conversation {}",
conversation_id
loop {
match receiver.recv().await {
Ok(signal) => match signal {
Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated");
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::ConversationsUpdated {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::MessagesUpdated(conversation_id) => {
log::debug!(
"Sending signal: MessagesUpdated for conversation {}",
conversation_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::MessagesUpdated { conversation_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentDownloaded(attachment_id) => {
log::debug!(
"Sending signal: AttachmentDownloaded for attachment {}",
attachment_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentDownloadCompleted { attachment_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid,
attachment_guid
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentUploadCompleted {
upload_guid,
attachment_guid,
},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::UpdateStreamReconnected => {
log::debug!("Sending signal: UpdateStreamReconnected");
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::UpdateStreamReconnected {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::Internal(_) => {
log::trace!("Ignoring internal signal for D-Bus transport");
}
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
"Signal receiver lagged; skipped {} daemon signals",
skipped
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::MessagesUpdated { conversation_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentDownloaded(attachment_id) => {
log::debug!(
"Sending signal: AttachmentDownloaded for attachment {}",
attachment_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentDownloadCompleted { attachment_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid,
attachment_guid
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentUploadCompleted {
upload_guid,
attachment_guid,
},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::UpdateStreamReconnected => {
log::debug!("Sending signal: UpdateStreamReconnected");
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::UpdateStreamReconnected {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
Err(broadcast::error::RecvError::Closed) => {
log::warn!("Signal channel closed; stopping D-Bus forwarding");
break;
}
}
}
@@ -179,8 +194,9 @@ impl DBusAgent {
&self,
make_event: impl FnOnce(Reply<T>) -> Event + Send,
) -> Result<T, MethodErr> {
let daemon_result = run_sync_future(self.send_event(make_event))?;
daemon_result.map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e)))
run_sync_future(self.send_event(make_event))
.unwrap()
.map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e)))
}
fn resolve_participant_display_name(&mut self, participant: &Participant) -> String {
@@ -280,108 +296,110 @@ impl DbusRepository for DBusAgent {
conversation_id: String,
last_message_id: String,
) -> Result<Vec<arg::PropMap>, MethodErr> {
let started = Instant::now();
let last_message_id_opt = if last_message_id.is_empty() {
None
} else {
Some(last_message_id)
};
let messages =
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))?;
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
.map(|messages| {
messages
.into_iter()
.map(|msg| {
let mut map = arg::PropMap::new();
map.insert("id".into(), arg::Variant(Box::new(msg.id)));
let mut attachment_count: usize = 0;
let mut text_bytes: usize = 0;
// Remove the attachment placeholder here.
let text = msg.text.replace("\u{FFFC}", "");
let mapped: Vec<arg::PropMap> = messages
.into_iter()
.map(|msg| {
let mut map = arg::PropMap::new();
map.insert("id".into(), arg::Variant(Box::new(msg.id)));
// Remove the attachment placeholder here.
let text = msg.text.replace("\u{FFFC}", "");
text_bytes += text.len();
map.insert("text".into(), arg::Variant(Box::new(text)));
map.insert(
"date".into(),
arg::Variant(Box::new(msg.date.and_utc().timestamp())),
);
map.insert(
"sender".into(),
arg::Variant(Box::new(msg.sender.display_name())),
);
if !msg.attachments.is_empty() {
let attachments: Vec<arg::PropMap> = msg
.attachments
.into_iter()
.map(|attachment| {
attachment_count += 1;
let mut attachment_map = arg::PropMap::new();
attachment_map.insert(
"guid".into(),
arg::Variant(Box::new(attachment.guid.clone())),
map.insert("text".into(), arg::Variant(Box::new(text)));
map.insert(
"date".into(),
arg::Variant(Box::new(msg.date.and_utc().timestamp())),
);
attachment_map.insert(
"downloaded".into(),
arg::Variant(Box::new(attachment.is_downloaded(false))),
);
attachment_map.insert(
"preview_downloaded".into(),
arg::Variant(Box::new(attachment.is_downloaded(true))),
map.insert(
"sender".into(),
arg::Variant(Box::new(
self.resolve_participant_display_name(&msg.sender.into()),
)),
);
if let Some(ref metadata) = attachment.metadata {
let mut metadata_map = arg::PropMap::new();
if let Some(ref attribution_info) = metadata.attribution_info {
let mut attribution_map = arg::PropMap::new();
if let Some(width) = attribution_info.width {
attribution_map.insert(
"width".into(),
arg::Variant(Box::new(width as i32)),
);
}
if let Some(height) = attribution_info.height {
attribution_map.insert(
"height".into(),
arg::Variant(Box::new(height as i32)),
);
}
metadata_map.insert(
"attribution_info".into(),
arg::Variant(Box::new(attribution_map)),
// Attachments array
let attachments: Vec<arg::PropMap> = msg
.attachments
.into_iter()
.map(|attachment| {
let mut attachment_map = arg::PropMap::new();
attachment_map.insert(
"guid".into(),
arg::Variant(Box::new(attachment.guid.clone())),
);
}
attachment_map.insert(
"metadata".into(),
arg::Variant(Box::new(metadata_map)),
);
}
attachment_map
})
.collect();
// Paths and download status
let path = attachment.get_path_for_preview(false);
let preview_path = attachment.get_path_for_preview(true);
let downloaded = attachment.is_downloaded(false);
let preview_downloaded = attachment.is_downloaded(true);
map.insert("attachments".into(), arg::Variant(Box::new(attachments)));
}
attachment_map.insert(
"path".into(),
arg::Variant(Box::new(path.to_string_lossy().to_string())),
);
attachment_map.insert(
"preview_path".into(),
arg::Variant(Box::new(
preview_path.to_string_lossy().to_string(),
)),
);
attachment_map.insert(
"downloaded".into(),
arg::Variant(Box::new(downloaded)),
);
attachment_map.insert(
"preview_downloaded".into(),
arg::Variant(Box::new(preview_downloaded)),
);
map
// Metadata
if let Some(ref metadata) = attachment.metadata {
let mut metadata_map = arg::PropMap::new();
if let Some(ref attribution_info) = metadata.attribution_info {
let mut attribution_map = arg::PropMap::new();
if let Some(width) = attribution_info.width {
attribution_map.insert(
"width".into(),
arg::Variant(Box::new(width as i32)),
);
}
if let Some(height) = attribution_info.height {
attribution_map.insert(
"height".into(),
arg::Variant(Box::new(height as i32)),
);
}
metadata_map.insert(
"attribution_info".into(),
arg::Variant(Box::new(attribution_map)),
);
}
attachment_map.insert(
"metadata".into(),
arg::Variant(Box::new(metadata_map)),
);
}
attachment_map
})
.collect();
map.insert("attachments".into(), arg::Variant(Box::new(attachments)));
map
})
.collect()
})
.collect();
log::debug!(
target: "dbus",
"GetMessages mapped in {}ms: {} messages, {} attachments, {} text-bytes",
started.elapsed().as_millis(),
mapped.len(),
attachment_count,
text_bytes
);
Ok(mapped)
}
fn delete_all_conversations(&mut self) -> Result<(), MethodErr> {
@@ -398,6 +416,13 @@ impl DbusRepository for DBusAgent {
.map(|uuid| uuid.to_string())
}
fn test_notification(&mut self, summary: String, body: String) -> Result<(), MethodErr> {
match self.send_event_sync(|r| Event::TestNotification(summary, body, r))? {
Ok(()) => Ok(()),
Err(message) => Err(MethodErr::failed(&message)),
}
}
fn get_attachment_info(
&mut self,
attachment_id: String,
@@ -425,23 +450,6 @@ impl DbusRepository for DBusAgent {
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
}
fn open_attachment_fd(
&mut self,
attachment_id: String,
preview: bool,
) -> Result<arg::OwnedFd, MethodErr> {
let attachment = self.send_event_sync(|r| Event::GetAttachment(attachment_id, r))?;
let path = attachment.get_path_for_preview(preview);
let file = OpenOptions::new()
.read(true)
.open(&path)
.map_err(|e| MethodErr::failed(&format!("Failed to open attachment: {}", e)))?;
let fd = file.into_raw_fd();
Ok(unsafe { arg::OwnedFd::from_raw_fd(fd) })
}
fn upload_attachment(&mut self, path: String) -> Result<String, MethodErr> {
use std::path::PathBuf;
let path = PathBuf::from(path);
@@ -513,7 +521,7 @@ where
T: Send,
F: Future<Output = T> + Send,
{
let joined = thread::scope(move |s| {
thread::scope(move |s| {
s.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -524,10 +532,6 @@ where
Ok(result)
})
.join()
});
match joined {
Ok(result) => result,
Err(_) => Err(MethodErr::failed("Error joining runtime thread")),
}
})
.expect("Error joining runtime thread")
}

View File

@@ -26,7 +26,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
use dbus::agent::DBusAgent;
// Start the D-Bus agent (events in, signals out).
let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
tokio::spawn(async move {
agent.run().await;
});
@@ -35,8 +35,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
#[cfg(target_os = "macos")]
async fn start_ipc_agent(daemon: &mut Daemon) {
// Start the macOS XPC agent (events in, signals out) on a dedicated thread.
let agent =
xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
let agent = xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
std::thread::spawn(move || {
// Use a single-threaded Tokio runtime for the XPC agent.
let rt = tokio::runtime::Builder::new_current_thread()

View File

@@ -4,7 +4,7 @@ use std::ffi::CString;
use std::os::raw::c_char;
use std::ptr;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
use xpc_connection_sys as xpc_sys;
@@ -22,11 +22,14 @@ type Subscribers = Arc<std::sync::Mutex<Vec<XpcConn>>>;
#[derive(Clone)]
pub struct XpcAgent {
event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>,
signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
}
impl XpcAgent {
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
pub fn new(
event_sink: mpsc::Sender<Event>,
signal_receiver: broadcast::Receiver<Signal>,
) -> Self {
Self {
event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
@@ -71,7 +74,31 @@ impl XpcAgent {
.await
.take()
.expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await {
loop {
let signal = match receiver.recv().await {
Ok(signal) => signal,
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
target: LOG_TARGET,
"XPC agent lagged; skipped {} signals",
skipped
);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
log::warn!(
target: LOG_TARGET,
"Signal channel closed; stopping XPC forwarding"
);
break;
}
};
if matches!(signal, Signal::Internal(_)) {
log::trace!(target: LOG_TARGET, "Skipping internal signal for XPC");
continue;
}
log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
let msg = super::util::signal_to_message(signal);
let xobj = message_to_xpc_object(msg);
@@ -127,7 +154,7 @@ impl XpcAgent {
// Drop any cleanup resource now that payload is constructed and sent.
drop(result.cleanup);
log::trace!(target: LOG_TARGET, "XPC reply sent for method: {}", method);
} else {
log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);

View File

@@ -15,10 +15,16 @@ pub struct DispatchResult {
impl DispatchResult {
pub fn new(message: Message) -> Self {
Self { message, cleanup: None }
Self {
message,
cleanup: None,
}
}
pub fn with_cleanup<T: Any + Send + 'static>(message: Message, cleanup: T) -> Self {
Self { message, cleanup: Some(Box::new(cleanup)) }
Self {
message,
cleanup: Some(Box::new(cleanup)),
}
}
}

View File

@@ -105,7 +105,12 @@ pub async fn dispatch(
.and_then(|m| dict_get_str(m, "conversation_id"))
{
Some(id) => id,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
match agent
.send_event(|r| Event::SyncConversation(conversation_id, r))
@@ -122,7 +127,12 @@ pub async fn dispatch(
.and_then(|m| dict_get_str(m, "conversation_id"))
{
Some(id) => id,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
match agent
.send_event(|r| Event::MarkConversationAsRead(conversation_id, r))
@@ -137,11 +147,21 @@ pub async fn dispatch(
"GetMessages" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let conversation_id = match dict_get_str(args, "conversation_id") {
Some(id) => id,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
let last_message_id = dict_get_str(args, "last_message_id");
match agent
@@ -158,13 +178,10 @@ pub async fn dispatch(
dict_put_str(&mut m, "sender", &msg.sender.display_name());
// Include attachment GUIDs for the client to resolve/download
let attachment_guids: Vec<String> = msg
.attachments
.iter()
.map(|a| a.guid.clone())
.collect();
let attachment_guids: Vec<String> =
msg.attachments.iter().map(|a| a.guid.clone()).collect();
m.insert(cstr("attachment_guids"), array_from_strs(attachment_guids));
// Full attachments array with metadata (mirrors DBus fields)
let mut attachments_items: Vec<Message> = Vec::new();
for attachment in msg.attachments.iter() {
@@ -193,12 +210,23 @@ pub async fn dispatch(
if let Some(attribution_info) = &metadata.attribution_info {
let mut attribution_map: XpcMap = HashMap::new();
if let Some(width) = attribution_info.width {
dict_put_i64_as_str(&mut attribution_map, "width", width as i64);
dict_put_i64_as_str(
&mut attribution_map,
"width",
width as i64,
);
}
if let Some(height) = attribution_info.height {
dict_put_i64_as_str(&mut attribution_map, "height", height as i64);
dict_put_i64_as_str(
&mut attribution_map,
"height",
height as i64,
);
}
metadata_map.insert(cstr("attribution_info"), Message::Dictionary(attribution_map));
metadata_map.insert(
cstr("attribution_info"),
Message::Dictionary(attribution_map),
);
}
if !metadata_map.is_empty() {
a.insert(cstr("metadata"), Message::Dictionary(metadata_map));
@@ -208,7 +236,7 @@ pub async fn dispatch(
attachments_items.push(Message::Dictionary(a));
}
m.insert(cstr("attachments"), Message::Array(attachments_items));
items.push(Message::Dictionary(m));
}
let mut reply: XpcMap = HashMap::new();
@@ -230,11 +258,21 @@ pub async fn dispatch(
"SendMessage" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let conversation_id = match dict_get_str(args, "conversation_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
let text = dict_get_str(args, "text").unwrap_or_default();
let attachment_guids: Vec<String> = match args.get(&cstr("attachment_guids")) {
@@ -265,11 +303,21 @@ pub async fn dispatch(
"GetAttachmentInfo" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing attachment_id",
))
}
};
match agent
.send_event(|r| Event::GetAttachment(attachment_id, r))
@@ -308,11 +356,21 @@ pub async fn dispatch(
"OpenAttachmentFd" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing attachment_id",
))
}
};
let preview = dict_get_str(args, "preview")
.map(|s| s == "true")
@@ -324,7 +382,7 @@ pub async fn dispatch(
{
Ok(attachment) => {
use std::os::fd::AsRawFd;
let path = attachment.get_path_for_preview(preview);
match std::fs::OpenOptions::new().read(true).open(&path) {
Ok(file) => {
@@ -335,9 +393,14 @@ pub async fn dispatch(
dict_put_str(&mut reply, "type", "OpenAttachmentFdResponse");
reply.insert(cstr("fd"), Message::Fd(fd));
DispatchResult { message: Message::Dictionary(reply), cleanup: Some(Box::new(file)) }
DispatchResult {
message: Message::Dictionary(reply),
cleanup: Some(Box::new(file)),
}
}
Err(e) => {
DispatchResult::new(make_error_reply("OpenFailed", &format!("{}", e)))
}
Err(e) => DispatchResult::new(make_error_reply("OpenFailed", &format!("{}", e))),
}
}
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
@@ -348,11 +411,21 @@ pub async fn dispatch(
"DownloadAttachment" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing attachment_id",
))
}
};
let preview = dict_get_str(args, "preview")
.map(|s| s == "true")
@@ -371,11 +444,18 @@ pub async fn dispatch(
use std::path::PathBuf;
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let path = match dict_get_str(args, "path") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing path")),
None => {
return DispatchResult::new(make_error_reply("InvalidRequest", "Missing path"))
}
};
match agent
.send_event(|r| Event::UploadAttachment(PathBuf::from(path), r))
@@ -413,7 +493,12 @@ pub async fn dispatch(
"UpdateSettings" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let server_url = dict_get_str(args, "server_url");
let username = dict_get_str(args, "username");

View File

@@ -28,7 +28,7 @@ dbus-tree = "0.9.2"
# D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = { version = "0.10.0", default-features = false }
dbus-codegen = "0.10.0"
# XPC (libxpc) interface only on macOS
[target.'cfg(target_os = "macos")'.dependencies]

View File

@@ -146,17 +146,15 @@ impl ClientCli {
loop {
match stream.next().await.unwrap() {
Ok(update) => {
match update {
SocketUpdate::Update(updates) => {
for update in updates {
println!("Got update: {:?}", update);
}
}
SocketUpdate::Pong => {
println!("Pong");
Ok(update) => match update {
SocketUpdate::Update(updates) => {
for update in updates {
println!("Got update: {:?}", update);
}
}
SocketUpdate::Pong => {
println!("Pong");
}
},
Err(e) => {

View File

@@ -33,7 +33,7 @@ impl DBusDaemonInterface {
fn proxy(&self) -> Proxy<&Connection> {
self.conn
.with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_secs(30))
.with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_millis(5000))
}
async fn print_settings(&mut self) -> Result<()> {
@@ -209,4 +209,9 @@ impl DaemonInterface for DBusDaemonInterface {
KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {}", e))
}
async fn test_notification(&mut self, summary: String, body: String) -> Result<()> {
KordophoneRepository::test_notification(&self.proxy(), &summary, &body)
.map_err(|e| anyhow::anyhow!("Failed to trigger test notification: {}", e))
}
}

View File

@@ -32,6 +32,7 @@ pub trait DaemonInterface {
async fn download_attachment(&mut self, attachment_id: String) -> Result<()>;
async fn upload_attachment(&mut self, path: String) -> Result<()>;
async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>;
async fn test_notification(&mut self, summary: String, body: String) -> Result<()>;
}
struct StubDaemonInterface;
@@ -112,6 +113,11 @@ impl DaemonInterface for StubDaemonInterface {
"Daemon interface not implemented on this platform"
))
}
async fn test_notification(&mut self, _summary: String, _body: String) -> Result<()> {
Err(anyhow::anyhow!(
"Daemon interface not implemented on this platform"
))
}
}
pub fn new_daemon_interface() -> Result<Box<dyn DaemonInterface>> {
@@ -175,6 +181,9 @@ pub enum Commands {
/// Marks a conversation as read.
MarkConversationAsRead { conversation_id: String },
/// Displays a test notification using the daemon.
TestNotification { summary: String, body: String },
}
#[derive(Subcommand)]
@@ -219,6 +228,9 @@ impl Commands {
Commands::MarkConversationAsRead { conversation_id } => {
client.mark_conversation_as_read(conversation_id).await
}
Commands::TestNotification { summary, body } => {
client.test_notification(summary, body).await
}
}
}
}

View File

@@ -1,12 +0,0 @@
[package]
name = "kptui"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.93"
crossterm = "0.28.1"
kordophoned-client = { path = "../kordophoned-client" }
ratatui = { version = "0.29.0", features = ["unstable-rendered-line-info"] }
time = { version = "0.3.37", features = ["formatting"] }
unicode-width = "0.2.0"

View File

@@ -1,789 +0,0 @@
use kordophoned_client as daemon;
use anyhow::Result;
use crossterm::event::{Event as CEvent, KeyCode, KeyEvent, KeyEventKind, KeyModifiers};
use crossterm::terminal::{disable_raw_mode, enable_raw_mode};
use ratatui::prelude::*;
use ratatui::widgets::*;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use unicode_width::UnicodeWidthStr;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ViewMode {
List,
Chat,
Split,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Focus {
Navigation,
Input,
}
struct AppState {
conversations: Vec<daemon::ConversationSummary>,
selected_idx: usize,
selected_conversation_id: Option<String>,
messages: Vec<daemon::ChatMessage>,
active_conversation_id: Option<String>,
active_conversation_title: String,
status: String,
input: String,
focus: Focus,
transcript_scroll: u16,
pinned_to_bottom: bool,
refresh_conversations_in_flight: bool,
refresh_messages_in_flight: bool,
}
impl AppState {
fn new() -> Self {
Self {
conversations: Vec::new(),
selected_idx: 0,
selected_conversation_id: None,
messages: Vec::new(),
active_conversation_id: None,
active_conversation_title: String::new(),
status: String::new(),
input: String::new(),
focus: Focus::Navigation,
transcript_scroll: 0,
pinned_to_bottom: true,
refresh_conversations_in_flight: false,
refresh_messages_in_flight: false,
}
}
fn select_next(&mut self) {
if self.conversations.is_empty() {
self.selected_idx = 0;
self.selected_conversation_id = None;
return;
}
self.selected_idx = (self.selected_idx + 1).min(self.conversations.len() - 1);
self.selected_conversation_id = self
.conversations
.get(self.selected_idx)
.map(|c| c.id.clone());
}
fn select_prev(&mut self) {
if self.conversations.is_empty() {
self.selected_idx = 0;
self.selected_conversation_id = None;
return;
}
self.selected_idx = self.selected_idx.saturating_sub(1);
self.selected_conversation_id = self
.conversations
.get(self.selected_idx)
.map(|c| c.id.clone());
}
fn open_selected_conversation(&mut self) {
if let Some(conv) = self.conversations.get(self.selected_idx) {
self.active_conversation_id = Some(conv.id.clone());
self.active_conversation_title = conv.title.clone();
self.selected_conversation_id = Some(conv.id.clone());
self.messages.clear();
self.transcript_scroll = 0;
self.pinned_to_bottom = true;
self.focus = Focus::Input;
self.status = "Loading…".to_string();
}
}
}
fn view_mode(width: u16, has_active_conversation: bool, requested: ViewMode) -> ViewMode {
let min_conversations = 24u16;
let min_chat = 44u16;
let min_total = min_conversations + 1 + min_chat;
if width >= min_total {
return ViewMode::Split;
}
if has_active_conversation {
requested
} else {
ViewMode::List
}
}
fn ui(frame: &mut Frame, app: &AppState, requested_view: ViewMode) {
let area = frame.area();
let mode = view_mode(
area.width,
app.active_conversation_id.is_some(),
requested_view,
);
let show_input =
matches!(mode, ViewMode::Chat | ViewMode::Split) && app.active_conversation_id.is_some();
let chunks = if show_input {
Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Min(1),
Constraint::Length(3),
Constraint::Length(1),
])
.split(area)
} else {
Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Min(1), Constraint::Length(1)])
.split(area)
};
let (main_area, input_area, status_area) = if show_input {
(chunks[0], Some(chunks[1]), chunks[2])
} else {
(chunks[0], None, chunks[1])
};
match mode {
ViewMode::Split => {
let left_width = (main_area.width / 3).clamp(24, 40);
let cols = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Length(left_width), Constraint::Min(1)])
.split(main_area);
render_conversations(frame, app, cols[0], true);
render_transcript(frame, app, cols[1], true);
}
ViewMode::List => render_conversations(frame, app, main_area, false),
ViewMode::Chat => render_transcript(frame, app, main_area, false),
}
if let Some(input_area) = input_area {
let input_scroll_x = render_input(frame, app, input_area);
if app.focus == Focus::Input {
let cursor_col = visual_width_u16(app.input.as_str());
let mut x = input_area
.x
.saturating_add(1)
.saturating_add(cursor_col.saturating_sub(input_scroll_x));
let max_x = input_area
.x
.saturating_add(input_area.width.saturating_sub(2));
x = x.min(max_x);
let y = input_area.y + 1;
frame.set_cursor_position(Position { x, y });
}
}
render_status(frame, app, status_area, mode);
}
fn render_conversations(frame: &mut Frame, app: &AppState, area: Rect, _in_split: bool) {
let title = "Kordophone";
let items = app
.conversations
.iter()
.map(|c| {
let is_active = app.active_conversation_id.as_deref() == Some(c.id.as_str());
let header = Line::from(vec![
Span::styled(
if c.unread_count > 0 { "" } else { " " },
Style::default()
.fg(if c.unread_count > 0 {
Color::LightYellow
} else {
Color::DarkGray
})
.add_modifier(Modifier::BOLD),
),
Span::styled(
c.title.clone(),
Style::default().add_modifier(Modifier::BOLD),
),
]);
let preview = Line::from(Span::styled(
c.preview.clone(),
Style::default().fg(if is_active {
Color::Gray
} else {
Color::DarkGray
}),
));
ListItem::new(vec![header, preview]).style(if is_active {
Style::default().bg(Color::DarkGray)
} else {
Style::default()
})
})
.collect::<Vec<_>>();
let mut state = ListState::default();
state.select(if app.conversations.is_empty() {
None
} else {
Some(app.selected_idx)
});
let list = List::new(items)
.block(Block::default().borders(Borders::ALL).title(title))
.highlight_style(
Style::default()
.bg(Color::Blue)
.fg(Color::White)
.add_modifier(Modifier::BOLD),
)
.highlight_symbol("");
frame.render_stateful_widget(list, area, &mut state);
}
fn render_transcript(frame: &mut Frame, app: &AppState, area: Rect, in_split: bool) {
let title = if let Some(_) = app.active_conversation_id {
if in_split {
format!("{} (Esc: nav, Tab: focus)", app.active_conversation_title)
} else {
format!("{} (Esc: back)", app.active_conversation_title)
}
} else {
"Chat".to_string()
};
let lines = transcript_lines(&app.messages);
let paragraph = Paragraph::new(Text::from(lines))
.block(Block::default().borders(Borders::ALL).title(title))
.wrap(Wrap { trim: false })
.scroll((app.transcript_scroll, 0));
frame.render_widget(paragraph, area);
}
fn render_input(frame: &mut Frame, app: &AppState, area: Rect) -> u16 {
let title = if app.focus == Focus::Input {
"Reply (Enter to send)"
} else {
"Reply (press i to type)"
};
let inner_width = area.width.saturating_sub(2).max(1);
let cursor_col = visual_width_u16(app.input.as_str());
let input_scroll_x = cursor_col.saturating_sub(inner_width.saturating_sub(1));
let input = Paragraph::new(app.input.as_str())
.block(Block::default().borders(Borders::ALL).title(title))
.scroll((0, input_scroll_x));
frame.render_widget(input, area);
input_scroll_x
}
fn render_status(frame: &mut Frame, app: &AppState, area: Rect, mode: ViewMode) {
let mut parts = vec![
format!("{} convs", app.conversations.len()),
match mode {
ViewMode::Split => "split".to_string(),
ViewMode::List => "list".to_string(),
ViewMode::Chat => "chat".to_string(),
},
];
if !app.status.trim().is_empty() {
parts.push(app.status.clone());
}
let line = parts.join(" | ");
frame.render_widget(
Paragraph::new(line).block(Block::default().borders(Borders::TOP)),
area,
);
}
fn main() -> Result<()> {
enable_raw_mode()?;
let mut stdout = std::io::stdout();
crossterm::execute!(
stdout,
crossterm::terminal::EnterAlternateScreen,
crossterm::event::EnableMouseCapture
)?;
let backend = ratatui::backend::CrosstermBackend::new(stdout);
let mut terminal = ratatui::Terminal::new(backend)?;
let res = run_app(&mut terminal);
disable_raw_mode()?;
crossterm::execute!(
terminal.backend_mut(),
crossterm::event::DisableMouseCapture,
crossterm::terminal::LeaveAlternateScreen
)?;
terminal.show_cursor()?;
res
}
fn run_app(
terminal: &mut ratatui::Terminal<ratatui::backend::CrosstermBackend<std::io::Stdout>>,
) -> Result<()> {
let (request_tx, request_rx) = mpsc::channel::<daemon::Request>();
let (event_tx, event_rx) = mpsc::channel::<daemon::Event>();
let _worker = daemon::spawn_worker(request_rx, event_tx);
let tick_rate = Duration::from_millis(150);
let refresh_rate = Duration::from_secs(2);
let mut last_tick = Instant::now();
let mut last_refresh = Instant::now() - refresh_rate;
let mut requested_view = ViewMode::List;
let mut app = AppState::new();
app.status = "Connecting…".to_string();
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
loop {
let size = terminal.size()?;
while let Ok(evt) = event_rx.try_recv() {
match evt {
daemon::Event::Conversations(convs) => {
let keep_selected_id = app
.selected_conversation_id
.clone()
.or_else(|| app.active_conversation_id.clone());
app.refresh_conversations_in_flight = false;
app.status.clear();
app.conversations = convs;
if app.conversations.is_empty() {
app.selected_idx = 0;
app.selected_conversation_id = None;
} else if let Some(id) = keep_selected_id {
if let Some(idx) = app.conversations.iter().position(|c| c.id == id) {
app.selected_idx = idx;
app.selected_conversation_id = Some(id);
} else {
app.selected_idx = 0;
app.selected_conversation_id = Some(app.conversations[0].id.clone());
}
} else {
app.selected_idx = app.selected_idx.min(app.conversations.len() - 1);
app.selected_conversation_id =
Some(app.conversations[app.selected_idx].id.clone());
}
}
daemon::Event::Messages {
conversation_id,
messages,
} => {
app.refresh_messages_in_flight = false;
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
let was_pinned = app.pinned_to_bottom;
app.messages = messages;
app.pinned_to_bottom = was_pinned;
}
}
daemon::Event::MessageSent {
conversation_id,
outgoing_id,
} => {
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
app.status = outgoing_id
.as_deref()
.map(|id| format!("Sent ({id})"))
.unwrap_or_else(|| "Sent".to_string());
app.refresh_messages_in_flight = false;
request_tx
.send(daemon::Request::RefreshMessages { conversation_id })
.ok();
app.refresh_messages_in_flight = true;
}
}
daemon::Event::MarkedRead => {}
daemon::Event::ConversationSyncTriggered { conversation_id } => {
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
app.status = "Syncing…".to_string();
}
}
daemon::Event::ConversationsUpdated => {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if let Some(cid) = app.active_conversation_id.clone() {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::MessagesUpdated { conversation_id } => {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages { conversation_id })
.ok();
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::UpdateStreamReconnected => {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if let Some(cid) = app.active_conversation_id.clone() {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::Error(e) => {
app.refresh_conversations_in_flight = false;
app.refresh_messages_in_flight = false;
app.status = e;
}
}
}
apply_transcript_scroll_policy(&mut app, size, requested_view);
terminal.draw(|f| ui(f, &app, requested_view))?;
let timeout = tick_rate.saturating_sub(last_tick.elapsed());
if crossterm::event::poll(timeout)? {
if let CEvent::Key(key) = crossterm::event::read()? {
if key.kind != KeyEventKind::Press {
continue;
}
let ctrl = key.modifiers.contains(KeyModifiers::CONTROL);
match (key.code, ctrl) {
(KeyCode::Char('c'), true) => return Ok(()),
_ => {}
}
let screen_mode = view_mode(
size.width,
app.active_conversation_id.is_some(),
requested_view,
);
let max_scroll = max_transcript_scroll(&app, size, requested_view);
match screen_mode {
ViewMode::List => match key.code {
KeyCode::Up => app.select_prev(),
KeyCode::Down => app.select_next(),
KeyCode::Enter => {
app.open_selected_conversation();
if app.active_conversation_id.is_some() {
requested_view = ViewMode::Chat;
if let Some(cid) = app.active_conversation_id.clone() {
request_tx
.send(daemon::Request::MarkRead {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::SyncConversation {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
}
_ => {}
},
ViewMode::Chat => match key.code {
KeyCode::Esc => {
requested_view = ViewMode::List;
app.focus = Focus::Navigation;
}
KeyCode::Char('i') if app.focus != Focus::Input => app.focus = Focus::Input,
_ => {
handle_chat_keys(&mut app, &request_tx, key, max_scroll);
}
},
ViewMode::Split => match key.code {
KeyCode::Tab => {
app.focus = match app.focus {
Focus::Navigation => Focus::Input,
Focus::Input => Focus::Navigation,
}
}
KeyCode::Esc => app.focus = Focus::Navigation,
KeyCode::Char('i') if app.focus != Focus::Input => app.focus = Focus::Input,
KeyCode::Up => {
if app.focus == Focus::Navigation {
app.select_prev()
} else {
scroll_up(&mut app, 1);
}
}
KeyCode::Down => {
if app.focus == Focus::Navigation {
app.select_next()
} else {
scroll_down(&mut app, 1, max_scroll);
}
}
KeyCode::Enter => {
if app.focus == Focus::Navigation {
app.open_selected_conversation();
requested_view = ViewMode::Chat;
if let Some(cid) = app.active_conversation_id.clone() {
request_tx
.send(daemon::Request::MarkRead {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::SyncConversation {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
} else {
handle_chat_keys(&mut app, &request_tx, key, max_scroll);
}
}
_ => handle_chat_keys(&mut app, &request_tx, key, max_scroll),
},
}
}
}
if last_refresh.elapsed() >= refresh_rate {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if let Some(cid) = app.active_conversation_id.clone() {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
last_refresh = Instant::now();
}
if last_tick.elapsed() >= tick_rate {
last_tick = Instant::now();
}
}
}
fn handle_chat_keys(
app: &mut AppState,
request_tx: &mpsc::Sender<daemon::Request>,
key: KeyEvent,
max_scroll: u16,
) {
let code = key.code;
let modifiers = key.modifiers;
match code {
KeyCode::PageUp => scroll_up(app, 10),
KeyCode::PageDown => scroll_down(app, 10, max_scroll),
_ => {}
}
if app.focus != Focus::Input {
return;
}
match code {
KeyCode::Char('u') if modifiers.contains(KeyModifiers::CONTROL) => {
app.input.clear();
}
KeyCode::Backspace if modifiers.contains(KeyModifiers::ALT) => {
delete_prev_word(&mut app.input);
}
KeyCode::Char('w') if modifiers.contains(KeyModifiers::CONTROL) => {
delete_prev_word(&mut app.input);
}
KeyCode::Enter => {
let text = app.input.trim().to_string();
if text.is_empty() {
return;
}
let Some(conversation_id) = app.active_conversation_id.clone() else {
app.status = "No conversation selected".to_string();
return;
};
request_tx
.send(daemon::Request::SendMessage {
conversation_id,
text,
})
.ok();
app.refresh_messages_in_flight = true;
app.input.clear();
}
KeyCode::Backspace => {
app.input.pop();
}
KeyCode::Char(c) => {
if !c.is_control() {
app.input.push(c);
}
}
_ => {}
}
}
fn delete_prev_word(input: &mut String) {
while input.chars().last().is_some_and(|c| c.is_whitespace()) {
input.pop();
}
while input.chars().last().is_some_and(|c| !c.is_whitespace()) {
input.pop();
}
}
fn scroll_up(app: &mut AppState, amount: u16) {
if amount > 0 {
app.pinned_to_bottom = false;
}
app.transcript_scroll = app.transcript_scroll.saturating_sub(amount);
}
fn scroll_down(app: &mut AppState, amount: u16, max_scroll: u16) {
app.transcript_scroll = app.transcript_scroll.saturating_add(amount);
if app.transcript_scroll >= max_scroll {
app.transcript_scroll = max_scroll;
app.pinned_to_bottom = true;
}
}
fn transcript_inner_width(size: Size, app: &AppState, requested_view: ViewMode) -> u16 {
let mode = view_mode(
size.width,
app.active_conversation_id.is_some(),
requested_view,
);
let outer_width = match mode {
ViewMode::Split => {
let left_width = (size.width / 3).clamp(24, 40);
size.width.saturating_sub(left_width)
}
ViewMode::Chat => size.width,
ViewMode::List => 0,
};
outer_width.saturating_sub(2).max(1)
}
fn visual_width_u16(s: &str) -> u16 {
s.width().min(u16::MAX as usize) as u16
}
fn transcript_lines(messages: &[daemon::ChatMessage]) -> Vec<Line<'static>> {
let mut lines: Vec<Line<'static>> = Vec::new();
for message in messages {
let ts = time::OffsetDateTime::from_unix_timestamp(message.date_unix)
.unwrap_or(time::OffsetDateTime::UNIX_EPOCH)
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
lines.push(Line::from(vec![
Span::styled(
message.sender.clone(),
Style::default().add_modifier(Modifier::BOLD),
),
Span::raw(" "),
Span::styled(ts, Style::default().fg(Color::DarkGray)),
]));
let mut rendered_any_text = false;
for text_line in message.text.lines() {
rendered_any_text = true;
lines.push(Line::from(Span::raw(text_line.to_string())));
}
if !rendered_any_text {
lines.push(Line::from(Span::styled(
"<non-text message>",
Style::default().fg(Color::DarkGray),
)));
}
lines.push(Line::from(Span::raw("")));
}
if lines.is_empty() {
lines.push(Line::from(Span::styled(
"No messages.",
Style::default().fg(Color::DarkGray),
)));
}
lines
}
fn transcript_content_visual_lines(messages: &[daemon::ChatMessage], inner_width: u16) -> u16 {
let paragraph =
Paragraph::new(Text::from(transcript_lines(messages))).wrap(Wrap { trim: false });
paragraph.line_count(inner_width).min(u16::MAX as usize) as u16
}
fn transcript_viewport_height(size: Size, app: &AppState, requested_view: ViewMode) -> u16 {
let mode = view_mode(
size.width,
app.active_conversation_id.is_some(),
requested_view,
);
let show_input =
matches!(mode, ViewMode::Chat | ViewMode::Split) && app.active_conversation_id.is_some();
let transcript_height = if show_input {
size.height.saturating_sub(4) // input (3) + status (1)
} else {
size.height.saturating_sub(1) // status
};
match mode {
ViewMode::Chat | ViewMode::Split => transcript_height.saturating_sub(2), // borders
ViewMode::List => 0,
}
}
fn max_transcript_scroll(app: &AppState, size: Size, requested_view: ViewMode) -> u16 {
let viewport_height = transcript_viewport_height(size, app, requested_view);
let inner_width = transcript_inner_width(size, app, requested_view);
let content = transcript_content_visual_lines(&app.messages, inner_width);
content.saturating_sub(viewport_height)
}
fn apply_transcript_scroll_policy(app: &mut AppState, size: Size, requested_view: ViewMode) {
let max_scroll = max_transcript_scroll(app, size, requested_view);
if app.pinned_to_bottom {
app.transcript_scroll = max_scroll;
} else {
app.transcript_scroll = app.transcript_scroll.min(max_scroll);
}
}

View File

@@ -1,13 +1,13 @@
use std::env;
use std::process;
use kordophone::{
api::{HTTPAPIClient, InMemoryAuthenticationStore, EventSocket},
model::{ConversationID, event::EventData},
APIInterface,
};
use kordophone::api::http_client::Credentials;
use kordophone::api::AuthenticationStore;
use kordophone::api::http_client::Credentials;
use kordophone::{
APIInterface,
api::{EventSocket, HTTPAPIClient, InMemoryAuthenticationStore},
model::{ConversationID, event::EventData},
};
use futures_util::StreamExt;
use hyper::Uri;
@@ -18,7 +18,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} <conversation_id1> [conversation_id2] [conversation_id3] ...", args[0]);
eprintln!(
"Usage: {} <conversation_id1> [conversation_id2] [conversation_id3] ...",
args[0]
);
eprintln!("Environment variables required:");
eprintln!(" KORDOPHONE_API_URL - Server URL");
eprintln!(" KORDOPHONE_USERNAME - Username for authentication");
@@ -30,65 +33,74 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server_url: Uri = env::var("KORDOPHONE_API_URL")
.map_err(|_| "KORDOPHONE_API_URL environment variable not set")?
.parse()?;
let username = env::var("KORDOPHONE_USERNAME")
.map_err(|_| "KORDOPHONE_USERNAME environment variable not set")?;
let password = env::var("KORDOPHONE_PASSWORD")
.map_err(|_| "KORDOPHONE_PASSWORD environment variable not set")?;
let credentials = Credentials { username, password };
// Collect all conversation IDs from command line arguments
let target_conversation_ids: Vec<ConversationID> = args[1..].iter()
.map(|id| id.clone())
.collect();
println!("Monitoring {} conversation(s) for updates: {:?}",
target_conversation_ids.len(), target_conversation_ids);
let target_conversation_ids: Vec<ConversationID> =
args[1..].iter().map(|id| id.clone()).collect();
println!(
"Monitoring {} conversation(s) for updates: {:?}",
target_conversation_ids.len(),
target_conversation_ids
);
let auth_store = InMemoryAuthenticationStore::new(Some(credentials.clone()));
let mut client = HTTPAPIClient::new(server_url, auth_store);
let _ = client.authenticate(credentials).await?;
// Open event socket
let event_socket = client.open_event_socket(None).await?;
let (mut stream, _sink) = event_socket.events().await;
println!("Connected to event stream, waiting for updates...");
// Process events
while let Some(event_result) = stream.next().await {
match event_result {
Ok(socket_event) => {
match socket_event {
kordophone::api::event_socket::SocketEvent::Update(event) => {
match event.data {
EventData::MessageReceived(conversation, _message) => {
if target_conversation_ids.contains(&conversation.guid) {
println!("Message update detected for conversation {}, marking as read...", conversation.guid);
match client.mark_conversation_as_read(&conversation.guid).await {
Ok(_) => println!("Successfully marked conversation {} as read", conversation.guid),
Err(e) => eprintln!("Failed to mark conversation {} as read: {:?}", conversation.guid, e),
}
kordophone::api::event_socket::SocketEvent::Update(event) => match event.data {
EventData::MessageReceived(conversation, _message) => {
if target_conversation_ids.contains(&conversation.guid) {
println!(
"Message update detected for conversation {}, marking as read...",
conversation.guid
);
match client.mark_conversation_as_read(&conversation.guid).await {
Ok(_) => println!(
"Successfully marked conversation {} as read",
conversation.guid
),
Err(e) => eprintln!(
"Failed to mark conversation {} as read: {:?}",
conversation.guid, e
),
}
},
_ => {}
}
}
_ => {}
},
kordophone::api::event_socket::SocketEvent::Pong => {
// Ignore pong messages
}
}
},
}
Err(e) => {
eprintln!("Error receiving event: {:?}", e);
break;
}
}
}
println!("Event stream ended");
Ok(())
}

2
gtk/.gitignore vendored
View File

@@ -1,3 +1 @@
build/
flatpak-build/
.flatpak-builder/

View File

@@ -1,25 +0,0 @@
FROM debian:trixie
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
dpkg \
make \
build-essential \
python3 \
imagemagick \
meson \
ninja-build \
valac \
pkg-config \
libgtk-4-dev \
libadwaita-1-dev \
libglib2.0-dev \
libgee-0.8-dev \
libsecret-1-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /workspace
COPY . .
CMD ["make", "deb"]

View File

@@ -11,13 +11,4 @@ rpm:
git -C .. archive --format=tar.gz --prefix=kordophone/ -o $(TMP)/v$(VER).tar.gz HEAD
rpmbuild -ba dist/rpm/kordophone.spec --define "_sourcedir $(TMP)"
deb:
./dist/deb/build-deb.sh $(VER)
.PHONY: flatpak
flatpak:
flatpak-builder --force-clean flatpak-build flatpak/net.buzzert.kordophone.yml
.PHONY: flatpak-install
flatpak-install:
flatpak-builder --force-clean --user --install flatpak-build flatpak/net.buzzert.kordophone.yml

View File

@@ -5,5 +5,3 @@ Libadwaita/GTK4 client for the Kordophone client daemon.
# Building
Build an RPM using `rpmbuild -ba dist/rpm/kordophone.spec`
Build a DEB using `make deb`

View File

@@ -1,6 +0,0 @@
*.deb
*.buildinfo
*.changes
*.dsc
*.tar.*
*.build

View File

@@ -1,49 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
VERSION="${1:-}"
if [[ -z "$VERSION" ]]; then
VERSION="$(sed -n "s/.*version[[:space:]]*:[[:space:]]*'\\([^']*\\)'.*/\\1/p" meson.build | head -n1)"
fi
if [[ -z "$VERSION" ]]; then
echo "Could not determine version (pass as first arg)" >&2
exit 1
fi
ARCH="$(dpkg --print-architecture)"
PKG="kordophone"
STAGE="$(mktemp -d)"
trap 'rm -rf "$STAGE"' EXIT
PKGROOT="$STAGE/pkgroot"
mkdir -p "$PKGROOT/DEBIAN"
BUILD_DIR="$STAGE/build"
meson setup "$BUILD_DIR" --prefix=/usr
ninja -C "$BUILD_DIR"
DESTDIR="$PKGROOT" ninja -C "$BUILD_DIR" install
INSTALLED_SIZE_KB="$(du -sk "$PKGROOT/usr" | awk '{print $1}')"
cat >"$PKGROOT/DEBIAN/control" <<EOF
Package: ${PKG}
Version: ${VERSION}
Section: net
Priority: optional
Architecture: ${ARCH}
Maintainer: James Magahern <james@magahern.com>
Installed-Size: ${INSTALLED_SIZE_KB}
Depends: libgtk-4-1, libadwaita-1-0, libglib2.0-0, libgee-0.8-2, libsecret-1-0, kordophoned (>= 1.0.0)
Description: GTK4/Libadwaita client for Kordophone
A GTK4/Libadwaita Linux client for the Kordophone client daemon.
EOF
OUT_DIR="$ROOT_DIR/dist/deb"
mkdir -p "$OUT_DIR"
OUT_DEB="${OUT_DIR}/${PKG}_${VERSION}_${ARCH}.deb"
dpkg-deb --root-owner-group --build "$PKGROOT" "$OUT_DEB"
echo "$OUT_DEB"

View File

@@ -1,18 +0,0 @@
# Flatpak (GTK client)
This builds the GTK client as a Flatpak **assuming `kordophoned` is installed on the host**
and reachable on the **session bus** as `net.buzzert.kordophonecd`.
## Build
```bash
cd gtk
make flatpak
```
## Install (user)
```bash
cd gtk
make flatpak-install
```

View File

@@ -1,25 +0,0 @@
app-id: net.buzzert.kordophone
runtime: org.gnome.Platform
runtime-version: "48"
sdk: org.gnome.Sdk
command: kordophone
finish-args:
- --share=ipc
- --socket=wayland
- --socket=fallback-x11
- --device=dri
# Talk to the host-installed daemon (option A).
- --socket=session-bus
- --talk-name=net.buzzert.kordophonecd
# libsecret (Secret Service) access for stored credentials.
- --talk-name=org.freedesktop.secrets
modules:
- name: kordophone
buildsystem: meson
config-opts:
- --prefix=/app
sources:
- type: dir
path: ..

View File

@@ -1,5 +1,5 @@
project('kordophone', 'vala',
version : '1.0.2',
version : '1.0.1',
meson_version : '>=0.56.0',
default_options : ['warning_level=2']
)

View File

@@ -5,7 +5,6 @@ public class MainWindow : Adw.ApplicationWindow
{
private ConversationListView conversation_list_view;
private TranscriptContainerView transcript_container_view;
private NavigationSplitView split_view;
private EventControllerMotion _motion_controller = new EventControllerMotion();
private bool _motion_queued = false;
@@ -13,15 +12,10 @@ public class MainWindow : Adw.ApplicationWindow
public MainWindow () {
Object (title: "Kordophone");
split_view = new NavigationSplitView ();
var split_view = new NavigationSplitView ();
split_view.set_min_sidebar_width (400);
split_view.show_content = false;
set_content (split_view);
var breakpoint = new Breakpoint (BreakpointCondition.parse ("max-width: 750sp"));
breakpoint.add_setter (split_view, "collapsed", true);
add_breakpoint (breakpoint);
conversation_list_view = new ConversationListView ();
conversation_list_view.conversation_selected.connect (conversation_selected);
conversation_list_view.conversation_activated.connect (open_conversation_in_new_window);
@@ -106,10 +100,6 @@ public class MainWindow : Adw.ApplicationWindow
GLib.warning("Failed to sync conversation: %s", e.message);
}
}
if (split_view.collapsed) {
split_view.show_content = true;
}
}
}

View File

@@ -14,7 +14,6 @@ public class ConversationListView : Adw.Bin
private string? selected_conversation_guid = null;
private bool selection_update_queued = false;
private bool suppress_row_selected = false;
public ConversationListView () {
container = new Adw.ToolbarView ();
@@ -30,10 +29,6 @@ public class ConversationListView : Adw.Bin
scrolled_window.set_child (list_box);
list_box.row_selected.connect ((row) => {
if (suppress_row_selected) {
return;
}
var conversation_row = (ConversationRow?) row;
if (conversation_row != null) {
selected_conversation_guid = conversation_row.conversation.guid;
@@ -117,9 +112,7 @@ public class ConversationListView : Adw.Bin
if (conversation.guid == selected_conversation_guid) {
var row = list_box.get_row_at_index((int)i);
if (row != null) {
suppress_row_selected = true;
list_box.select_row(row);
suppress_row_selected = false;
}
}
}
@@ -130,4 +123,4 @@ public class ConversationListView : Adw.Bin
Conversation conversation = (Conversation) item;
return new ConversationRow (conversation);
}
}
}

View File

@@ -44,11 +44,11 @@ public class AttachmentInfo : Object {
}
public class Attachment : Object {
public string guid = "";
public string path = "";
public string preview_path = "";
public bool downloaded = false;
public bool preview_downloaded = false;
public string guid;
public string path;
public string preview_path;
public bool downloaded;
public bool preview_downloaded;
public AttachmentMetadata? metadata;
public Attachment(string guid, AttachmentMetadata? metadata) {

View File

@@ -73,13 +73,14 @@
'sender' (string): Sender display name
'attachments' (array of dictionaries): List of attachments
'guid' (string): Attachment GUID
'path' (string): Attachment path
'preview_path' (string): Preview attachment path
'downloaded' (boolean): Whether the attachment is downloaded
'preview_downloaded' (boolean): Whether the preview is downloaded
'metadata' (dictionary, optional): Attachment metadata
'attribution_info' (dictionary, optional): Attribution info
'width' (int32, optional): Width
'height' (int32, optional): Height
Use GetAttachmentInfo for full/preview paths."/>
'height' (int32, optional): Height"/>
</arg>
</method>

View File

@@ -144,35 +144,4 @@ public class Repository : DBusServiceProxy {
var info = dbus_repository.get_attachment_info(attachment_guid);
return new AttachmentInfo(info.attr1, info.attr2, info.attr3, info.attr4);
}
public int open_attachment_fd(string attachment_guid, bool preview) throws DBusServiceProxyError, GLib.Error {
if (dbus_repository == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Repository not connected");
}
var connection = Bus.get_sync(BusType.SESSION);
UnixFDList? out_fd_list = null;
var result = connection.call_with_unix_fd_list_sync(
DBUS_NAME,
DBUS_PATH,
"net.buzzert.kordophone.Repository",
"OpenAttachmentFd",
new Variant("(sb)", attachment_guid, preview),
new VariantType("(h)"),
DBusCallFlags.NONE,
120000,
null,
out out_fd_list,
null
);
int fd_handle = -1;
result.get("(h)", out fd_handle);
if (out_fd_list == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Missing UnixFDList from OpenAttachmentFd");
}
return out_fd_list.get(fd_handle);
}
}

View File

@@ -13,52 +13,30 @@ private class SizeCache
return instance;
}
public Graphene.Size? get_size(string attachment_guid) {
return size_cache.get(attachment_guid);
public Graphene.Size? get_size(string image_path) {
return size_cache.get(image_path);
}
public void set_size(string attachment_guid, Graphene.Size size) {
size_cache.set(attachment_guid, size);
}
}
private class TextureCache
{
private static TextureCache instance = null;
private HashMap<string, Gdk.Texture> texture_cache = new HashMap<string, Gdk.Texture>();
public static TextureCache get_instance() {
if (instance == null) {
instance = new TextureCache();
}
return instance;
}
public Gdk.Texture? get_texture(string attachment_guid) {
return texture_cache.get(attachment_guid);
}
public void set_texture(string attachment_guid, Gdk.Texture texture) {
texture_cache.set(attachment_guid, texture);
public void set_size(string image_path, Graphene.Size size) {
size_cache.set(image_path, size);
}
}
private class ImageBubbleLayout : BubbleLayout
{
public string attachment_guid;
public string image_path;
public bool is_downloaded;
public string? attachment_guid;
private Graphene.Size image_size;
private Gdk.Texture? cached_texture = null;
private bool preview_download_queued = false;
public ImageBubbleLayout(string attachment_guid, bool from_me, Widget parent, float max_width, Graphene.Size? image_size = null) {
public ImageBubbleLayout(string image_path, bool from_me, Widget parent, float max_width, Graphene.Size? image_size = null) {
base(parent, max_width);
this.from_me = from_me;
this.attachment_guid = attachment_guid;
this.image_path = image_path;
this.is_downloaded = false;
this.cached_texture = TextureCache.get_instance().get_texture(attachment_guid);
// Calculate image dimensions for layout
calculate_image_dimensions(image_size);
@@ -70,25 +48,26 @@ private class ImageBubbleLayout : BubbleLayout
return;
}
var cached_size = SizeCache.get_instance().get_size(attachment_guid);
var cached_size = SizeCache.get_instance().get_size(image_path);
if (cached_size != null) {
this.image_size = cached_size;
return;
}
this.image_size = Graphene.Size() { width = 200.0f, height = 150.0f };
}
private void queue_preview_download_if_needed() {
if (is_downloaded || preview_download_queued || attachment_guid == "") {
return;
}
// Try to load the image to get its dimensions
try {
Repository.get_instance().download_attachment(attachment_guid, true);
preview_download_queued = true;
} catch (GLib.Error e) {
warning("Failed to queue preview download for %s: %s", attachment_guid, e.message);
warning("No image size provided, loading image to get dimensions");
var texture = Gdk.Texture.from_filename(image_path);
var original_width = (float)texture.get_width();
var original_height = (float)texture.get_height();
this.image_size = Graphene.Size() { width = original_width, height = original_height };
SizeCache.get_instance().set_size(image_path, this.image_size);
} catch (Error e) {
// Fallback dimensions if image can't be loaded
warning("Failed to load image %s: %s", image_path, e.message);
this.image_size = Graphene.Size() { width = 200.0f, height = 150.0f };
}
}
@@ -102,22 +81,9 @@ private class ImageBubbleLayout : BubbleLayout
}
try {
int fd = Repository.get_instance().open_attachment_fd(attachment_guid, true);
var stream = new UnixInputStream(fd, true);
var pixbuf = new Gdk.Pixbuf.from_stream(stream, null);
cached_texture = Gdk.Texture.for_pixbuf(pixbuf);
if (cached_texture != null) {
TextureCache.get_instance().set_texture(attachment_guid, cached_texture);
this.image_size = Graphene.Size() {
width = (float)cached_texture.get_width(),
height = (float)cached_texture.get_height()
};
SizeCache.get_instance().set_size(attachment_guid, this.image_size);
parent.queue_allocate();
}
cached_texture = Gdk.Texture.from_filename(image_path);
} catch (Error e) {
warning("Failed to load preview image for %s: %s", attachment_guid, e.message);
warning("Failed to load image %s: %s", image_path, e.message);
}
}
@@ -144,7 +110,6 @@ private class ImageBubbleLayout : BubbleLayout
}
public override void draw_content(Snapshot snapshot) {
queue_preview_download_if_needed();
load_image_if_needed();
snapshot.save();
@@ -172,4 +137,4 @@ private class ImageBubbleLayout : BubbleLayout
public override void copy(Gdk.Clipboard clipboard) {
clipboard.set_texture(cached_texture);
}
}
}

View File

@@ -62,65 +62,44 @@ public class MessageListModel : Object, ListModel
}
}
public void load_messages(bool force_full_reload = false) {
public void load_messages() {
var previous_messages = new HashSet<Message>();
previous_messages.add_all(_messages);
try {
bool first_load = _messages.size == 0;
string last_message_id = (first_load || force_full_reload) ? "" : _messages.get(_messages.size - 1).guid;
Message[] messages = Repository.get_instance().get_messages(conversation.guid, last_message_id);
bool fallback_full_reload = first_load || force_full_reload;
if (!first_load && messages.length > 0 && previous_messages.contains(messages[0])) {
fallback_full_reload = true;
Message[] messages = Repository.get_instance().get_messages(conversation.guid);
// Clear existing set
uint old_count = _messages.size;
_messages.clear();
participants.clear();
// Notify of removal
if (old_count > 0) {
items_changed(0, old_count, 0);
}
// Process each conversation
uint position = 0;
for (int i = 0; i < messages.length; i++) {
var message = messages[i];
participants.add(message.sender);
if (fallback_full_reload) {
uint old_count = _messages.size;
_messages.clear();
participants.clear();
if (old_count > 0) {
items_changed(0, old_count, 0);
}
uint position = 0;
for (int i = 0; i < messages.length; i++) {
var message = messages[i];
participants.add(message.sender);
if (!first_load && !previous_messages.contains(message)) {
message.should_animate = true;
}
_messages.add(message);
position++;
}
if (position > 0) {
items_changed(0, 0, position);
}
} else {
uint old_count = _messages.size;
uint appended = 0;
for (int i = 0; i < messages.length; i++) {
var message = messages[i];
if (previous_messages.contains(message)) {
continue;
}
participants.add(message.sender);
if (!first_load && !previous_messages.contains(message)) {
// This is a new message according to the UI, schedule an animation for it.
message.should_animate = true;
_messages.add(message);
appended++;
}
if (appended > 0) {
items_changed(old_count, 0, appended);
}
_messages.add(message);
position++;
}
// Notify of additions
if (position > 0) {
items_changed(0, 0, position);
}
} catch (Error e) {
warning("Failed to load messages: %s", e.message);
@@ -155,4 +134,4 @@ public class MessageListModel : Object, ListModel
public Object? get_item(uint position) {
return _messages.get((int)position);
}
}
}

View File

@@ -364,15 +364,16 @@ private class TranscriptDrawingArea : Widget
// Check for attachments. For each one, add an image layout bubble
foreach (var attachment in message.attachments) {
Graphene.Size? image_size = null;
if (attachment.metadata != null && attachment.metadata.attribution_info != null) {
if (attachment.metadata != null) {
image_size = Graphene.Size() {
width = attachment.metadata.attribution_info.width,
height = attachment.metadata.attribution_info.height
};
}
var image_layout = new ImageBubbleLayout(attachment.guid, message.from_me, this, max_width, image_size);
var image_layout = new ImageBubbleLayout(attachment.preview_path, message.from_me, this, max_width, image_size);
image_layout.id = @"image-$(attachment.guid)";
image_layout.attachment_guid = attachment.guid;
if (animate) {
start_animation(image_layout.id);
@@ -380,6 +381,16 @@ private class TranscriptDrawingArea : Widget
image_layout.is_downloaded = attachment.preview_downloaded;
items.add(image_layout);
// If the attachment isn't downloaded, queue a download since we are going to be showing it here.
// TODO: Probably would be better if we only did this for stuff in the viewport.
if (!attachment.preview_downloaded) {
try {
Repository.get_instance().download_attachment(attachment.guid, true);
} catch (GLib.Error e) {
warning("Wasn't able to message daemon about queuing attachment download: %s", e.message);
}
}
}
last_sender = message.sender;
@@ -436,4 +447,4 @@ public struct VisibleLayout {
this.bubble = bubble;
this.rect = rect;
}
}
}

View File

@@ -148,7 +148,7 @@ public class TranscriptView : Adw.Bin
GLib.Idle.add(() => {
if (needs_reload) {
debug("Reloading messages for attachment download");
model.load_messages(true);
model.load_messages();
needs_reload = false;
}