Private
Public Access
1
0

Compare commits

..

3 Commits

76 changed files with 1958 additions and 4112 deletions

2
.gitignore vendored
View File

@@ -1,2 +0,0 @@
ext/
target/

View File

@@ -1,18 +0,0 @@
[target.arm-unknown-linux-gnueabihf]
# Match Raspberry Pi Zero CPU (ARM1176JZF-S).
rustflags = [
"-C", "target-cpu=arm1176jzf-s",
"-L", "native=/usr/lib/arm-linux-gnueabihf",
"-L", "native=/lib/arm-linux-gnueabihf",
"-C", "link-arg=-Wl,-rpath-link,/usr/lib/arm-linux-gnueabihf",
"-C", "link-arg=-Wl,-rpath-link,/lib/arm-linux-gnueabihf",
]
[target.aarch64-unknown-linux-gnu]
linker = "aarch64-linux-gnu-gcc"
rustflags = [
"-L", "native=/usr/lib/aarch64-linux-gnu",
"-L", "native=/lib/aarch64-linux-gnu",
"-C", "link-arg=-Wl,-rpath-link,/usr/lib/aarch64-linux-gnu",
"-C", "link-arg=-Wl,-rpath-link,/lib/aarch64-linux-gnu",
]

1216
core/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,9 +3,7 @@ members = [
"kordophone", "kordophone",
"kordophone-db", "kordophone-db",
"kordophoned", "kordophoned",
"kordophoned-client",
"kpcli", "kpcli",
"kptui",
"utilities", "utilities",
] ]
resolver = "2" resolver = "2"

View File

@@ -1,37 +0,0 @@
[target.arm-unknown-linux-gnueabihf]
# Raspberry Pi Zero / Zero W (ARM1176JZF-S, ARMv6 + hard-float).
#
# Several workspace crates use native libs via pkg-config (dbus, sqlite, libsecret).
# Install the ARMv6/armhf -dev packages inside the cross image so they are available
# to the target linker.
pre-build = [
"dpkg --add-architecture armhf",
"apt-get update",
"apt-get install -y --no-install-recommends bash pkg-config libc6-dev:armhf libdbus-1-dev:armhf libsystemd-dev:armhf libsqlite3-dev:armhf libsecret-1-dev:armhf",
# `cross` doesn't reliably forward PKG_CONFIG_* env vars into the container, so install a tiny
# wrapper that selects the correct multiarch pkgconfig dir based on `$TARGET`.
"bash -lc 'if [ -x /usr/bin/pkg-config ] && [ ! -x /usr/bin/pkg-config.real ]; then mv /usr/bin/pkg-config /usr/bin/pkg-config.real; fi'",
"bash -lc 'printf \"%b\" \"#!/usr/bin/env bash\\nset -euo pipefail\\nREAL=/usr/bin/pkg-config.real\\ncase \\\"\\${TARGET:-}\\\" in\\n arm-unknown-linux-gnueabihf)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/arm-linux-gnueabihf/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n aarch64-unknown-linux-gnu)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/aarch64-linux-gnu/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n *)\\n ;;\\nesac\\nexec \\\"\\$REAL\\\" \\\"\\$@\\\"\\n\" > /usr/bin/pkg-config && chmod +x /usr/bin/pkg-config'",
# Sanity checks (use wrapper + armhf search path).
"bash -lc 'TARGET=arm-unknown-linux-gnueabihf pkg-config --modversion dbus-1'",
"bash -lc 'TARGET=arm-unknown-linux-gnueabihf pkg-config --modversion sqlite3'",
"bash -lc 'TARGET=arm-unknown-linux-gnueabihf pkg-config --modversion libsecret-1'",
]
[target.aarch64-unknown-linux-gnu]
# Raspberry Pi OS (64-bit) / other aarch64 Linux.
#
# Use a Debian 11 (bullseye) base so the resulting binaries are compatible with
# bullseye's glibc, and to get a system `libsqlite3` new enough for Diesel.
image = "debian:bullseye-slim"
pre-build = [
"dpkg --add-architecture arm64",
"apt-get update",
"apt-get install -y --no-install-recommends ca-certificates bash pkg-config build-essential gcc-aarch64-linux-gnu libc6-dev:arm64 libdbus-1-dev:arm64 libsystemd-dev:arm64 libsqlite3-dev:arm64 libsecret-1-dev:arm64",
# Same wrapper as above (installed once, safe to re-run).
"bash -lc 'if [ -x /usr/bin/pkg-config ] && [ ! -x /usr/bin/pkg-config.real ]; then mv /usr/bin/pkg-config /usr/bin/pkg-config.real; fi'",
"bash -lc 'printf \"%b\" \"#!/usr/bin/env bash\\nset -euo pipefail\\nREAL=/usr/bin/pkg-config.real\\ncase \\\"\\${TARGET:-}\\\" in\\n arm-unknown-linux-gnueabihf)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/arm-linux-gnueabihf/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n aarch64-unknown-linux-gnu)\\n export PKG_CONFIG_ALLOW_CROSS=1\\n export PKG_CONFIG_SYSROOT_DIR=/\\n export PKG_CONFIG_LIBDIR=/usr/lib/aarch64-linux-gnu/pkgconfig\\n export PKG_CONFIG_PATH=\\\"\\$PKG_CONFIG_LIBDIR\\\"\\n ;;\\n *)\\n ;;\\nesac\\nexec \\\"\\$REAL\\\" \\\"\\$@\\\"\\n\" > /usr/bin/pkg-config && chmod +x /usr/bin/pkg-config'",
"bash -lc 'TARGET=aarch64-unknown-linux-gnu pkg-config --modversion dbus-1'",
"bash -lc 'TARGET=aarch64-unknown-linux-gnu pkg-config --modversion sqlite3'",
"bash -lc 'TARGET=aarch64-unknown-linux-gnu pkg-config --modversion libsecret-1'",
]

View File

@@ -1,25 +0,0 @@
FROM debian:bookworm
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
curl \
build-essential \
make \
pkg-config \
libssl-dev \
libsqlite3-dev \
libdbus-1-dev \
libsystemd-dev \
dpkg \
&& rm -rf /var/lib/apt/lists/*
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH="/root/.cargo/bin:${PATH}"
RUN cargo install cargo-deb
WORKDIR /workspace
COPY . .
CMD ["make", "deb"]

View File

@@ -12,23 +12,5 @@ rpm:
cargo build --release --workspace cargo build --release --workspace
strip -s target/release/kordophoned strip -s target/release/kordophoned
strip -s target/release/kpcli strip -s target/release/kpcli
strip -s target/release/kptui cargo generate-rpm -p kordophoned
cargo generate-rpm -p kordophoned --auto-req builtin
.PHONY: deb
deb:
cargo build --release --workspace
strip -s target/release/kordophoned
strip -s target/release/kpcli
strip -s target/release/kptui
cargo deb -p kordophoned --no-build
.PHONY: pi-zero
pi-zero:
CARGO_TARGET_DIR=target/cross/arm-unknown-linux-gnueabihf \
cross build --release --target arm-unknown-linux-gnueabihf -p kordophoned -p kpcli -p kptui
.PHONY: pi-aarch64
pi-aarch64:
CARGO_TARGET_DIR=target/cross/aarch64-unknown-linux-gnu \
cross build --release --target aarch64-unknown-linux-gnu -p kordophoned -p kpcli -p kptui

View File

@@ -9,9 +9,7 @@ Workspace members:
- `kordophoned/` — Client daemon providing local caching and IPC - `kordophoned/` — Client daemon providing local caching and IPC
- Linux: DBus - Linux: DBus
- macOS: XPC (see notes below) - macOS: XPC (see notes below)
- `kordophoned-client/` — Cross-platform client library for talking to `kordophoned` (D-Bus/XPC).
- `kpcli/` — Commandline interface for interacting with the API, DB, and daemon. - `kpcli/` — Commandline interface for interacting with the API, DB, and daemon.
- `kptui/` — Terminal UI client (Ratatui) for reading and replying to chats via the daemon.
- `utilities/` — Small helper tools (e.g., testing utilities). - `utilities/` — Small helper tools (e.g., testing utilities).
## Build ## Build
@@ -29,42 +27,6 @@ cargo build -p kordophone
cargo build -p kordophoned --release cargo build -p kordophoned --release
``` ```
## Raspberry Pi Zero (cross build)
Recommended approach is `cross` (https://github.com/cross-rs/cross), which uses a containerized toolchain.
Prereqs:
- Install Docker or Podman
- Install `cross`: `cargo install cross`
Build ARMv6 (Pi Zero / Zero W):
```bash
cd core
make pi-zero
```
Build aarch64 (Pi OS 64-bit / Pi 3+):
```bash
cd core
make pi-aarch64
```
Notes:
- The aarch64 cross build uses a Debian 11 (bullseye) container base image to keep glibc compatible with bullseye.
Artifacts:
- `target/cross/arm-unknown-linux-gnueabihf/arm-unknown-linux-gnueabihf/release/kordophoned`
- `target/cross/arm-unknown-linux-gnueabihf/arm-unknown-linux-gnueabihf/release/kpcli`
- `target/cross/arm-unknown-linux-gnueabihf/arm-unknown-linux-gnueabihf/release/kptui`
- `target/cross/aarch64-unknown-linux-gnu/aarch64-unknown-linux-gnu/release/kordophoned`
- `target/cross/aarch64-unknown-linux-gnu/aarch64-unknown-linux-gnu/release/kpcli`
- `target/cross/aarch64-unknown-linux-gnu/aarch64-unknown-linux-gnu/release/kptui`
## `kordophoned` (Client Daemon) ## `kordophoned` (Client Daemon)
The daemon maintains a local cache, handles update cycles, and exposes IPC for GUI apps. The daemon maintains a local cache, handles update cycles, and exposes IPC for GUI apps.
@@ -91,16 +53,6 @@ strip -s target/release/kordophoned
cargo generate-rpm cargo generate-rpm
``` ```
### Packaging (DEB example)
`kordophoned` is configured for Debian packaging via `cargo-deb`.
```bash
cargo install cargo-deb
cd core
cargo deb -p kordophoned
```
## `kpcli` (CLI) ## `kpcli` (CLI)
Useful for quick testing and interacting with the daemon/cache. Useful for quick testing and interacting with the daemon/cache.
@@ -113,3 +65,4 @@ cargo run -p kpcli -- --help
- TLS/WebSocket: the `kordophone` crate includes `rustls` and installs a crypto provider at process start. - TLS/WebSocket: the `kordophone` crate includes `rustls` and installs a crypto provider at process start.
- DB: `kordophone-db` includes Diesel migrations under `kordophone-db/migrations/`. - DB: `kordophone-db` includes Diesel migrations under `kordophone-db/migrations/`.

View File

@@ -264,34 +264,16 @@ impl<'a> Repository<'a> {
.order_by(schema::messages::date.asc()) .order_by(schema::messages::date.asc())
.load::<MessageRecord>(self.connection)?; .load::<MessageRecord>(self.connection)?;
let sender_handles: Vec<String> = message_records
.iter()
.filter_map(|record| record.sender_participant_handle.clone())
.collect();
let participant_map: HashMap<String, Participant> = if sender_handles.is_empty() {
HashMap::new()
} else {
participants
.filter(handle.eq_any(&sender_handles))
.load::<ParticipantRecord>(self.connection)?
.into_iter()
.map(|participant| {
let key = participant.handle.clone();
(key, participant.into())
})
.collect()
};
let mut result = Vec::new(); let mut result = Vec::new();
for message_record in message_records { for message_record in message_records {
let mut message: Message = message_record.clone().into(); let mut message: Message = message_record.clone().into();
// If the message references a sender participant, load the participant info // If the message references a sender participant, load the participant info
if let Some(sender_handle) = message_record.sender_participant_handle { if let Some(sender_handle) = message_record.sender_participant_handle {
if let Some(participant) = participant_map.get(&sender_handle) { let participant = participants
message.sender = participant.clone(); .find(sender_handle)
} .first::<ParticipantRecord>(self.connection)?;
message.sender = participant.into();
} }
result.push(message); result.push(message);

View File

@@ -14,7 +14,7 @@ ctor = "0.2.8"
env_logger = "0.11.5" env_logger = "0.11.5"
futures-util = "0.3.31" futures-util = "0.3.31"
hyper = { version = "0.14", features = ["full"] } hyper = { version = "0.14", features = ["full"] }
hyper-rustls = { version = "0.24", default-features = false, features = ["http1", "webpki-tokio"] } hyper-tls = "0.5.0"
log = { version = "0.4.21", features = [] } log = { version = "0.4.21", features = [] }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91" serde_json = "1.0.91"

View File

@@ -7,7 +7,7 @@ use crate::api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpda
use crate::api::AuthenticationStore; use crate::api::AuthenticationStore;
use bytes::Bytes; use bytes::Bytes;
use hyper::{Body, Client, Method, Request, Uri}; use hyper::{Body, Client, Method, Request, Uri};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; use hyper_tls::HttpsConnector;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
@@ -24,7 +24,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use crate::{ use crate::{
model::{ model::{
Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage, Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage,
OutgoingMessageTarget, ResolveHandleResponse, SendMessageResponse, UpdateItem, UpdateItem,
}, },
APIInterface, APIInterface,
}; };
@@ -65,15 +65,7 @@ impl std::error::Error for Error {
impl std::fmt::Display for Error { impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { write!(f, "{:?}", self)
Error::ClientError(message) => write!(f, "{}", message),
Error::HTTPError(err) => write!(f, "HTTP transport error: {}", err),
Error::SerdeError(err) => write!(f, "JSON error: {}", err),
Error::DecodeError(message) => write!(f, "Decode error: {}", message),
Error::PongError(err) => write!(f, "WebSocket error: {}", err),
Error::URLError => write!(f, "Invalid URL"),
Error::Unauthorized => write!(f, "Unauthorized"),
}
} }
} }
@@ -292,17 +284,6 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
Ok(()) Ok(())
} }
async fn delete_conversation(
&mut self,
conversation_id: &ConversationID,
) -> Result<(), Self::Error> {
// SERVER JANK: This should be DELETE or POST, but it's GET for some reason.
let endpoint = format!("delete?guid={}", conversation_id);
self.response_with_body_retry(&endpoint, Method::GET, Body::empty, true)
.await?;
Ok(())
}
async fn get_messages( async fn get_messages(
&mut self, &mut self,
conversation_id: &ConversationID, conversation_id: &ConversationID,
@@ -331,46 +312,16 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
async fn send_message( async fn send_message(
&mut self, &mut self,
outgoing_message: &OutgoingMessage, outgoing_message: &OutgoingMessage,
) -> Result<SendMessageResponse, Self::Error> { ) -> Result<Message, Self::Error> {
match &outgoing_message.target { let message: Message = self
OutgoingMessageTarget::Conversation(conversation_id) => {
log::debug!(
"Sending message to conversation {} (body_length={}, attachment_count={})",
conversation_id,
outgoing_message.text.len(),
outgoing_message.file_transfer_guids.len()
);
}
OutgoingMessageTarget::Handles(handle_ids) => {
log::debug!(
"Sending message to resolved handles {:?} (body_length={}, attachment_count={})",
handle_ids,
outgoing_message.text.len(),
outgoing_message.file_transfer_guids.len()
);
}
}
let message: SendMessageResponse = self
.deserialized_response_with_body("sendMessage", Method::POST, || { .deserialized_response_with_body("sendMessage", Method::POST, || {
Self::send_message_request_body(outgoing_message) serde_json::to_string(&outgoing_message).unwrap().into()
}) })
.await?; .await?;
Ok(message) Ok(message)
} }
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error> {
log::debug!("Resolving handle {}", handle_id);
let endpoint = format!("resolveHandle?id={}", urlencoding::encode(handle_id));
let response: ResolveHandleResponse =
self.deserialized_response(&endpoint, Method::GET).await?;
Ok(response)
}
async fn fetch_attachment_data( async fn fetch_attachment_data(
&mut self, &mut self,
guid: &str, guid: &str,
@@ -474,7 +425,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
log::debug!("Websocket request: {:?}", request); log::debug!("Websocket request: {:?}", request);
let should_retry = true; // retry once after authenticating. let mut should_retry = true; // retry once after authenticating.
match connect_async(request).await.map_err(Error::from) { match connect_async(request).await.map_err(Error::from) {
Ok((socket, response)) => { Ok((socket, response)) => {
log::debug!("Websocket connected: {:?}", response.status()); log::debug!("Websocket connected: {:?}", response.status());
@@ -516,11 +467,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> { impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
pub fn new(base_url: Uri, auth_store: K) -> HTTPAPIClient<K> { pub fn new(base_url: Uri, auth_store: K) -> HTTPAPIClient<K> {
let https = HttpsConnectorBuilder::new() let https = HttpsConnector::new();
.with_webpki_roots()
.https_or_http()
.enable_http1()
.build();
let client = Client::builder().build::<_, Body>(https); let client = Client::builder().build::<_, Body>(https);
HTTPAPIClient { HTTPAPIClient {
@@ -530,34 +477,6 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
} }
} }
fn send_message_request_body(outgoing_message: &OutgoingMessage) -> Body {
#[derive(Serialize)]
struct SendMessageRequest<'a> {
#[serde(rename = "body")]
text: &'a str,
#[serde(rename = "guid", skip_serializing_if = "Option::is_none")]
conversation_id: Option<&'a ConversationID>,
#[serde(rename = "handleIDs", skip_serializing_if = "Option::is_none")]
handle_ids: Option<&'a [String]>,
#[serde(rename = "fileTransferGUIDs")]
file_transfer_guids: &'a Vec<String>,
}
let (conversation_id, handle_ids) = match &outgoing_message.target {
OutgoingMessageTarget::Conversation(conversation_id) => (Some(conversation_id), None),
OutgoingMessageTarget::Handles(handle_ids) => (None, Some(handle_ids.as_slice())),
};
serde_json::to_string(&SendMessageRequest {
text: &outgoing_message.text,
conversation_id,
handle_ids,
file_transfer_guids: &outgoing_message.file_transfer_guids,
})
.unwrap()
.into()
}
fn uri_for_endpoint(&self, endpoint: &str, scheme: Option<&str>) -> Result<Uri, Error> { fn uri_for_endpoint(&self, endpoint: &str, scheme: Option<&str>) -> Result<Uri, Error> {
let mut parts = self.base_url.clone().into_parts(); let mut parts = self.base_url.clone().into_parts();
let root_path: PathBuf = parts.path_and_query.ok_or(Error::URLError)?.path().into(); let root_path: PathBuf = parts.path_and_query.ok_or(Error::URLError)?.path().into();
@@ -581,18 +500,6 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
} }
} }
fn log_transport_error(method: &Method, target: &str, err: &hyper::Error) {
log::error!("HTTP transport error for {} {}: {}", method, target, err);
if format!("{:?}", err).contains("IncompleteMessage") {
log::error!(
"The server closed the connection before a complete response was received for {} {}.",
method,
target
);
}
}
async fn deserialized_response<T: DeserializeOwned>( async fn deserialized_response<T: DeserializeOwned>(
&mut self, &mut self,
endpoint: &str, endpoint: &str,
@@ -626,26 +533,15 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
T: DeserializeOwned, T: DeserializeOwned,
{ {
let response = self let response = self
.response_with_body_retry(endpoint, method.clone(), body_fn, retry_auth) .response_with_body_retry(endpoint, method, body_fn, retry_auth)
.await?; .await?;
// Read and parse response body // Read and parse response body
let body = match hyper::body::to_bytes(response.into_body()).await { let body = hyper::body::to_bytes(response.into_body()).await?;
Ok(body) => body,
Err(err) => {
Self::log_transport_error(&method, endpoint, &err);
return Err(Error::HTTPError(err));
}
};
let parsed: T = match serde_json::from_slice(&body) { let parsed: T = match serde_json::from_slice(&body) {
Ok(result) => Ok(result), Ok(result) => Ok(result),
Err(json_err) => { Err(json_err) => {
log::error!( log::error!("Error deserializing JSON: {:?}", json_err);
"Error deserializing JSON for {} {}: {:?}",
method,
endpoint,
json_err
);
log::error!("Body: {:?}", String::from_utf8_lossy(&body)); log::error!("Body: {:?}", String::from_utf8_lossy(&body));
// If JSON deserialization fails, try to interpret it as plain text // If JSON deserialization fails, try to interpret it as plain text
@@ -668,8 +564,7 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
use hyper::StatusCode; use hyper::StatusCode;
let uri = self.uri_for_endpoint(endpoint, None)?; let uri = self.uri_for_endpoint(endpoint, None)?;
let uri_string = uri.to_string(); log::debug!("Requesting {:?} {:?}", method, uri);
log::debug!("Requesting {} {}", method, uri_string);
let mut build_request = |auth: &Option<String>| { let mut build_request = |auth: &Option<String>| {
let body = body_fn(); let body = body_fn();
@@ -683,24 +578,13 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
log::trace!("Obtaining token from auth store"); log::trace!("Obtaining token from auth store");
let token = self.auth_store.get_token().await; let token = self.auth_store.get_token().await;
log::trace!("Token present: {}", token.is_some()); log::trace!("Token: {:?}", token);
let request = build_request(&token); let request = build_request(&token);
log::trace!( log::trace!("Request: {:?}. Sending request...", request);
"Sending request: method={} uri={} authenticated={}",
method,
uri_string,
token.is_some()
);
let mut response = match self.client.request(request).await { let mut response = self.client.request(request).await?;
Ok(response) => response, log::debug!("-> Response: {:}", response.status());
Err(err) => {
Self::log_transport_error(&method, &uri_string, &err);
return Err(Error::HTTPError(err));
}
};
log::debug!("-> Response: {}", response.status());
match response.status() { match response.status() {
StatusCode::OK => { /* cool */ } StatusCode::OK => { /* cool */ }
@@ -719,19 +603,7 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
let new_token = self.authenticate(credentials.clone()).await?; let new_token = self.authenticate(credentials.clone()).await?;
let request = build_request(&Some(new_token.to_string())); let request = build_request(&Some(new_token.to_string()));
log::trace!( response = self.client.request(request).await?;
"Retrying request after authentication: method={} uri={} authenticated=true",
method,
uri_string
);
response = match self.client.request(request).await {
Ok(response) => response,
Err(err) => {
Self::log_transport_error(&method, &uri_string, &err);
return Err(Error::HTTPError(err));
}
};
log::debug!("-> Retry response: {}", response.status());
} else { } else {
return Err(Error::ClientError( return Err(Error::ClientError(
"Unauthorized, no credentials provided".into(), "Unauthorized, no credentials provided".into(),

View File

@@ -1,7 +1,4 @@
pub use crate::model::{ pub use crate::model::{Conversation, ConversationID, Message, MessageID, OutgoingMessage};
Conversation, ConversationID, Message, MessageID, OutgoingMessage, ResolveHandleResponse,
SendMessageResponse,
};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
@@ -45,13 +42,7 @@ pub trait APIInterface {
async fn send_message( async fn send_message(
&mut self, &mut self,
outgoing_message: &OutgoingMessage, outgoing_message: &OutgoingMessage,
) -> Result<SendMessageResponse, Self::Error>; ) -> Result<Message, Self::Error>;
// (GET) /resolveHandle
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error>;
// (GET) /attachment // (GET) /attachment
async fn fetch_attachment_data( async fn fetch_attachment_data(
@@ -79,12 +70,6 @@ pub trait APIInterface {
conversation_id: &ConversationID, conversation_id: &ConversationID,
) -> Result<(), Self::Error>; ) -> Result<(), Self::Error>;
// (GET) /delete
async fn delete_conversation(
&mut self,
conversation_id: &ConversationID,
) -> Result<(), Self::Error>;
// (WS) /updates // (WS) /updates
async fn open_event_socket( async fn open_event_socket(
&mut self, &mut self,

View File

@@ -1,28 +0,0 @@
use serde::{Deserialize, Serialize};
use super::conversation::ConversationID;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedHandle {
pub id: String,
pub name: Option<String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum HandleResolutionStatus {
Valid,
Invalid,
Unknown,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolveHandleResponse {
#[serde(rename = "resolvedHandle")]
pub resolved_handle: ResolvedHandle,
pub status: HandleResolutionStatus,
#[serde(rename = "existingChat")]
pub existing_chat: Option<ConversationID>,
}

View File

@@ -1,9 +1,7 @@
pub mod conversation; pub mod conversation;
pub mod event; pub mod event;
pub mod handle;
pub mod message; pub mod message;
pub mod outgoing_message; pub mod outgoing_message;
pub mod send_message_response;
pub mod update; pub mod update;
pub use conversation::Conversation; pub use conversation::Conversation;
@@ -12,15 +10,8 @@ pub use conversation::ConversationID;
pub use message::Message; pub use message::Message;
pub use message::MessageID; pub use message::MessageID;
pub use handle::HandleResolutionStatus;
pub use handle::ResolveHandleResponse;
pub use handle::ResolvedHandle;
pub use outgoing_message::OutgoingMessage; pub use outgoing_message::OutgoingMessage;
pub use outgoing_message::OutgoingMessageBuilder; pub use outgoing_message::OutgoingMessageBuilder;
pub use outgoing_message::OutgoingMessageTarget;
pub use send_message_response::SendMessageResponse;
pub use update::UpdateItem; pub use update::UpdateItem;

View File

@@ -1,23 +1,23 @@
use super::conversation::ConversationID; use super::conversation::ConversationID;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use serde::Serialize;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, Serialize)]
pub enum OutgoingMessageTarget {
Conversation(ConversationID),
Handles(Vec<String>),
}
#[derive(Debug, Clone)]
pub struct OutgoingMessage { pub struct OutgoingMessage {
#[serde(skip)]
pub guid: Uuid, pub guid: Uuid,
#[serde(skip)]
pub date: NaiveDateTime, pub date: NaiveDateTime,
#[serde(rename = "body")]
pub text: String, pub text: String,
pub target: OutgoingMessageTarget, #[serde(rename = "guid")]
pub conversation_id: ConversationID,
#[serde(rename = "fileTransferGUIDs")]
pub file_transfer_guids: Vec<String>, pub file_transfer_guids: Vec<String>,
} }
@@ -25,27 +25,13 @@ impl OutgoingMessage {
pub fn builder() -> OutgoingMessageBuilder { pub fn builder() -> OutgoingMessageBuilder {
OutgoingMessageBuilder::new() OutgoingMessageBuilder::new()
} }
pub fn conversation_id(&self) -> Option<&ConversationID> {
match &self.target {
OutgoingMessageTarget::Conversation(conversation_id) => Some(conversation_id),
OutgoingMessageTarget::Handles(_) => None,
}
}
pub fn handle_ids(&self) -> Option<&[String]> {
match &self.target {
OutgoingMessageTarget::Conversation(_) => None,
OutgoingMessageTarget::Handles(handle_ids) => Some(handle_ids.as_slice()),
}
}
} }
#[derive(Default)] #[derive(Default)]
pub struct OutgoingMessageBuilder { pub struct OutgoingMessageBuilder {
guid: Option<Uuid>, guid: Option<Uuid>,
text: Option<String>, text: Option<String>,
target: Option<OutgoingMessageTarget>, conversation_id: Option<ConversationID>,
file_transfer_guids: Option<Vec<String>>, file_transfer_guids: Option<Vec<String>>,
} }
@@ -64,18 +50,8 @@ impl OutgoingMessageBuilder {
self self
} }
pub fn target(mut self, target: OutgoingMessageTarget) -> Self {
self.target = Some(target);
self
}
pub fn conversation_id(mut self, conversation_id: ConversationID) -> Self { pub fn conversation_id(mut self, conversation_id: ConversationID) -> Self {
self.target = Some(OutgoingMessageTarget::Conversation(conversation_id)); self.conversation_id = Some(conversation_id);
self
}
pub fn handle_ids(mut self, handle_ids: Vec<String>) -> Self {
self.target = Some(OutgoingMessageTarget::Handles(handle_ids));
self self
} }
@@ -88,7 +64,7 @@ impl OutgoingMessageBuilder {
OutgoingMessage { OutgoingMessage {
guid: self.guid.unwrap_or_else(Uuid::new_v4), guid: self.guid.unwrap_or_else(Uuid::new_v4),
text: self.text.unwrap(), text: self.text.unwrap(),
target: self.target.unwrap(), conversation_id: self.conversation_id.unwrap(),
file_transfer_guids: self.file_transfer_guids.unwrap_or_default(), file_transfer_guids: self.file_transfer_guids.unwrap_or_default(),
date: chrono::Utc::now().naive_utc(), date: chrono::Utc::now().naive_utc(),
} }

View File

@@ -1,12 +0,0 @@
use serde::Deserialize;
use super::{conversation::ConversationID, message::Message};
#[derive(Debug, Clone, Deserialize)]
pub struct SendMessageResponse {
#[serde(flatten)]
pub message: Message,
#[serde(rename = "conversationGUID")]
pub conversation_id: Option<ConversationID>,
}

View File

@@ -3,7 +3,7 @@ use self::test_client::TestClient;
use crate::APIInterface; use crate::APIInterface;
pub mod api_interface { pub mod api_interface {
use crate::model::{Conversation, HandleResolutionStatus, OutgoingMessage}; use crate::model::Conversation;
use super::*; use super::*;
@@ -28,42 +28,4 @@ pub mod api_interface {
assert_eq!(conversations.len(), 1); assert_eq!(conversations.len(), 1);
assert_eq!(conversations[0].display_name, test_convo.display_name); assert_eq!(conversations[0].display_name, test_convo.display_name);
} }
#[tokio::test]
async fn test_resolve_handle() {
let mut client = TestClient::new();
let resolved = client.resolve_handle("user@example.com").await.unwrap();
assert_eq!(resolved.resolved_handle.id, "user@example.com");
assert_eq!(resolved.status, HandleResolutionStatus::Valid);
assert_eq!(resolved.existing_chat, None);
}
#[tokio::test]
async fn test_send_message_with_handles() {
let mut client = TestClient::new();
let outgoing_message = OutgoingMessage::builder()
.text("hello".to_string())
.handle_ids(vec!["user@example.com".to_string()])
.build();
let sent = client.send_message(&outgoing_message).await.unwrap();
assert_eq!(sent.message.text, "hello");
assert_eq!(sent.conversation_id, None);
}
#[tokio::test]
async fn test_delete_conversation() {
let mut client = TestClient::new();
let test_convo = Conversation::builder().display_name("Delete Me").build();
client.conversations.push(test_convo.clone());
client.delete_conversation(&test_convo.guid).await.unwrap();
let conversations = client.get_conversations().await.unwrap();
assert!(conversations.is_empty());
}
} }

View File

@@ -9,17 +9,14 @@ use crate::{
api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpdate}, api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpdate},
api::http_client::Credentials, api::http_client::Credentials,
model::{ model::{
Conversation, ConversationID, Event, HandleResolutionStatus, JwtToken, Message, MessageID, Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage,
OutgoingMessage, OutgoingMessageTarget, ResolveHandleResponse, ResolvedHandle, UpdateItem,
SendMessageResponse,
}, },
}; };
use bytes::Bytes; use bytes::Bytes;
use futures_util::sink::drain;
use futures_util::stream::BoxStream; use futures_util::stream::BoxStream;
use futures_util::Sink; use futures_util::Sink;
use futures_util::SinkExt;
use futures_util::StreamExt; use futures_util::StreamExt;
pub struct TestClient { pub struct TestClient {
@@ -66,18 +63,13 @@ impl EventSocket for TestEventSocket {
impl Sink<SinkMessage, Error = Self::Error>, impl Sink<SinkMessage, Error = Self::Error>,
) { ) {
( (
futures_util::stream::iter( futures_util::stream::iter(self.events.into_iter().map(Ok)).boxed(),
self.events futures_util::sink::sink(),
.into_iter()
.map(|event| Ok(SocketEvent::Update(event))),
)
.boxed(),
drain().sink_map_err(|err| match err {}),
) )
} }
async fn raw_updates(self) -> Self::UpdateStream { async fn raw_updates(self) -> Self::UpdateStream {
let results: Vec<Result<SocketUpdate, TestError>> = vec![]; let results: Vec<Result<Vec<UpdateItem>, TestError>> = vec![];
futures_util::stream::iter(results.into_iter()).boxed() futures_util::stream::iter(results.into_iter()).boxed()
} }
} }
@@ -102,9 +94,9 @@ impl APIInterface for TestClient {
async fn get_messages( async fn get_messages(
&mut self, &mut self,
conversation_id: &ConversationID, conversation_id: &ConversationID,
_limit: Option<u32>, limit: Option<u32>,
_before: Option<MessageID>, before: Option<MessageID>,
_after: Option<MessageID>, after: Option<MessageID>,
) -> Result<Vec<Message>, Self::Error> { ) -> Result<Vec<Message>, Self::Error> {
if let Some(messages) = self.messages.get(conversation_id) { if let Some(messages) = self.messages.get(conversation_id) {
return Ok(messages.clone()); return Ok(messages.clone());
@@ -116,42 +108,18 @@ impl APIInterface for TestClient {
async fn send_message( async fn send_message(
&mut self, &mut self,
outgoing_message: &OutgoingMessage, outgoing_message: &OutgoingMessage,
) -> Result<SendMessageResponse, Self::Error> { ) -> Result<Message, Self::Error> {
let message = Message::builder() let message = Message::builder()
.guid(Uuid::new_v4().to_string()) .guid(Uuid::new_v4().to_string())
.text(outgoing_message.text.clone()) .text(outgoing_message.text.clone())
.date(OffsetDateTime::now_utc()) .date(OffsetDateTime::now_utc())
.build(); .build();
let conversation_id = match &outgoing_message.target { self.messages
OutgoingMessageTarget::Conversation(conversation_id) => { .entry(outgoing_message.conversation_id.clone())
self.messages .or_insert(vec![])
.entry(conversation_id.clone()) .push(message.clone());
.or_insert(vec![]) Ok(message)
.push(message.clone());
None
}
OutgoingMessageTarget::Handles(_) => None,
};
Ok(SendMessageResponse {
message,
conversation_id,
})
}
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error> {
Ok(ResolveHandleResponse {
resolved_handle: ResolvedHandle {
id: handle_id.to_string(),
name: None,
},
status: HandleResolutionStatus::Valid,
existing_chat: None,
})
} }
async fn open_event_socket( async fn open_event_socket(
@@ -163,17 +131,17 @@ impl APIInterface for TestClient {
async fn fetch_attachment_data( async fn fetch_attachment_data(
&mut self, &mut self,
_guid: &str, guid: &str,
_preview: bool, preview: bool,
) -> Result<Self::ResponseStream, Self::Error> { ) -> Result<Self::ResponseStream, Self::Error> {
Ok(futures_util::stream::iter(vec![Ok(Bytes::from_static(b"test"))]).boxed()) Ok(futures_util::stream::iter(vec![Ok(Bytes::from_static(b"test"))]).boxed())
} }
async fn upload_attachment<R>( async fn upload_attachment<R>(
&mut self, &mut self,
_data: tokio::io::BufReader<R>, data: tokio::io::BufReader<R>,
_filename: &str, filename: &str,
_size: u64, size: u64,
) -> Result<String, Self::Error> ) -> Result<String, Self::Error>
where where
R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static, R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -182,24 +150,9 @@ impl APIInterface for TestClient {
} }
async fn mark_conversation_as_read( async fn mark_conversation_as_read(
&mut self,
_conversation_id: &ConversationID,
) -> Result<(), Self::Error> {
Ok(())
}
async fn delete_conversation(
&mut self, &mut self,
conversation_id: &ConversationID, conversation_id: &ConversationID,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let previous_len = self.conversations.len();
self.conversations.retain(|c| &c.guid != conversation_id);
self.messages.remove(conversation_id);
if self.conversations.len() == previous_len {
return Err(TestError::ConversationNotFound);
}
Ok(()) Ok(())
} }
} }

View File

@@ -1,22 +0,0 @@
[package]
name = "kordophoned-client"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.93"
log = "0.4.22"
# D-Bus dependencies only on Linux
[target.'cfg(target_os = "linux")'.dependencies]
dbus = "0.9.7"
# D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = { version = "0.10.0", default-features = false }
# XPC (libxpc) interface only on macOS
[target.'cfg(target_os = "macos")'.dependencies]
block = "0.1.6"
xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" }
xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" }

View File

@@ -1,25 +0,0 @@
const KORDOPHONE_XML: &str = "../kordophoned/include/net.buzzert.kordophonecd.Server.xml";
#[cfg(not(target_os = "linux"))]
fn main() {
// No D-Bus codegen on non-Linux platforms.
}
#[cfg(target_os = "linux")]
fn main() {
let out_dir = std::env::var("OUT_DIR").unwrap();
let out_path = std::path::Path::new(&out_dir).join("kordophone-client.rs");
let opts = dbus_codegen::GenOpts {
connectiontype: dbus_codegen::ConnectionType::Blocking,
methodtype: None,
..Default::default()
};
let xml = std::fs::read_to_string(KORDOPHONE_XML).expect("Error reading server dbus interface");
let output =
dbus_codegen::generate(&xml, &opts).expect("Error generating client dbus interface");
std::fs::write(out_path, output).expect("Error writing client dbus code");
println!("cargo:rerun-if-changed={}", KORDOPHONE_XML);
}

View File

@@ -1,4 +0,0 @@
mod platform;
mod worker;
pub use worker::{spawn_worker, ChatMessage, ConversationSummary, Event, Request};

View File

@@ -1,200 +0,0 @@
#![cfg(target_os = "linux")]
use crate::worker::{ChatMessage, ConversationSummary, DaemonClient, Event};
use anyhow::Result;
use dbus::arg::{PropMap, RefArg};
use dbus::blocking::{Connection, Proxy};
use dbus::channel::Token;
use std::sync::mpsc::Sender;
use std::time::Duration;
const DBUS_NAME: &str = "net.buzzert.kordophonecd";
const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon";
#[allow(unused)]
mod dbus_interface {
#![allow(unused)]
include!(concat!(env!("OUT_DIR"), "/kordophone-client.rs"));
}
use dbus_interface::NetBuzzertKordophoneRepository as KordophoneRepository;
pub(crate) struct DBusClient {
conn: Connection,
signal_tokens: Vec<Token>,
}
impl DBusClient {
pub(crate) fn new() -> Result<Self> {
Ok(Self {
conn: Connection::new_session()?,
signal_tokens: Vec::new(),
})
}
fn proxy(&self) -> Proxy<&Connection> {
self.conn
.with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_millis(5000))
}
}
fn get_string(map: &PropMap, key: &str) -> String {
map.get(key)
.and_then(|v| v.0.as_str())
.unwrap_or_default()
.to_string()
}
fn get_i64(map: &PropMap, key: &str) -> i64 {
map.get(key).and_then(|v| v.0.as_i64()).unwrap_or(0)
}
fn get_u32(map: &PropMap, key: &str) -> u32 {
get_i64(map, key).try_into().unwrap_or(0)
}
fn get_vec_string(map: &PropMap, key: &str) -> Vec<String> {
map.get(key)
.and_then(|v| v.0.as_iter())
.map(|iter| {
iter.filter_map(|item| item.as_str().map(|s| s.to_string()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
}
impl DaemonClient for DBusClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>> {
let mut items = KordophoneRepository::get_conversations(&self.proxy(), limit, offset)?;
let mut conversations = items
.drain(..)
.map(|conv| {
let id = get_string(&conv, "guid");
let display_name = get_string(&conv, "display_name");
let participants = get_vec_string(&conv, "participants");
let title = if !display_name.trim().is_empty() {
display_name
} else if participants.is_empty() {
"<unknown>".to_string()
} else {
participants.join(", ")
};
ConversationSummary {
id,
title,
preview: get_string(&conv, "last_message_preview").replace('\n', " "),
unread_count: get_u32(&conv, "unread_count"),
date_unix: get_i64(&conv, "date"),
}
})
.collect::<Vec<_>>();
conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix));
Ok(conversations)
}
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>> {
let messages = KordophoneRepository::get_messages(
&self.proxy(),
&conversation_id,
&last_message_id.unwrap_or_default(),
)?;
Ok(messages
.into_iter()
.map(|msg| ChatMessage {
sender: get_string(&msg, "sender"),
text: get_string(&msg, "text"),
date_unix: get_i64(&msg, "date"),
})
.collect())
}
fn reply(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let attachment_guids: Vec<&str> = vec![];
let outgoing_id =
KordophoneRepository::reply(&self.proxy(), &conversation_id, &text, attachment_guids)?;
Ok(Some(outgoing_id))
}
fn new_conversation(
&mut self,
handle_ids: Vec<String>,
text: String,
) -> Result<Option<String>> {
let attachment_guids: Vec<&str> = vec![];
let handle_ids: Vec<&str> = handle_ids.iter().map(String::as_str).collect();
let outgoing_id = KordophoneRepository::new_conversation(
&self.proxy(),
handle_ids,
&text,
attachment_guids,
)?;
Ok(Some(outgoing_id))
}
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {e}"))
}
fn sync_conversation(&mut self, conversation_id: String) -> Result<()> {
KordophoneRepository::sync_conversation(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to sync conversation: {e}"))
}
fn install_signal_handlers(&mut self, event_tx: Sender<Event>) -> Result<()> {
let conversations_tx = event_tx.clone();
let t1 = self
.proxy()
.match_signal(
move |_: dbus_interface::NetBuzzertKordophoneRepositoryConversationsUpdated,
_: &Connection,
_: &dbus::message::Message| {
let _ = conversations_tx.send(Event::ConversationsUpdated);
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match ConversationsUpdated: {e}"))?;
let messages_tx = event_tx.clone();
let t2 = self
.proxy()
.match_signal(
move |s: dbus_interface::NetBuzzertKordophoneRepositoryMessagesUpdated,
_: &Connection,
_: &dbus::message::Message| {
let _ = messages_tx.send(Event::MessagesUpdated {
conversation_id: s.conversation_id,
});
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match MessagesUpdated: {e}"))?;
let reconnected_tx = event_tx;
let t3 = self
.proxy()
.match_signal(
move |_: dbus_interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected,
_: &Connection,
_: &dbus::message::Message| {
let _ = reconnected_tx.send(Event::UpdateStreamReconnected);
true
},
)
.map_err(|e| anyhow::anyhow!("Failed to match UpdateStreamReconnected: {e}"))?;
self.signal_tokens.extend([t1, t2, t3]);
Ok(())
}
fn poll(&mut self, timeout: Duration) -> Result<()> {
self.conn.process(timeout)?;
Ok(())
}
}

View File

@@ -1,272 +0,0 @@
#![cfg(target_os = "macos")]
use crate::worker::{ChatMessage, ConversationSummary, DaemonClient};
use anyhow::Result;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use xpc_connection::Message;
const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0";
struct XpcTransport {
connection: xpc_connection_sys::xpc_connection_t,
}
impl XpcTransport {
fn connect(name: impl AsRef<CStr>) -> Self {
use xpc_connection_sys::xpc_connection_create_mach_service;
use xpc_connection_sys::xpc_connection_resume;
let name = name.as_ref();
let connection =
unsafe { xpc_connection_create_mach_service(name.as_ptr(), std::ptr::null_mut(), 0) };
unsafe { xpc_connection_resume(connection) };
Self { connection }
}
fn send_with_reply(&self, message: Message) -> Message {
use xpc_connection::message_to_xpc_object;
use xpc_connection::xpc_object_to_message;
use xpc_connection_sys::{xpc_connection_send_message_with_reply_sync, xpc_release};
unsafe {
let xobj = message_to_xpc_object(message);
let reply = xpc_connection_send_message_with_reply_sync(self.connection, xobj);
xpc_release(xobj);
let msg = xpc_object_to_message(reply);
if !reply.is_null() {
xpc_release(reply);
}
msg
}
}
}
impl Drop for XpcTransport {
fn drop(&mut self) {
use xpc_connection_sys::xpc_object_t;
use xpc_connection_sys::xpc_release;
unsafe { xpc_release(self.connection as xpc_object_t) };
}
}
pub(crate) struct XpcClient {
transport: XpcTransport,
}
impl XpcClient {
pub(crate) fn new() -> Result<Self> {
let mach_port_name = CString::new(SERVICE_NAME).unwrap();
Ok(Self {
transport: XpcTransport::connect(&mach_port_name),
})
}
fn key(s: &str) -> CString {
CString::new(s).unwrap()
}
fn request(name: &str, arguments: Option<HashMap<CString, Message>>) -> Message {
let mut root: HashMap<CString, Message> = HashMap::new();
root.insert(Self::key("name"), Message::String(Self::key(name)));
if let Some(args) = arguments {
root.insert(Self::key("arguments"), Message::Dictionary(args));
}
Message::Dictionary(root)
}
fn get_string(map: &HashMap<CString, Message>, key: &str) -> Option<String> {
match map.get(&Self::key(key)) {
Some(Message::String(s)) => Some(s.to_string_lossy().into_owned()),
_ => None,
}
}
fn get_i64_from_str(map: &HashMap<CString, Message>, key: &str) -> i64 {
Self::get_string(map, key)
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0)
}
}
impl DaemonClient for XpcClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>> {
let mut args = HashMap::new();
args.insert(
Self::key("limit"),
Message::String(Self::key(&limit.to_string())),
);
args.insert(
Self::key("offset"),
Message::String(Self::key(&offset.to_string())),
);
let reply = self
.transport
.send_with_reply(Self::request("GetConversations", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected conversations response");
};
let Some(Message::Array(items)) = map.get(&Self::key("conversations")) else {
anyhow::bail!("Missing conversations in response");
};
let mut conversations = Vec::new();
for item in items {
let Message::Dictionary(conv) = item else {
continue;
};
let id = Self::get_string(conv, "guid").unwrap_or_default();
let display_name = Self::get_string(conv, "display_name").unwrap_or_default();
let preview = Self::get_string(conv, "last_message_preview").unwrap_or_default();
let unread_count = Self::get_i64_from_str(conv, "unread_count") as u32;
let date_unix = Self::get_i64_from_str(conv, "date");
let participants = match conv.get(&Self::key("participants")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect::<Vec<_>>(),
_ => Vec::new(),
};
let title = if !display_name.trim().is_empty() {
display_name
} else if participants.is_empty() {
"<unknown>".to_string()
} else {
participants.join(", ")
};
conversations.push(ConversationSummary {
id,
title,
preview: preview.replace('\n', " "),
unread_count,
date_unix,
});
}
conversations.sort_by_key(|c| std::cmp::Reverse(c.date_unix));
Ok(conversations)
}
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
if let Some(last) = last_message_id {
args.insert(
Self::key("last_message_id"),
Message::String(Self::key(&last)),
);
}
let reply = self
.transport
.send_with_reply(Self::request("GetMessages", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected messages response");
};
let Some(Message::Array(items)) = map.get(&Self::key("messages")) else {
anyhow::bail!("Missing messages in response");
};
let mut messages = Vec::new();
for item in items {
let Message::Dictionary(msg) = item else {
continue;
};
messages.push(ChatMessage {
sender: Self::get_string(msg, "sender").unwrap_or_default(),
text: Self::get_string(msg, "text").unwrap_or_default(),
date_unix: Self::get_i64_from_str(msg, "date"),
});
}
Ok(messages)
}
fn reply(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
args.insert(Self::key("text"), Message::String(Self::key(&text)));
let reply = self
.transport
.send_with_reply(Self::request("Reply", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected send response");
};
Ok(Self::get_string(&map, "uuid"))
}
fn new_conversation(
&mut self,
handle_ids: Vec<String>,
text: String,
) -> Result<Option<String>> {
let mut args = HashMap::new();
args.insert(
Self::key("handle_ids"),
Message::Array(
handle_ids
.into_iter()
.map(|handle_id| Message::String(Self::key(&handle_id)))
.collect(),
),
);
args.insert(Self::key("text"), Message::String(Self::key(&text)));
let reply = self
.transport
.send_with_reply(Self::request("NewConversation", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected send response");
};
Ok(Self::get_string(&map, "uuid"))
}
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
let _ = self
.transport
.send_with_reply(Self::request("MarkConversationAsRead", Some(args)));
Ok(())
}
fn sync_conversation(&mut self, conversation_id: String) -> Result<()> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
Message::String(Self::key(&conversation_id)),
);
let _ = self
.transport
.send_with_reply(Self::request("SyncConversation", Some(args)));
Ok(())
}
}

View File

@@ -1,23 +0,0 @@
use crate::worker::DaemonClient;
use anyhow::Result;
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "macos")]
mod macos;
pub(crate) fn new_daemon_client() -> Result<Box<dyn DaemonClient>> {
#[cfg(target_os = "linux")]
{
Ok(Box::new(linux::DBusClient::new()?))
}
#[cfg(target_os = "macos")]
{
Ok(Box::new(macos::XpcClient::new()?))
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
anyhow::bail!("Unsupported platform")
}
}

View File

@@ -1,159 +0,0 @@
use crate::platform;
use anyhow::Result;
use std::sync::mpsc;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct ConversationSummary {
pub id: String,
pub title: String,
pub preview: String,
pub unread_count: u32,
pub date_unix: i64,
}
#[derive(Clone, Debug)]
pub struct ChatMessage {
pub sender: String,
pub text: String,
pub date_unix: i64,
}
pub enum Request {
RefreshConversations,
RefreshMessages {
conversation_id: String,
},
Reply {
conversation_id: String,
text: String,
},
NewConversation {
handle_ids: Vec<String>,
text: String,
},
MarkRead {
conversation_id: String,
},
SyncConversation {
conversation_id: String,
},
}
pub enum Event {
Conversations(Vec<ConversationSummary>),
Messages {
conversation_id: String,
messages: Vec<ChatMessage>,
},
MessageQueued {
conversation_id: Option<String>,
outgoing_id: Option<String>,
},
MarkedRead,
ConversationSyncTriggered {
conversation_id: String,
},
ConversationsUpdated,
MessagesUpdated {
conversation_id: String,
},
UpdateStreamReconnected,
Error(String),
}
pub fn spawn_worker(
request_rx: std::sync::mpsc::Receiver<Request>,
event_tx: std::sync::mpsc::Sender<Event>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
let mut client = match platform::new_daemon_client() {
Ok(c) => c,
Err(e) => {
let _ = event_tx.send(Event::Error(format!("Failed to connect to daemon: {e}")));
return;
}
};
if let Err(e) = client.install_signal_handlers(event_tx.clone()) {
let _ = event_tx.send(Event::Error(format!(
"Failed to install daemon signals: {e}"
)));
}
loop {
match request_rx.recv_timeout(Duration::from_millis(100)) {
Ok(req) => {
let res = match req {
Request::RefreshConversations => {
client.get_conversations(200, 0).map(Event::Conversations)
}
Request::RefreshMessages { conversation_id } => client
.get_messages(conversation_id.clone(), None)
.map(|messages| Event::Messages {
conversation_id,
messages,
}),
Request::Reply {
conversation_id,
text,
} => client
.reply(conversation_id.clone(), text)
.map(|outgoing_id| Event::MessageQueued {
conversation_id: Some(conversation_id),
outgoing_id,
}),
Request::NewConversation { handle_ids, text } => client
.new_conversation(handle_ids, text)
.map(|outgoing_id| Event::MessageQueued {
conversation_id: None,
outgoing_id,
}),
Request::MarkRead { conversation_id } => client
.mark_conversation_as_read(conversation_id.clone())
.map(|_| Event::MarkedRead),
Request::SyncConversation { conversation_id } => client
.sync_conversation(conversation_id.clone())
.map(|_| Event::ConversationSyncTriggered { conversation_id }),
};
match res {
Ok(evt) => {
let _ = event_tx.send(evt);
}
Err(e) => {
let _ = event_tx.send(Event::Error(format!("{e}")));
}
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
if let Err(e) = client.poll(Duration::from_millis(0)) {
let _ = event_tx.send(Event::Error(format!("Daemon polling error: {e}")));
}
}
})
}
pub(crate) trait DaemonClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>>;
fn get_messages(
&mut self,
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>>;
fn reply(&mut self, conversation_id: String, text: String) -> Result<Option<String>>;
fn new_conversation(&mut self, handle_ids: Vec<String>, text: String)
-> Result<Option<String>>;
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>;
fn sync_conversation(&mut self, conversation_id: String) -> Result<()>;
fn install_signal_handlers(&mut self, _event_tx: mpsc::Sender<Event>) -> Result<()> {
Ok(())
}
fn poll(&mut self, timeout: Duration) -> Result<()> {
std::thread::sleep(timeout);
Ok(())
}
}

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "kordophoned" name = "kordophoned"
version = "1.3.0" version = "1.0.1"
edition = "2021" edition = "2021"
license = "GPL-3.0" license = "GPL-3.0"
description = "Client daemon for the Kordophone chat protocol" description = "Client daemon for the Kordophone chat protocol"
@@ -23,6 +23,7 @@ tokio-condvar = "0.3.0"
uuid = "1.16.0" uuid = "1.16.0"
once_cell = "1.19.0" once_cell = "1.19.0"
mime_guess = "2.0" mime_guess = "2.0"
notify = { package = "notify-rust", version = "4.10.0" }
# D-Bus dependencies only on Linux # D-Bus dependencies only on Linux
[target.'cfg(target_os = "linux")'.dependencies] [target.'cfg(target_os = "linux")'.dependencies]
@@ -33,7 +34,8 @@ dbus-tree = "0.9.2"
# D-Bus codegen only on Linux # D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies] [target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = { version = "0.10.0", default-features = false } dbus-codegen = "0.10.0"
dbus-crossroads = "0.5.1"
# XPC (libxpc) interface for macOS IPC # XPC (libxpc) interface for macOS IPC
[target.'cfg(target_os = "macos")'.dependencies] [target.'cfg(target_os = "macos")'.dependencies]
@@ -47,18 +49,5 @@ serde = { version = "1.0", features = ["derive"] }
assets = [ assets = [
{ source = "../target/release/kordophoned", dest = "/usr/libexec/kordophoned", mode = "755" }, { source = "../target/release/kordophoned", dest = "/usr/libexec/kordophoned", mode = "755" },
{ source = "../target/release/kpcli", dest = "/usr/bin/kpcli", mode = "755" }, { source = "../target/release/kpcli", dest = "/usr/bin/kpcli", mode = "755" },
{ source = "../target/release/kptui", dest = "/usr/bin/kptui", mode = "755" },
{ source = "include/net.buzzert.kordophonecd.service", dest = "/usr/share/dbus-1/services/net.buzzert.kordophonecd.service", mode = "644" }, { source = "include/net.buzzert.kordophonecd.service", dest = "/usr/share/dbus-1/services/net.buzzert.kordophonecd.service", mode = "644" },
] ]
[package.metadata.deb]
maintainer = "James Magahern <james@magahern.com>"
copyright = "2026, James Magahern <james@magahern.com>"
section = "net"
priority = "optional"
assets = [
["../target/release/kordophoned", "usr/libexec/kordophoned", "755"],
["../target/release/kpcli", "usr/bin/kpcli", "755"],
["../target/release/kptui", "usr/bin/kptui", "755"],
["include/net.buzzert.kordophonecd.service", "usr/share/dbus-1/services/net.buzzert.kordophonecd.service", "644"],
]

View File

@@ -14,17 +14,6 @@ strip -s target/release/kordophoned
cargo generate-rpm cargo generate-rpm
``` ```
# Building DEB
Make sure cargo-deb is installed, `cargo install cargo-deb`.
Then:
```bash
cd core
cargo deb -p kordophoned
```
## Running on macOS ## Running on macOS
Before any client can talk to the kordophone daemon on macOS, the XPC service needs to be manually registered with launchd. Before any client can talk to the kordophone daemon on macOS, the XPC service needs to be manually registered with launchd.
@@ -45,3 +34,4 @@ and the following in Info.plist:
<key>MachServices</key><dict><key>net.buzzert.kordophonecd</key><true/></dict> <key>MachServices</key><dict><key>net.buzzert.kordophonecd</key><true/></dict>
<key>KeepAlive</key><true/> <key>KeepAlive</key><true/>
``` ```

View File

@@ -73,17 +73,18 @@
'sender' (string): Sender display name 'sender' (string): Sender display name
'attachments' (array of dictionaries): List of attachments 'attachments' (array of dictionaries): List of attachments
'guid' (string): Attachment GUID 'guid' (string): Attachment GUID
'path' (string): Attachment path
'preview_path' (string): Preview attachment path
'downloaded' (boolean): Whether the attachment is downloaded 'downloaded' (boolean): Whether the attachment is downloaded
'preview_downloaded' (boolean): Whether the preview is downloaded 'preview_downloaded' (boolean): Whether the preview is downloaded
'metadata' (dictionary, optional): Attachment metadata 'metadata' (dictionary, optional): Attachment metadata
'attribution_info' (dictionary, optional): Attribution info 'attribution_info' (dictionary, optional): Attribution info
'width' (int32, optional): Width 'width' (int32, optional): Width
'height' (int32, optional): Height 'height' (int32, optional): Height"/>
Use GetAttachmentInfo for full/preview paths."/>
</arg> </arg>
</method> </method>
<method name="Reply"> <method name="SendMessage">
<arg type="s" name="conversation_id" direction="in"/> <arg type="s" name="conversation_id" direction="in"/>
<arg type="s" name="text" direction="in"/> <arg type="s" name="text" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/> <arg type="as" name="attachment_guids" direction="in"/>
@@ -91,9 +92,9 @@
<arg type="s" name="outgoing_message_id" direction="out"/> <arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString" <annotation name="org.freedesktop.DBus.DocString"
value="Replies to an existing conversation. Returns the outgoing message ID. value="Sends a message to the server. Returns the outgoing message ID.
Arguments: Arguments:
- conversation_id: The ID of the conversation to reply to. - conversation_id: The ID of the conversation to send the message to.
- text: The text of the message to send. - text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send. - attachment_guids: The GUIDs of the attachments to send.
@@ -102,23 +103,11 @@
"/> "/>
</method> </method>
<method name="NewConversation"> <method name="TestNotification">
<arg type="as" name="handle_ids" direction="in"/> <arg type="s" name="summary" direction="in"/>
<arg type="s" name="text" direction="in"/> <arg type="s" name="body" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/>
<arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString" <annotation name="org.freedesktop.DBus.DocString"
value="Sends a message to a new conversation identified by resolved handles. value="Displays a test desktop notification with the provided summary and body."/>
Arguments:
- handle_ids: The resolved handles for the new conversation.
- text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send.
Returns:
- outgoing_message_id: The ID of the outgoing message.
"/>
</method> </method>
<signal name="MessagesUpdated"> <signal name="MessagesUpdated">
@@ -147,20 +136,6 @@
"/> "/>
</method> </method>
<method name="OpenAttachmentFd">
<arg type="s" name="attachment_id" direction="in"/>
<arg type="b" name="preview" direction="in"/>
<arg type="h" name="fd" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Opens a read-only file descriptor for an attachment path.
Arguments:
attachment_id: the attachment GUID
preview: whether to open the preview path (true) or full path (false)
Returns:
fd: a Unix file descriptor to read attachment bytes"/>
</method>
<method name="DownloadAttachment"> <method name="DownloadAttachment">
<arg type="s" name="attachment_id" direction="in"/> <arg type="s" name="attachment_id" direction="in"/>
<arg type="b" name="preview" direction="in"/> <arg type="b" name="preview" direction="in"/>

View File

@@ -115,57 +115,39 @@ impl AttachmentStore {
base_path: base_path, base_path: base_path,
metadata: None, metadata: None,
mime_type: None, mime_type: None,
cached_full_path: None,
cached_preview_path: None,
}; };
// Best-effort: if files already exist, cache their exact paths and infer MIME type. // Best-effort: if a file already exists, try to infer MIME type from extension
let kind = "full";
let stem = attachment let stem = attachment
.base_path .base_path
.file_name() .file_name()
.map(|s| s.to_string_lossy().to_string()) .map(|s| s.to_string_lossy().to_string())
.unwrap_or_default(); .unwrap_or_default();
let legacy = attachment.base_path.with_extension(kind);
let legacy_full = attachment.base_path.with_extension("full"); let existing_path = if legacy.exists() {
if legacy_full.exists() { Some(legacy)
attachment.cached_full_path = Some(legacy_full); } else {
} let prefix = format!("{}.{}.", stem, kind);
let legacy_preview = attachment.base_path.with_extension("preview");
if legacy_preview.exists() {
attachment.cached_preview_path = Some(legacy_preview);
}
if attachment.cached_full_path.is_none() || attachment.cached_preview_path.is_none() {
let full_prefix = format!("{}.full.", stem);
let preview_prefix = format!("{}.preview.", stem);
let parent = attachment let parent = attachment
.base_path .base_path
.parent() .parent()
.unwrap_or_else(|| std::path::Path::new(".")); .unwrap_or_else(|| std::path::Path::new("."));
let mut found: Option<PathBuf> = None;
if let Ok(entries) = std::fs::read_dir(parent) { if let Ok(entries) = std::fs::read_dir(parent) {
for entry in entries.flatten() { for entry in entries.flatten() {
let name = entry.file_name().to_string_lossy().to_string(); let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with(&prefix) && !name.ends_with(".download") {
if !name.ends_with(".download") { found = Some(entry.path());
if attachment.cached_full_path.is_none() && name.starts_with(&full_prefix) { break;
attachment.cached_full_path = Some(entry.path());
continue;
}
if attachment.cached_preview_path.is_none()
&& name.starts_with(&preview_prefix)
{
attachment.cached_preview_path = Some(entry.path());
}
} }
} }
} }
} found
};
if let Some(existing_full) = &attachment.cached_full_path { if let Some(existing) = existing_path {
if let Some(m) = mime_guess::from_path(existing_full).first_raw() { if let Some(m) = mime_guess::from_path(&existing).first_raw() {
attachment.mime_type = Some(m.to_string()); attachment.mime_type = Some(m.to_string());
} }
} }
@@ -360,9 +342,6 @@ impl AttachmentStore {
match kind { match kind {
AttachmentStoreError::AttachmentAlreadyDownloaded => { AttachmentStoreError::AttachmentAlreadyDownloaded => {
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", &_guid); log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", &_guid);
let _ = daemon_event_sink
.send(DaemonEvent::AttachmentDownloaded(_guid.clone()))
.await;
} }
AttachmentStoreError::DownloadAlreadyInProgress => { AttachmentStoreError::DownloadAlreadyInProgress => {
// Already logged a warning where detected // Already logged a warning where detected
@@ -381,10 +360,6 @@ impl AttachmentStore {
log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid); log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid);
} else { } else {
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid); log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid);
let _ = self
.daemon_event_sink
.send(DaemonEvent::AttachmentDownloaded(guid))
.await;
} }
} }

View File

@@ -6,8 +6,6 @@ use std::collections::HashMap;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
const LOOKUP_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Clone)] #[derive(Clone)]
pub struct EDSContactResolverBackend; pub struct EDSContactResolverBackend;
@@ -191,10 +189,11 @@ impl ContactResolverBackend for EDSContactResolverBackend {
None => return None, None => return None,
}; };
let address_book_proxy = let address_book_proxy = handle.connection.with_proxy(
handle &handle.bus_name,
.connection &handle.object_path,
.with_proxy(&handle.bus_name, &handle.object_path, LOOKUP_TIMEOUT); Duration::from_secs(60),
);
ensure_address_book_open(&address_book_proxy); ensure_address_book_open(&address_book_proxy);
@@ -256,10 +255,11 @@ impl ContactResolverBackend for EDSContactResolverBackend {
None => return None, None => return None,
}; };
let address_book_proxy = let address_book_proxy = handle.connection.with_proxy(
handle &handle.bus_name,
.connection &handle.object_path,
.with_proxy(&handle.bus_name, &handle.object_path, LOOKUP_TIMEOUT); Duration::from_secs(60),
);
ensure_address_book_open(&address_book_proxy); ensure_address_book_open(&address_book_proxy);

View File

@@ -47,8 +47,8 @@ pub type AnyContactID = String;
#[derive(Clone)] #[derive(Clone)]
pub struct ContactResolver<T: ContactResolverBackend> { pub struct ContactResolver<T: ContactResolverBackend> {
backend: T, backend: T,
display_name_cache: HashMap<AnyContactID, Option<String>>, display_name_cache: HashMap<AnyContactID, String>,
contact_id_cache: HashMap<String, Option<AnyContactID>>, contact_id_cache: HashMap<String, AnyContactID>,
} }
impl<T: ContactResolverBackend> ContactResolver<T> impl<T: ContactResolverBackend> ContactResolver<T>
@@ -67,25 +67,29 @@ where
pub fn resolve_contact_id(&mut self, address: &str) -> Option<AnyContactID> { pub fn resolve_contact_id(&mut self, address: &str) -> Option<AnyContactID> {
if let Some(id) = self.contact_id_cache.get(address) { if let Some(id) = self.contact_id_cache.get(address) {
return id.clone(); return Some(id.clone());
} }
let id = self.backend.resolve_contact_id(address).map(|id| id.into()); let id = self.backend.resolve_contact_id(address).map(|id| id.into());
self.contact_id_cache if let Some(ref id) = id {
.insert(address.to_string(), id.clone()); self.contact_id_cache
.insert(address.to_string(), id.clone());
}
id id
} }
pub fn get_contact_display_name(&mut self, contact_id: &AnyContactID) -> Option<String> { pub fn get_contact_display_name(&mut self, contact_id: &AnyContactID) -> Option<String> {
if let Some(display_name) = self.display_name_cache.get(contact_id) { if let Some(display_name) = self.display_name_cache.get(contact_id) {
return display_name.clone(); return Some(display_name.clone());
} }
let backend_contact_id: T::ContactID = T::ContactID::from((*contact_id).clone()); let backend_contact_id: T::ContactID = T::ContactID::from((*contact_id).clone());
let display_name = self.backend.get_contact_display_name(&backend_contact_id); let display_name = self.backend.get_contact_display_name(&backend_contact_id);
self.display_name_cache if let Some(ref display_name) = display_name {
.insert(contact_id.to_string(), display_name.clone()); self.display_name_cache
.insert(contact_id.to_string(), display_name.clone());
}
display_name display_name
} }

View File

@@ -53,21 +53,16 @@ pub enum Event {
/// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned. /// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned.
GetMessages(String, Option<String>, Reply<Vec<Message>>), GetMessages(String, Option<String>, Reply<Vec<Message>>),
/// Enqueues a reply to an existing conversation. /// Enqueues a message to be sent to the server.
/// Parameters: /// Parameters:
/// - conversation_id: The ID of the conversation to send the message to. /// - conversation_id: The ID of the conversation to send the message to.
/// - text: The text of the message to send. /// - text: The text of the message to send.
/// - attachment_guids: The GUIDs of the attachments to send. /// - attachment_guids: The GUIDs of the attachments to send.
/// - reply: The outgoing message ID (not the server-assigned message ID). /// - reply: The outgoing message ID (not the server-assigned message ID).
Reply(String, String, Vec<String>, Reply<Uuid>), SendMessage(String, String, Vec<String>, Reply<Uuid>),
/// Enqueues a message to one or more resolved handles. /// Triggers a manual test notification.
/// Parameters: TestNotification(String, String, Reply<Result<(), String>>),
/// - handle_ids: The resolved handle IDs for the new conversation.
/// - text: The text of the message to send.
/// - attachment_guids: The GUIDs of the attachments to send.
/// - reply: The outgoing message ID (not the server-assigned message ID).
NewConversation(Vec<String>, String, Vec<String>, Reply<Uuid>),
/// Notifies the daemon that a message has been sent. /// Notifies the daemon that a message has been sent.
/// Parameters: /// Parameters:

View File

@@ -15,11 +15,13 @@ use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{
use tokio::sync::Mutex; broadcast,
mpsc::{Receiver, Sender},
Mutex,
};
use uuid::Uuid; use uuid::Uuid;
use kordophone_db::{ use kordophone_db::{
@@ -29,7 +31,7 @@ use kordophone_db::{
use kordophone::api::http_client::HTTPAPIClient; use kordophone::api::http_client::HTTPAPIClient;
use kordophone::api::APIInterface; use kordophone::api::APIInterface;
use kordophone::model::outgoing_message::{OutgoingMessage, OutgoingMessageTarget}; use kordophone::model::outgoing_message::OutgoingMessage;
use kordophone::model::{ConversationID, MessageID}; use kordophone::model::{ConversationID, MessageID};
mod update_monitor; mod update_monitor;
@@ -42,6 +44,9 @@ mod post_office;
use post_office::Event as PostOfficeEvent; use post_office::Event as PostOfficeEvent;
use post_office::PostOffice; use post_office::PostOffice;
mod notifier;
use notifier::NotificationService;
mod models; mod models;
pub use models::Attachment; pub use models::Attachment;
pub use models::Message; pub use models::Message;
@@ -73,14 +78,11 @@ pub mod target {
pub static DAEMON: &str = "daemon"; pub static DAEMON: &str = "daemon";
} }
const GET_MESSAGES_INITIAL_WINDOW: usize = 300;
pub struct Daemon { pub struct Daemon {
pub event_sender: Sender<Event>, pub event_sender: Sender<Event>,
event_receiver: Receiver<Event>, event_receiver: Receiver<Event>,
signal_receiver: Option<Receiver<Signal>>, signal_sender: broadcast::Sender<Signal>,
signal_sender: Sender<Signal>,
post_office_sink: Sender<PostOfficeEvent>, post_office_sink: Sender<PostOfficeEvent>,
post_office_source: Option<Receiver<PostOfficeEvent>>, post_office_source: Option<Receiver<PostOfficeEvent>>,
@@ -90,6 +92,7 @@ pub struct Daemon {
attachment_store_sink: Option<Sender<AttachmentStoreEvent>>, attachment_store_sink: Option<Sender<AttachmentStoreEvent>>,
update_monitor_command_tx: Option<Sender<UpdateMonitorCommand>>, update_monitor_command_tx: Option<Sender<UpdateMonitorCommand>>,
notifier: Arc<NotificationService>,
version: String, version: String,
database: Arc<Mutex<Database>>, database: Arc<Mutex<Database>>,
runtime: tokio::runtime::Runtime, runtime: tokio::runtime::Runtime,
@@ -106,7 +109,7 @@ impl Daemon {
// Create event channels // Create event channels
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); let (signal_sender, _) = tokio::sync::broadcast::channel(100);
let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100); let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100);
// Create background task runtime // Create background task runtime
@@ -117,13 +120,14 @@ impl Daemon {
let database_impl = Database::new(&database_path.to_string_lossy())?; let database_impl = Database::new(&database_path.to_string_lossy())?;
let database = Arc::new(Mutex::new(database_impl)); let database = Arc::new(Mutex::new(database_impl));
let notifier = Arc::new(NotificationService::new());
Ok(Self { Ok(Self {
version: env!("CARGO_PKG_VERSION").to_string(), version: env!("CARGO_PKG_VERSION").to_string(),
notifier,
database, database,
event_receiver, event_receiver,
event_sender, event_sender,
signal_receiver: Some(signal_receiver),
signal_sender, signal_sender,
post_office_sink, post_office_sink,
post_office_source: Some(post_office_source), post_office_source: Some(post_office_source),
@@ -168,6 +172,16 @@ impl Daemon {
attachment_store.run().await; attachment_store.run().await;
}); });
// Notification listener
{
let notifier = self.notifier.clone();
let mut signal_rx = self.signal_sender.subscribe();
let database = self.database.clone();
tokio::spawn(async move {
notifier.listen(signal_rx, database).await;
});
}
while let Some(event) = self.event_receiver.recv().await { while let Some(event) = self.event_receiver.recv().await {
log::debug!(target: target::EVENT, "Received event: {:?}", event); log::debug!(target: target::EVENT, "Received event: {:?}", event);
self.handle_event(event).await; self.handle_event(event).await;
@@ -188,14 +202,14 @@ impl Daemon {
async fn handle_event(&mut self, event: Event) { async fn handle_event(&mut self, event: Event) {
match event { match event {
Event::GetVersion(reply) => { Event::GetVersion(reply) => {
let _ = reply.send(self.version.clone()); reply.send(self.version.clone()).unwrap();
} }
Event::SyncConversationList(reply) => { Event::SyncConversationList(reply) => {
self.spawn_conversation_list_sync(); self.spawn_conversation_list_sync();
// This is a background operation, so return right away. // This is a background operation, so return right away.
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::SyncAllConversations(reply) => { Event::SyncAllConversations(reply) => {
@@ -210,7 +224,7 @@ impl Daemon {
}); });
// This is a background operation, so return right away. // This is a background operation, so return right away.
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::SyncConversation(conversation_id, reply) => { Event::SyncConversation(conversation_id, reply) => {
@@ -228,7 +242,7 @@ impl Daemon {
} }
}); });
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::MarkConversationAsRead(conversation_id, reply) => { Event::MarkConversationAsRead(conversation_id, reply) => {
@@ -240,7 +254,7 @@ impl Daemon {
} }
}); });
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::UpdateConversationMetadata(conversation, reply) => { Event::UpdateConversationMetadata(conversation, reply) => {
@@ -253,7 +267,7 @@ impl Daemon {
} }
}); });
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::UpdateStreamReconnected => { Event::UpdateStreamReconnected => {
@@ -263,15 +277,16 @@ impl Daemon {
self.spawn_conversation_list_sync(); self.spawn_conversation_list_sync();
// Send signal to the client that the update stream has been reconnected. // Send signal to the client that the update stream has been reconnected.
self.signal_sender Self::send_signal(
.send(Signal::UpdateStreamReconnected) &self.signal_sender,
.await Signal::UpdateStreamReconnected,
.unwrap(); target::UPDATES,
);
} }
Event::GetAllConversations(limit, offset, reply) => { Event::GetAllConversations(limit, offset, reply) => {
let conversations = self.get_conversations_limit_offset(limit, offset).await; let conversations = self.get_conversations_limit_offset(limit, offset).await;
let _ = reply.send(conversations); reply.send(conversations).unwrap();
} }
Event::GetAllSettings(reply) => { Event::GetAllSettings(reply) => {
@@ -280,7 +295,7 @@ impl Daemon {
Settings::default() Settings::default()
}); });
let _ = reply.send(settings); reply.send(settings).unwrap();
} }
Event::UpdateSettings(settings, reply) => { Event::UpdateSettings(settings, reply) => {
@@ -312,14 +327,12 @@ impl Daemon {
} }
} }
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::GetMessages(conversation_id, last_message_id, reply) => { Event::GetMessages(conversation_id, last_message_id, reply) => {
let messages = self.get_messages(conversation_id, last_message_id).await; let messages = self.get_messages(conversation_id, last_message_id).await;
if reply.send(messages).is_err() { reply.send(messages).unwrap();
log::warn!(target: target::EVENT, "GetMessages reply receiver dropped before send");
}
} }
Event::DeleteAllConversations(reply) => { Event::DeleteAllConversations(reply) => {
@@ -327,73 +340,25 @@ impl Daemon {
log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e); log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e);
}); });
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::Reply(conversation_id, text, attachment_guids, reply) => { Event::SendMessage(conversation_id, text, attachment_guids, reply) => {
let conversation_id = conversation_id.clone();
let uuid = self let uuid = self
.enqueue_outgoing_message( .enqueue_outgoing_message(text, conversation_id.clone(), attachment_guids)
text,
OutgoingMessageTarget::Conversation(conversation_id.clone()),
attachment_guids,
)
.await; .await;
let _ = reply.send(uuid); reply.send(uuid).unwrap();
// Send message updated signal, we have a placeholder message we will return. // Notify clients that messages have changed (e.g., to refresh placeholders).
self.signal_sender self.emit_messages_updated(conversation_id);
.send(Signal::MessagesUpdated(conversation_id.clone()))
.await
.unwrap();
}
Event::NewConversation(handle_ids, text, attachment_guids, reply) => {
let uuid = self
.enqueue_outgoing_message(
text,
OutgoingMessageTarget::Handles(handle_ids),
attachment_guids,
)
.await;
let _ = reply.send(uuid);
} }
Event::MessageSent(message, outgoing_message, conversation_id) => { Event::MessageSent(message, outgoing_message, conversation_id) => {
log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id); log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id);
let conversation_created = match self
.ensure_conversation_exists_for_sent_message(
&conversation_id,
&outgoing_message,
&message,
)
.await
{
Ok(created) => created,
Err(e) => {
log::error!(
target: target::EVENT,
"Failed to ensure conversation {} exists for sent message {}: {}",
conversation_id,
message.id,
e
);
return;
}
};
if conversation_created {
self.signal_sender
.send(Signal::ConversationsUpdated)
.await
.unwrap();
}
// Insert the message into the database. // Insert the message into the database.
log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id); log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id);
if let Err(e) = self self.database
.database
.lock() .lock()
.await .await
.with_repository(|r| { .with_repository(|r| {
@@ -407,30 +372,28 @@ impl Daemon {
) )
}) })
.await .await
{ .unwrap();
log::error!(
target: target::EVENT,
"Failed to persist sent message {} for conversation {}: {}",
message.id,
conversation_id,
e
);
return;
}
// Remove from outgoing messages. // Remove from outgoing messages.
log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid); log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid);
for messages in self.outgoing_messages.values_mut() {
messages.retain(|m| m.guid != outgoing_message.guid);
}
self.outgoing_messages self.outgoing_messages
.retain(|_, messages| !messages.is_empty()); .get_mut(&conversation_id)
.map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));
// Send message updated signal. // Notify clients to refresh the conversation after the final message arrives.
self.signal_sender self.emit_messages_updated(conversation_id);
.send(Signal::MessagesUpdated(conversation_id)) }
.await
.unwrap(); Event::TestNotification(summary, body, reply) => {
let result = self
.signal_sender
.send(Signal::Internal(InternalSignal::TestNotification {
summary,
body,
}))
.map(|_| ())
.map_err(|e| e.to_string());
reply.send(result).unwrap();
} }
Event::GetAttachment(guid, reply) => { Event::GetAttachment(guid, reply) => {
@@ -455,17 +418,18 @@ impl Daemon {
.await .await
.unwrap(); .unwrap();
let _ = reply.send(()); reply.send(()).unwrap();
} }
Event::AttachmentDownloaded(attachment_id) => { Event::AttachmentDownloaded(attachment_id) => {
log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id); log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id);
// Send signal to the client that the attachment has been downloaded. // Send signal to the client that the attachment has been downloaded.
self.signal_sender Self::send_signal(
.send(Signal::AttachmentDownloaded(attachment_id)) &self.signal_sender,
.await Signal::AttachmentDownloaded(attachment_id),
.unwrap(); target::ATTACHMENTS,
);
} }
Event::UploadAttachment(path, reply) => { Event::UploadAttachment(path, reply) => {
@@ -480,17 +444,17 @@ impl Daemon {
Event::AttachmentUploaded(upload_guid, attachment_guid) => { Event::AttachmentUploaded(upload_guid, attachment_guid) => {
log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid); log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid);
self.signal_sender Self::send_signal(
.send(Signal::AttachmentUploaded(upload_guid, attachment_guid)) &self.signal_sender,
.await Signal::AttachmentUploaded(upload_guid, attachment_guid),
.unwrap(); target::ATTACHMENTS,
);
} }
} }
} }
/// Panics if the signal receiver has already been taken. pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
pub fn obtain_signal_receiver(&mut self) -> Receiver<Signal> { self.signal_sender.subscribe()
self.signal_receiver.take().unwrap()
} }
async fn get_conversations_limit_offset( async fn get_conversations_limit_offset(
@@ -505,12 +469,15 @@ impl Daemon {
.await .await
} }
fn emit_messages_updated(&self, conversation_id: String) {
Self::send_messages_updated(&self.signal_sender, conversation_id);
}
async fn get_messages( async fn get_messages(
&mut self, &mut self,
conversation_id: String, conversation_id: String,
last_message_id: Option<MessageID>, _last_message_id: Option<MessageID>,
) -> Vec<Message> { ) -> Vec<Message> {
let started = Instant::now();
// Get outgoing messages for this conversation. // Get outgoing messages for this conversation.
let empty_vec: Vec<OutgoingMessage> = vec![]; let empty_vec: Vec<OutgoingMessage> = vec![];
let outgoing_messages: &Vec<OutgoingMessage> = self let outgoing_messages: &Vec<OutgoingMessage> = self
@@ -548,111 +515,27 @@ impl Daemon {
result.push(om.into()); result.push(om.into());
} }
if let Some(last_id) = last_message_id {
if let Some(last_index) = result.iter().position(|message| message.id == last_id) {
result = result.split_off(last_index + 1);
}
} else if result.len() > GET_MESSAGES_INITIAL_WINDOW {
let dropped = result.len() - GET_MESSAGES_INITIAL_WINDOW;
result = result.split_off(dropped);
log::debug!(
target: target::EVENT,
"GetMessages initial window applied: dropped {} older messages",
dropped
);
}
log::debug!(
target: target::EVENT,
"GetMessages completed in {}ms: {} messages",
started.elapsed().as_millis(),
result.len()
);
result result
} }
async fn ensure_conversation_exists_for_sent_message(
&mut self,
conversation_id: &ConversationID,
outgoing_message: &OutgoingMessage,
message: &Message,
) -> Result<bool> {
let conversation_exists = self
.database
.lock()
.await
.with_repository(|r| r.get_conversation_by_guid(conversation_id))
.await?
.is_some();
if conversation_exists {
return Ok(false);
}
let participants = Self::participants_for_outgoing_message(outgoing_message);
let mut builder = Conversation::builder()
.guid(conversation_id)
.date(message.date)
.unread_count(0)
.participants(participants);
if !message.text.trim().is_empty() {
builder = builder.last_message_preview(&message.text);
}
let conversation = builder.build();
log::info!(
target: target::EVENT,
"Creating local conversation {} from sent message {}",
conversation_id,
message.id
);
self.database
.lock()
.await
.with_repository(|r| r.insert_conversation(conversation))
.await?;
Ok(true)
}
fn participants_for_outgoing_message(outgoing_message: &OutgoingMessage) -> Vec<DbParticipant> {
let handle_ids = match &outgoing_message.target {
OutgoingMessageTarget::Conversation(_) => return Vec::new(),
OutgoingMessageTarget::Handles(handle_ids) => handle_ids,
};
let mut contact_resolver = ContactResolver::new(DefaultContactResolverBackend::default());
handle_ids
.iter()
.map(|handle| DbParticipant::Remote {
handle: handle.clone(),
contact_id: contact_resolver.resolve_contact_id(handle),
})
.collect()
}
async fn enqueue_outgoing_message( async fn enqueue_outgoing_message(
&mut self, &mut self,
text: String, text: String,
target: OutgoingMessageTarget, conversation_id: String,
attachment_guids: Vec<String>, attachment_guids: Vec<String>,
) -> Uuid { ) -> Uuid {
let conversation_id = conversation_id.clone();
let outgoing_message = OutgoingMessage::builder() let outgoing_message = OutgoingMessage::builder()
.text(text) .text(text)
.target(target) .conversation_id(conversation_id.clone())
.file_transfer_guids(attachment_guids) .file_transfer_guids(attachment_guids)
.build(); .build();
if let Some(conversation_id) = outgoing_message.conversation_id().cloned() { // Keep a record of this so we can provide a consistent model to the client.
// Keep a record of replies so we can provide a consistent model to the client. self.outgoing_messages
self.outgoing_messages .entry(conversation_id)
.entry(conversation_id) .or_insert(vec![])
.or_insert(vec![]) .push(outgoing_message.clone());
.push(outgoing_message.clone());
}
let guid = outgoing_message.guid.clone(); let guid = outgoing_message.guid.clone();
self.post_office_sink self.post_office_sink
@@ -665,7 +548,7 @@ impl Daemon {
async fn sync_conversation_list( async fn sync_conversation_list(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> { ) -> Result<()> {
log::info!(target: target::SYNC, "Starting list conversation sync"); log::info!(target: target::SYNC, "Starting list conversation sync");
@@ -717,7 +600,7 @@ impl Daemon {
} }
// Send conversations updated signal // Send conversations updated signal
signal_sender.send(Signal::ConversationsUpdated).await?; Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations); log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations);
Ok(()) Ok(())
@@ -725,7 +608,7 @@ impl Daemon {
async fn sync_all_conversations_impl( async fn sync_all_conversations_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> { ) -> Result<()> {
log::info!(target: target::SYNC, "Starting full conversation sync"); log::info!(target: target::SYNC, "Starting full conversation sync");
@@ -753,7 +636,7 @@ impl Daemon {
} }
// Send conversations updated signal. // Send conversations updated signal.
signal_sender.send(Signal::ConversationsUpdated).await?; Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations); log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations);
Ok(()) Ok(())
@@ -761,7 +644,7 @@ impl Daemon {
async fn sync_conversation_impl( async fn sync_conversation_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
conversation_id: String, conversation_id: String,
) -> Result<()> { ) -> Result<()> {
log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id); log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id);
@@ -819,9 +702,7 @@ impl Daemon {
// Send messages updated signal, if we actually inserted any messages. // Send messages updated signal, if we actually inserted any messages.
if num_messages > 0 { if num_messages > 0 {
signal_sender Self::send_messages_updated(signal_sender, conversation_id.clone());
.send(Signal::MessagesUpdated(conversation_id.clone()))
.await?;
} }
log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id); log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id);
@@ -842,14 +723,14 @@ impl Daemon {
async fn update_conversation_metadata_impl( async fn update_conversation_metadata_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
conversation: Conversation, conversation: Conversation,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> { ) -> Result<()> {
log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid); log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
let updated = database let updated = database
.with_repository(|r| r.merge_conversation_metadata(conversation)) .with_repository(|r| r.merge_conversation_metadata(conversation))
.await?; .await?;
if updated { if updated {
signal_sender.send(Signal::ConversationsUpdated).await?; Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::DAEMON);
} }
Ok(()) Ok(())
@@ -864,6 +745,40 @@ impl Daemon {
self.database.with_settings(|s| settings.save(s)).await self.database.with_settings(|s| settings.save(s)).await
} }
fn send_signal(sender: &broadcast::Sender<Signal>, signal: Signal, log_target: &str) {
if let Err(error) = sender.send(signal) {
log::trace!(
target: log_target,
"Signal delivery skipped (no listeners?): {}",
error
);
}
}
fn send_internal(sender: &broadcast::Sender<Signal>, signal: InternalSignal) {
if let Err(error) = sender.send(Signal::Internal(signal)) {
log::trace!(
target: target::DAEMON,
"Internal signal delivery skipped: {}",
error
);
}
}
fn send_messages_updated(sender: &broadcast::Sender<Signal>, conversation_id: String) {
Self::send_internal(
sender,
InternalSignal::MessagesUpdated(conversation_id.clone()),
);
if let Err(error) = sender.send(Signal::MessagesUpdated(conversation_id)) {
log::warn!(
target: target::DAEMON,
"Failed to send MessagesUpdated signal: {}",
error
);
}
}
async fn get_client_impl( async fn get_client_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
) -> Result<HTTPAPIClient<DatabaseAuthenticationStore>> { ) -> Result<HTTPAPIClient<DatabaseAuthenticationStore>> {
@@ -896,9 +811,11 @@ impl Daemon {
}) })
.await?; .await?;
self.signal_sender Self::send_signal(
.send(Signal::ConversationsUpdated) &self.signal_sender,
.await?; Signal::ConversationsUpdated,
target::SYNC,
);
Ok(()) Ok(())
} }

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf; use std::path::{Path, PathBuf};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AttachmentMetadata { pub struct AttachmentMetadata {
@@ -17,8 +17,6 @@ pub struct Attachment {
pub base_path: PathBuf, pub base_path: PathBuf,
pub metadata: Option<AttachmentMetadata>, pub metadata: Option<AttachmentMetadata>,
pub mime_type: Option<String>, pub mime_type: Option<String>,
pub cached_full_path: Option<PathBuf>,
pub cached_preview_path: Option<PathBuf>,
} }
impl Attachment { impl Attachment {
@@ -46,21 +44,17 @@ impl Attachment {
} }
pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf { pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf {
if !scratch {
let cached = if preview {
self.cached_preview_path.as_ref()
} else {
self.cached_full_path.as_ref()
};
if let Some(path) = cached {
return path.clone();
}
}
// Determine whether this is a preview or full attachment. // Determine whether this is a preview or full attachment.
let kind = if preview { "preview" } else { "full" }; let kind = if preview { "preview" } else { "full" };
// If not a scratch path, and a file already exists on disk with a concrete
// file extension (e.g., guid.full.jpg), return that existing path.
if !scratch {
if let Some(existing) = self.find_existing_path(preview) {
return existing;
}
}
// Fall back to constructing a path using known info. If we know the MIME type, // Fall back to constructing a path using known info. If we know the MIME type,
// prefer an extension guessed from it; otherwise keep legacy naming. // prefer an extension guessed from it; otherwise keep legacy naming.
let ext_from_mime = self let ext_from_mime = self
@@ -82,15 +76,44 @@ impl Attachment {
} }
pub fn is_downloaded(&self, preview: bool) -> bool { pub fn is_downloaded(&self, preview: bool) -> bool {
let path = self.get_path_for_preview(preview); std::fs::exists(&self.get_path_for_preview(preview)).expect(
std::fs::exists(&path).expect(
format!( format!(
"Wasn't able to check for the existence of an attachment file path at {}", "Wasn't able to check for the existence of an attachment file path at {}",
path.display() &self.get_path_for_preview(preview).display()
) )
.as_str(), .as_str(),
) )
} }
fn find_existing_path(&self, preview: bool) -> Option<PathBuf> {
let kind = if preview { "preview" } else { "full" };
// First, check legacy path without a concrete file extension.
let legacy = self.base_path.with_extension(kind);
if legacy.exists() {
return Some(legacy);
}
// Next, search for a filename like: <guid>.<kind>.<ext>
let file_stem = self
.base_path
.file_name()
.map(|s| s.to_string_lossy().to_string())?;
let prefix = format!("{}.{}.", file_stem, kind);
let parent = self.base_path.parent().unwrap_or_else(|| Path::new("."));
if let Ok(dir) = std::fs::read_dir(parent) {
for entry in dir.flatten() {
let file_name = entry.file_name();
let name = file_name.to_string_lossy();
if name.starts_with(&prefix) && !name.ends_with(".download") {
return Some(entry.path());
}
}
}
None
}
} }
impl From<kordophone::model::message::AttachmentMetadata> for AttachmentMetadata { impl From<kordophone::model::message::AttachmentMetadata> for AttachmentMetadata {

View File

@@ -0,0 +1,288 @@
use super::contact_resolver::{ContactResolver, DefaultContactResolverBackend};
use super::models::message::Participant;
use super::signals::{InternalSignal, Signal};
use super::{target, Message};
use kordophone_db::{
database::{Database, DatabaseAccess},
models::Conversation,
models::Participant as DbParticipant,
};
use notify::Notification;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};
/// Centralised notification helper used by platform transports (D-Bus, XPC, …).
pub struct NotificationService {
resolver: Mutex<ContactResolver<DefaultContactResolverBackend>>,
}
impl NotificationService {
pub fn new() -> Self {
Self {
resolver: Mutex::new(ContactResolver::new(
DefaultContactResolverBackend::default(),
)),
}
}
pub async fn listen(
self: Arc<Self>,
mut signal_rx: broadcast::Receiver<Signal>,
database: Arc<Mutex<Database>>,
) {
log::trace!(target: target::DAEMON, "NotificationService listener started");
loop {
match signal_rx.recv().await {
Ok(Signal::Internal(InternalSignal::MessagesUpdated(conversation_id))) => {
log::trace!(
target: target::DAEMON,
"NotificationService received MessagesUpdated for {}",
conversation_id
);
self.notify_new_messages(&database, &conversation_id).await;
}
Ok(Signal::Internal(InternalSignal::TestNotification { summary, body })) => {
log::trace!(
target: target::DAEMON,
"NotificationService received TestNotification"
);
if let Err(error) = self.send_manual(&summary, &body) {
log::warn!(
target: target::DAEMON,
"Failed to display test notification: {}",
error
);
}
}
Ok(other) => {
log::trace!(
target: target::DAEMON,
"NotificationService ignoring signal: {:?}",
other
);
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
target: target::DAEMON,
"NotificationService lagged; skipped {} signals",
skipped
);
}
Err(broadcast::error::RecvError::Closed) => {
log::trace!(target: target::DAEMON, "NotificationService listener exiting");
break;
}
}
}
}
/// Checks whether a new user-visible notification should be shown for the
/// given conversation and displays it if appropriate.
pub async fn notify_new_messages(
&self,
database: &Arc<Mutex<Database>>,
conversation_id: &str,
) {
log::trace!(
target: target::DAEMON,
"NotificationService preparing payload for {}",
conversation_id
);
if let Some((summary, body)) = self.prepare_payload(database, conversation_id).await {
log::trace!(
target: target::DAEMON,
"NotificationService displaying notification for {}",
conversation_id
);
if let Err(error) = self.show_notification(&summary, &body) {
log::warn!(
target: target::DAEMON,
"Failed to display notification for conversation {}: {}",
conversation_id,
error
);
}
} else {
log::trace!(
target: target::DAEMON,
"NotificationService skipping notification for {}",
conversation_id
);
}
}
/// Displays a manual test notification.
pub fn send_manual(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> {
log::trace!(
target: target::DAEMON,
"NotificationService sending manual notification"
);
self.show_notification(summary, body)
}
async fn prepare_payload(
&self,
database: &Arc<Mutex<Database>>,
conversation_id: &str,
) -> Option<(String, String)> {
let conversation_opt = database
.lock()
.await
.with_repository(|r| r.get_conversation_by_guid(conversation_id))
.await;
let conversation = match conversation_opt {
Ok(Some(conv)) => conv,
Ok(None) => {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} not found",
conversation_id
);
return None;
}
Err(err) => {
log::warn!(
target: target::DAEMON,
"Notification lookup failed for conversation {}: {}",
conversation_id,
err
);
return None;
}
};
if conversation.unread_count == 0 {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} has no unread messages",
conversation_id
);
return None;
}
let last_message_opt = database
.lock()
.await
.with_repository(|r| r.get_last_message_for_conversation(conversation_id))
.await;
let last_message: Message = match last_message_opt {
Ok(Some(message)) => message.into(),
Ok(None) => {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} has no messages",
conversation_id
);
return None;
}
Err(err) => {
log::warn!(
target: target::DAEMON,
"Notification lookup failed for conversation {}: {}",
conversation_id,
err
);
return None;
}
};
if matches!(last_message.sender, Participant::Me) {
log::trace!(
target: target::DAEMON,
"NotificationService: last message in {} was sent by self",
conversation_id
);
return None;
}
let mut resolver = self.resolver.lock().await;
let summary = self.conversation_display_name(&conversation, &mut resolver);
let sender_display_name =
self.resolve_participant_display_name(&last_message.sender, &mut resolver);
let mut message_text = last_message.text.replace('\u{FFFC}', "");
if message_text.trim().is_empty() {
if !last_message.attachments.is_empty() {
message_text = "Sent an attachment".to_string();
} else {
message_text = "Sent a message".to_string();
}
}
let body = if sender_display_name.is_empty() {
message_text
} else {
format!("{}: {}", sender_display_name, message_text)
};
Some((summary, body))
}
fn conversation_display_name(
&self,
conversation: &Conversation,
resolver: &mut ContactResolver<DefaultContactResolverBackend>,
) -> String {
if let Some(display_name) = &conversation.display_name {
if !display_name.trim().is_empty() {
return display_name.clone();
}
}
let names: Vec<String> = conversation
.participants
.iter()
.filter_map(|participant| match participant {
DbParticipant::Me => None,
DbParticipant::Remote { handle, contact_id } => {
if let Some(contact_id) = contact_id {
Some(
resolver
.get_contact_display_name(contact_id)
.unwrap_or_else(|| handle.clone()),
)
} else {
Some(handle.clone())
}
}
})
.collect();
if names.is_empty() {
"Kordophone".to_string()
} else {
names.join(", ")
}
}
fn resolve_participant_display_name(
&self,
participant: &Participant,
resolver: &mut ContactResolver<DefaultContactResolverBackend>,
) -> String {
match participant {
Participant::Me => "".to_string(),
Participant::Remote { handle, contact_id } => {
if let Some(contact_id) = contact_id {
resolver
.get_contact_display_name(contact_id)
.unwrap_or_else(|| handle.clone())
} else {
handle.clone()
}
}
}
}
fn show_notification(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> {
Notification::new()
.appname("Kordophone")
.summary(summary)
.body(body)
.show()
.map(|_| ())
}
}

View File

@@ -8,7 +8,6 @@ use tokio_condvar::Condvar;
use crate::daemon::events::Event as DaemonEvent; use crate::daemon::events::Event as DaemonEvent;
use kordophone::api::APIInterface; use kordophone::api::APIInterface;
use kordophone::model::outgoing_message::OutgoingMessage; use kordophone::model::outgoing_message::OutgoingMessage;
use kordophone::model::OutgoingMessageTarget;
use anyhow::Result; use anyhow::Result;
@@ -103,29 +102,10 @@ impl<C: APIInterface, F: AsyncFnMut() -> Result<C>> PostOffice<C, F> {
Ok(sent_message) => { Ok(sent_message) => {
log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid); log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid);
let conversation_id = sent_message.conversation_id.clone().or_else(|| { let conversation_id = message.conversation_id.clone();
match &message.target { let event =
OutgoingMessageTarget::Conversation(conversation_id) => { DaemonEvent::MessageSent(sent_message.into(), message, conversation_id);
Some(conversation_id.clone()) event_sink.send(event).await.unwrap();
}
OutgoingMessageTarget::Handles(_) => None,
}
});
if let Some(conversation_id) = conversation_id {
let event = DaemonEvent::MessageSent(
sent_message.message.into(),
message,
conversation_id,
);
event_sink.send(event).await.unwrap();
} else {
log::error!(
target: target::POST_OFFICE,
"Message sent but no conversation id was available for {}",
message.guid
);
}
} }
Err(e) => { Err(e) => {

View File

@@ -1,3 +1,12 @@
#[derive(Debug, Clone)]
pub enum InternalSignal {
/// Notification that new messages are available for a conversation.
MessagesUpdated(String),
/// Manual test notification request.
TestNotification { summary: String, body: String },
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Signal { pub enum Signal {
/// Emitted when the list of conversations is updated. /// Emitted when the list of conversations is updated.
@@ -21,4 +30,7 @@ pub enum Signal {
/// Emitted when the update stream is reconnected after a timeout or configuration change. /// Emitted when the update stream is reconnected after a timeout or configuration change.
UpdateStreamReconnected, UpdateStreamReconnected,
/// Internal-only signals consumed by daemon components.
Internal(InternalSignal),
} }

View File

@@ -1,11 +1,8 @@
use dbus::arg; use dbus::arg;
use dbus_tree::MethodErr; use dbus_tree::MethodErr;
use std::fs::OpenOptions;
use std::os::fd::{FromRawFd, IntoRawFd};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use std::{future::Future, thread}; use std::{future::Future, thread};
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use kordophoned::daemon::{ use kordophoned::daemon::{
contact_resolver::{ContactResolver, DefaultContactResolverBackend}, contact_resolver::{ContactResolver, DefaultContactResolverBackend},
@@ -25,12 +22,15 @@ use dbus_tokio::connection;
#[derive(Clone)] #[derive(Clone)]
pub struct DBusAgent { pub struct DBusAgent {
event_sink: mpsc::Sender<Event>, event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>, signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
contact_resolver: ContactResolver<DefaultContactResolverBackend>, contact_resolver: ContactResolver<DefaultContactResolverBackend>,
} }
impl DBusAgent { impl DBusAgent {
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self { pub fn new(
event_sink: mpsc::Sender<Event>,
signal_receiver: broadcast::Receiver<Signal>,
) -> Self {
Self { Self {
event_sink, event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
@@ -78,80 +78,95 @@ impl DBusAgent {
.take() .take()
.expect("Signal receiver already taken"); .expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await { loop {
match signal { match receiver.recv().await {
Signal::ConversationsUpdated => { Ok(signal) => match signal {
log::debug!("Sending signal: ConversationsUpdated"); Signal::ConversationsUpdated => {
registry log::debug!("Sending signal: ConversationsUpdated");
.send_signal( registry
interface::OBJECT_PATH, .send_signal(
DbusSignals::ConversationsUpdated {}, interface::OBJECT_PATH,
) DbusSignals::ConversationsUpdated {},
.unwrap_or_else(|_| { )
log::error!("Failed to send signal"); .unwrap_or_else(|_| {
0 log::error!("Failed to send signal");
}); 0
} });
Signal::MessagesUpdated(conversation_id) => { }
log::debug!( Signal::MessagesUpdated(conversation_id) => {
"Sending signal: MessagesUpdated for conversation {}", log::debug!(
conversation_id "Sending signal: MessagesUpdated for conversation {}",
conversation_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::MessagesUpdated { conversation_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentDownloaded(attachment_id) => {
log::debug!(
"Sending signal: AttachmentDownloaded for attachment {}",
attachment_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentDownloadCompleted { attachment_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid,
attachment_guid
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentUploadCompleted {
upload_guid,
attachment_guid,
},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::UpdateStreamReconnected => {
log::debug!("Sending signal: UpdateStreamReconnected");
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::UpdateStreamReconnected {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::Internal(_) => {
log::trace!("Ignoring internal signal for D-Bus transport");
}
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
"Signal receiver lagged; skipped {} daemon signals",
skipped
); );
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::MessagesUpdated { conversation_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
} }
Signal::AttachmentDownloaded(attachment_id) => { Err(broadcast::error::RecvError::Closed) => {
log::debug!( log::warn!("Signal channel closed; stopping D-Bus forwarding");
"Sending signal: AttachmentDownloaded for attachment {}", break;
attachment_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentDownloadCompleted { attachment_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid,
attachment_guid
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentUploadCompleted {
upload_guid,
attachment_guid,
},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::UpdateStreamReconnected => {
log::debug!("Sending signal: UpdateStreamReconnected");
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::UpdateStreamReconnected {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
} }
} }
} }
@@ -179,8 +194,9 @@ impl DBusAgent {
&self, &self,
make_event: impl FnOnce(Reply<T>) -> Event + Send, make_event: impl FnOnce(Reply<T>) -> Event + Send,
) -> Result<T, MethodErr> { ) -> Result<T, MethodErr> {
let daemon_result = run_sync_future(self.send_event(make_event))?; run_sync_future(self.send_event(make_event))
daemon_result.map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e))) .unwrap()
.map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e)))
} }
fn resolve_participant_display_name(&mut self, participant: &Participant) -> String { fn resolve_participant_display_name(&mut self, participant: &Participant) -> String {
@@ -280,132 +296,131 @@ impl DbusRepository for DBusAgent {
conversation_id: String, conversation_id: String,
last_message_id: String, last_message_id: String,
) -> Result<Vec<arg::PropMap>, MethodErr> { ) -> Result<Vec<arg::PropMap>, MethodErr> {
let started = Instant::now();
let last_message_id_opt = if last_message_id.is_empty() { let last_message_id_opt = if last_message_id.is_empty() {
None None
} else { } else {
Some(last_message_id) Some(last_message_id)
}; };
let messages = self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))?; .map(|messages| {
messages
.into_iter()
.map(|msg| {
let mut map = arg::PropMap::new();
map.insert("id".into(), arg::Variant(Box::new(msg.id)));
let mut attachment_count: usize = 0; // Remove the attachment placeholder here.
let mut text_bytes: usize = 0; let text = msg.text.replace("\u{FFFC}", "");
let mapped: Vec<arg::PropMap> = messages map.insert("text".into(), arg::Variant(Box::new(text)));
.into_iter() map.insert(
.map(|msg| { "date".into(),
let mut map = arg::PropMap::new(); arg::Variant(Box::new(msg.date.and_utc().timestamp())),
map.insert("id".into(), arg::Variant(Box::new(msg.id))); );
map.insert(
"sender".into(),
arg::Variant(Box::new(
self.resolve_participant_display_name(&msg.sender.into()),
)),
);
// Remove the attachment placeholder here. // Attachments array
let text = msg.text.replace("\u{FFFC}", ""); let attachments: Vec<arg::PropMap> = msg
text_bytes += text.len(); .attachments
.into_iter()
.map(|attachment| {
let mut attachment_map = arg::PropMap::new();
attachment_map.insert(
"guid".into(),
arg::Variant(Box::new(attachment.guid.clone())),
);
map.insert("text".into(), arg::Variant(Box::new(text))); // Paths and download status
map.insert( let path = attachment.get_path_for_preview(false);
"date".into(), let preview_path = attachment.get_path_for_preview(true);
arg::Variant(Box::new(msg.date.and_utc().timestamp())), let downloaded = attachment.is_downloaded(false);
); let preview_downloaded = attachment.is_downloaded(true);
map.insert(
"sender".into(),
arg::Variant(Box::new(msg.sender.display_name())),
);
if !msg.attachments.is_empty() { attachment_map.insert(
let attachments: Vec<arg::PropMap> = msg "path".into(),
.attachments arg::Variant(Box::new(path.to_string_lossy().to_string())),
.into_iter() );
.map(|attachment| { attachment_map.insert(
attachment_count += 1; "preview_path".into(),
let mut attachment_map = arg::PropMap::new(); arg::Variant(Box::new(
attachment_map.insert( preview_path.to_string_lossy().to_string(),
"guid".into(), )),
arg::Variant(Box::new(attachment.guid.clone())), );
); attachment_map.insert(
attachment_map.insert( "downloaded".into(),
"downloaded".into(), arg::Variant(Box::new(downloaded)),
arg::Variant(Box::new(attachment.is_downloaded(false))), );
); attachment_map.insert(
attachment_map.insert( "preview_downloaded".into(),
"preview_downloaded".into(), arg::Variant(Box::new(preview_downloaded)),
arg::Variant(Box::new(attachment.is_downloaded(true))), );
);
if let Some(ref metadata) = attachment.metadata { // Metadata
let mut metadata_map = arg::PropMap::new(); if let Some(ref metadata) = attachment.metadata {
let mut metadata_map = arg::PropMap::new();
if let Some(ref attribution_info) = metadata.attribution_info { if let Some(ref attribution_info) = metadata.attribution_info {
let mut attribution_map = arg::PropMap::new(); let mut attribution_map = arg::PropMap::new();
if let Some(width) = attribution_info.width { if let Some(width) = attribution_info.width {
attribution_map.insert( attribution_map.insert(
"width".into(), "width".into(),
arg::Variant(Box::new(width as i32)), arg::Variant(Box::new(width as i32)),
);
}
if let Some(height) = attribution_info.height {
attribution_map.insert(
"height".into(),
arg::Variant(Box::new(height as i32)),
);
}
metadata_map.insert(
"attribution_info".into(),
arg::Variant(Box::new(attribution_map)),
); );
} }
if let Some(height) = attribution_info.height {
attribution_map.insert( attachment_map.insert(
"height".into(), "metadata".into(),
arg::Variant(Box::new(height as i32)), arg::Variant(Box::new(metadata_map)),
);
}
metadata_map.insert(
"attribution_info".into(),
arg::Variant(Box::new(attribution_map)),
); );
} }
attachment_map.insert( attachment_map
"metadata".into(), })
arg::Variant(Box::new(metadata_map)), .collect();
);
}
attachment_map
})
.collect();
map.insert("attachments".into(), arg::Variant(Box::new(attachments))); map.insert("attachments".into(), arg::Variant(Box::new(attachments)));
} map
})
map .collect()
}) })
.collect();
log::debug!(
target: "dbus",
"GetMessages mapped in {}ms: {} messages, {} attachments, {} text-bytes",
started.elapsed().as_millis(),
mapped.len(),
attachment_count,
text_bytes
);
Ok(mapped)
} }
fn delete_all_conversations(&mut self) -> Result<(), MethodErr> { fn delete_all_conversations(&mut self) -> Result<(), MethodErr> {
self.send_event_sync(Event::DeleteAllConversations) self.send_event_sync(Event::DeleteAllConversations)
} }
fn reply( fn send_message(
&mut self, &mut self,
conversation_id: String, conversation_id: String,
text: String, text: String,
attachment_guids: Vec<String>, attachment_guids: Vec<String>,
) -> Result<String, MethodErr> { ) -> Result<String, MethodErr> {
self.send_event_sync(|r| Event::Reply(conversation_id, text, attachment_guids, r)) self.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
.map(|uuid| uuid.to_string()) .map(|uuid| uuid.to_string())
} }
fn new_conversation( fn test_notification(&mut self, summary: String, body: String) -> Result<(), MethodErr> {
&mut self, match self.send_event_sync(|r| Event::TestNotification(summary, body, r))? {
handle_ids: Vec<String>, Ok(()) => Ok(()),
text: String, Err(message) => Err(MethodErr::failed(&message)),
attachment_guids: Vec<String>, }
) -> Result<String, MethodErr> {
self.send_event_sync(|r| Event::NewConversation(handle_ids, text, attachment_guids, r))
.map(|uuid| uuid.to_string())
} }
fn get_attachment_info( fn get_attachment_info(
@@ -435,23 +450,6 @@ impl DbusRepository for DBusAgent {
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r)) self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
} }
fn open_attachment_fd(
&mut self,
attachment_id: String,
preview: bool,
) -> Result<arg::OwnedFd, MethodErr> {
let attachment = self.send_event_sync(|r| Event::GetAttachment(attachment_id, r))?;
let path = attachment.get_path_for_preview(preview);
let file = OpenOptions::new()
.read(true)
.open(&path)
.map_err(|e| MethodErr::failed(&format!("Failed to open attachment: {}", e)))?;
let fd = file.into_raw_fd();
Ok(unsafe { arg::OwnedFd::from_raw_fd(fd) })
}
fn upload_attachment(&mut self, path: String) -> Result<String, MethodErr> { fn upload_attachment(&mut self, path: String) -> Result<String, MethodErr> {
use std::path::PathBuf; use std::path::PathBuf;
let path = PathBuf::from(path); let path = PathBuf::from(path);
@@ -523,7 +521,7 @@ where
T: Send, T: Send,
F: Future<Output = T> + Send, F: Future<Output = T> + Send,
{ {
let joined = thread::scope(move |s| { thread::scope(move |s| {
s.spawn(move || { s.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
@@ -534,10 +532,6 @@ where
Ok(result) Ok(result)
}) })
.join() .join()
}); })
.expect("Error joining runtime thread")
match joined {
Ok(result) => result,
Err(_) => Err(MethodErr::failed("Error joining runtime thread")),
}
} }

View File

@@ -26,7 +26,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
use dbus::agent::DBusAgent; use dbus::agent::DBusAgent;
// Start the D-Bus agent (events in, signals out). // Start the D-Bus agent (events in, signals out).
let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver()); let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
tokio::spawn(async move { tokio::spawn(async move {
agent.run().await; agent.run().await;
}); });
@@ -35,8 +35,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
async fn start_ipc_agent(daemon: &mut Daemon) { async fn start_ipc_agent(daemon: &mut Daemon) {
// Start the macOS XPC agent (events in, signals out) on a dedicated thread. // Start the macOS XPC agent (events in, signals out) on a dedicated thread.
let agent = let agent = xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
std::thread::spawn(move || { std::thread::spawn(move || {
// Use a single-threaded Tokio runtime for the XPC agent. // Use a single-threaded Tokio runtime for the XPC agent.
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()

View File

@@ -4,7 +4,7 @@ use std::ffi::CString;
use std::os::raw::c_char; use std::os::raw::c_char;
use std::ptr; use std::ptr;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError}; use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
use xpc_connection_sys as xpc_sys; use xpc_connection_sys as xpc_sys;
@@ -22,11 +22,14 @@ type Subscribers = Arc<std::sync::Mutex<Vec<XpcConn>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct XpcAgent { pub struct XpcAgent {
event_sink: mpsc::Sender<Event>, event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>, signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
} }
impl XpcAgent { impl XpcAgent {
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self { pub fn new(
event_sink: mpsc::Sender<Event>,
signal_receiver: broadcast::Receiver<Signal>,
) -> Self {
Self { Self {
event_sink, event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
@@ -71,7 +74,31 @@ impl XpcAgent {
.await .await
.take() .take()
.expect("Signal receiver already taken"); .expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await { loop {
let signal = match receiver.recv().await {
Ok(signal) => signal,
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
target: LOG_TARGET,
"XPC agent lagged; skipped {} signals",
skipped
);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
log::warn!(
target: LOG_TARGET,
"Signal channel closed; stopping XPC forwarding"
);
break;
}
};
if matches!(signal, Signal::Internal(_)) {
log::trace!(target: LOG_TARGET, "Skipping internal signal for XPC");
continue;
}
log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal); log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
let msg = super::util::signal_to_message(signal); let msg = super::util::signal_to_message(signal);
let xobj = message_to_xpc_object(msg); let xobj = message_to_xpc_object(msg);

View File

@@ -254,8 +254,8 @@ pub async fn dispatch(
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))), Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
}, },
// Reply // SendMessage
"Reply" => { "SendMessage" => {
let args = match get_dictionary_field(root, "arguments") { let args = match get_dictionary_field(root, "arguments") {
Some(a) => a, Some(a) => a,
None => { None => {
@@ -286,64 +286,12 @@ pub async fn dispatch(
_ => Vec::new(), _ => Vec::new(),
}; };
match agent match agent
.send_event(|r| Event::Reply(conversation_id, text, attachment_guids, r)) .send_event(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
.await .await
{ {
Ok(uuid) => { Ok(uuid) => {
let mut reply: XpcMap = HashMap::new(); let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "ReplyResponse"); dict_put_str(&mut reply, "type", "SendMessageResponse");
dict_put_str(&mut reply, "uuid", &uuid.to_string());
DispatchResult::new(Message::Dictionary(reply))
}
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
}
}
// NewConversation
"NewConversation" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let handle_ids: Vec<String> = match args.get(&cstr("handle_ids")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect(),
_ => Vec::new(),
};
if handle_ids.is_empty() {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing handle_ids",
));
}
let text = dict_get_str(args, "text").unwrap_or_default();
let attachment_guids: Vec<String> = match args.get(&cstr("attachment_guids")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect(),
_ => Vec::new(),
};
match agent
.send_event(|r| Event::NewConversation(handle_ids, text, attachment_guids, r))
.await
{
Ok(uuid) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "NewConversationResponse");
dict_put_str(&mut reply, "uuid", &uuid.to_string()); dict_put_str(&mut reply, "uuid", &uuid.to_string());
DispatchResult::new(Message::Dictionary(reply)) DispatchResult::new(Message::Dictionary(reply))
} }

View File

@@ -28,7 +28,7 @@ dbus-tree = "0.9.2"
# D-Bus codegen only on Linux # D-Bus codegen only on Linux
[target.'cfg(target_os = "linux")'.build-dependencies] [target.'cfg(target_os = "linux")'.build-dependencies]
dbus-codegen = { version = "0.10.0", default-features = false } dbus-codegen = "0.10.0"
# XPC (libxpc) interface only on macOS # XPC (libxpc) interface only on macOS
[target.'cfg(target_os = "macos")'.dependencies] [target.'cfg(target_os = "macos")'.dependencies]

View File

@@ -5,10 +5,10 @@ use kordophone::api::InMemoryAuthenticationStore;
use kordophone::APIInterface; use kordophone::APIInterface;
use crate::printers::{ConversationPrinter, MessagePrinter}; use crate::printers::{ConversationPrinter, MessagePrinter};
use anyhow::{bail, Result}; use anyhow::Result;
use clap::Subcommand; use clap::Subcommand;
use kordophone::model::event::EventData; use kordophone::model::event::EventData;
use kordophone::model::{HandleResolutionStatus, OutgoingMessage, OutgoingMessageTarget}; use kordophone::model::outgoing_message::OutgoingMessage;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -47,29 +47,14 @@ pub enum Commands {
/// Prints all raw updates from the server. /// Prints all raw updates from the server.
RawUpdates, RawUpdates,
/// Resolves an address to a canonical handle. /// Sends a message to the server.
#[command(alias = "resolve")] SendMessage {
ResolveHandle { address: String },
/// Replies to an existing conversation.
#[command(alias = "send-message")]
Reply {
conversation_id: String, conversation_id: String,
message: String, message: String,
}, },
/// Starts a new message to one or more resolved handles.
New {
#[arg(long = "handle", required = true)]
handle_ids: Vec<String>,
message: String,
},
/// Marks a conversation as read. /// Marks a conversation as read.
Mark { conversation_id: String }, Mark { conversation_id: String },
/// Deletes a conversation from the server.
Delete { conversation_id: String },
} }
impl Commands { impl Commands {
@@ -81,21 +66,13 @@ impl Commands {
Commands::Messages { conversation_id } => client.print_messages(conversation_id).await, Commands::Messages { conversation_id } => client.print_messages(conversation_id).await,
Commands::RawUpdates => client.print_raw_updates().await, Commands::RawUpdates => client.print_raw_updates().await,
Commands::Events => client.print_events().await, Commands::Events => client.print_events().await,
Commands::ResolveHandle { address } => client.resolve_handle(address).await, Commands::SendMessage {
Commands::Reply {
conversation_id, conversation_id,
message, message,
} => client.reply(conversation_id, message).await, } => client.send_message(conversation_id, message).await,
Commands::New {
handle_ids,
message,
} => client.new_message(handle_ids, message).await,
Commands::Mark { conversation_id } => { Commands::Mark { conversation_id } => {
client.mark_conversation_as_read(conversation_id).await client.mark_conversation_as_read(conversation_id).await
} }
Commands::Delete { conversation_id } => {
client.delete_conversation(conversation_id).await
}
} }
} }
} }
@@ -190,91 +167,20 @@ impl ClientCli {
Ok(()) Ok(())
} }
pub async fn resolve_handle(&mut self, address: String) -> Result<()> { pub async fn send_message(&mut self, conversation_id: String, message: String) -> Result<()> {
let response = self.api.resolve_handle(&address).await?;
let status = match response.status {
HandleResolutionStatus::Valid => "valid",
HandleResolutionStatus::Invalid => "invalid",
HandleResolutionStatus::Unknown => "unknown",
};
println!("Resolved handle: {}", response.resolved_handle.id);
if let Some(name) = response.resolved_handle.name {
println!("Name: {}", name);
}
println!("Status: {}", status);
if let Some(conversation_id) = response.existing_chat {
println!("Existing conversation: {}", conversation_id);
}
Ok(())
}
async fn send_message(&mut self, target: OutgoingMessageTarget, message: String) -> Result<()> {
let outgoing_message = OutgoingMessage::builder() let outgoing_message = OutgoingMessage::builder()
.target(target) .conversation_id(conversation_id)
.text(message) .text(message)
.build(); .build();
let response = self.api.send_message(&outgoing_message).await?; let message = self.api.send_message(&outgoing_message).await?;
if let Some(conversation_id) = response.conversation_id { println!("Message sent: {}", message.guid);
println!(
"Message sent: {} conversation: {}",
response.message.guid, conversation_id
);
} else {
println!("Message sent: {}", response.message.guid);
}
Ok(()) Ok(())
} }
async fn resolve_handle_ids(&mut self, handle_ids: Vec<String>) -> Result<Vec<String>> {
let mut resolved_handle_ids = Vec::with_capacity(handle_ids.len());
for handle_id in handle_ids {
let response = self.api.resolve_handle(&handle_id).await?;
match response.status {
HandleResolutionStatus::Valid => {
resolved_handle_ids.push(response.resolved_handle.id);
}
HandleResolutionStatus::Invalid => {
bail!("Handle '{}' is not iMessage-capable.", handle_id);
}
HandleResolutionStatus::Unknown => {
bail!("Handle '{}' could not be resolved.", handle_id);
}
}
}
Ok(resolved_handle_ids)
}
pub async fn reply(&mut self, conversation_id: String, message: String) -> Result<()> {
self.send_message(
OutgoingMessageTarget::Conversation(conversation_id),
message,
)
.await
}
pub async fn new_message(&mut self, handle_ids: Vec<String>, message: String) -> Result<()> {
let resolved_handle_ids = self.resolve_handle_ids(handle_ids).await?;
self.send_message(OutgoingMessageTarget::Handles(resolved_handle_ids), message)
.await
}
pub async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> { pub async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
self.api.mark_conversation_as_read(&conversation_id).await?; self.api.mark_conversation_as_read(&conversation_id).await?;
println!("Conversation marked as read: {}", conversation_id); println!("Conversation marked as read: {}", conversation_id);
Ok(()) Ok(())
} }
pub async fn delete_conversation(&mut self, conversation_id: String) -> Result<()> {
self.api.delete_conversation(&conversation_id).await?;
println!("Conversation deleted: {}", conversation_id);
Ok(())
}
} }

View File

@@ -33,7 +33,7 @@ impl DBusDaemonInterface {
fn proxy(&self) -> Proxy<&Connection> { fn proxy(&self) -> Proxy<&Connection> {
self.conn self.conn
.with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_secs(30)) .with_proxy(DBUS_NAME, DBUS_PATH, std::time::Duration::from_millis(5000))
} }
async fn print_settings(&mut self) -> Result<()> { async fn print_settings(&mut self) -> Result<()> {
@@ -109,20 +109,15 @@ impl DaemonInterface for DBusDaemonInterface {
Ok(()) Ok(())
} }
async fn reply(&mut self, conversation_id: String, text: String) -> Result<()> { async fn enqueue_outgoing_message(
&mut self,
conversation_id: String,
text: String,
) -> Result<()> {
let attachment_guids: Vec<&str> = vec![]; let attachment_guids: Vec<&str> = vec![];
let outgoing_message_id = let outgoing_message_id = KordophoneRepository::send_message(
KordophoneRepository::reply(&self.proxy(), &conversation_id, &text, attachment_guids)?;
println!("Outgoing message ID: {}", outgoing_message_id);
Ok(())
}
async fn new_conversation(&mut self, handle_ids: Vec<String>, text: String) -> Result<()> {
let attachment_guids: Vec<&str> = vec![];
let handle_ids: Vec<&str> = handle_ids.iter().map(String::as_str).collect();
let outgoing_message_id = KordophoneRepository::new_conversation(
&self.proxy(), &self.proxy(),
handle_ids, &conversation_id,
&text, &text,
attachment_guids, attachment_guids,
)?; )?;
@@ -214,4 +209,9 @@ impl DaemonInterface for DBusDaemonInterface {
KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id) KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id)
.map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {}", e)) .map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {}", e))
} }
async fn test_notification(&mut self, summary: String, body: String) -> Result<()> {
KordophoneRepository::test_notification(&self.proxy(), &summary, &body)
.map_err(|e| anyhow::anyhow!("Failed to trigger test notification: {}", e))
}
} }

View File

@@ -21,14 +21,18 @@ pub trait DaemonInterface {
conversation_id: String, conversation_id: String,
last_message_id: Option<String>, last_message_id: Option<String>,
) -> Result<()>; ) -> Result<()>;
async fn reply(&mut self, conversation_id: String, text: String) -> Result<()>; async fn enqueue_outgoing_message(
async fn new_conversation(&mut self, handle_ids: Vec<String>, text: String) -> Result<()>; &mut self,
conversation_id: String,
text: String,
) -> Result<()>;
async fn wait_for_signals(&mut self) -> Result<()>; async fn wait_for_signals(&mut self) -> Result<()>;
async fn config(&mut self, cmd: ConfigCommands) -> Result<()>; async fn config(&mut self, cmd: ConfigCommands) -> Result<()>;
async fn delete_all_conversations(&mut self) -> Result<()>; async fn delete_all_conversations(&mut self) -> Result<()>;
async fn download_attachment(&mut self, attachment_id: String) -> Result<()>; async fn download_attachment(&mut self, attachment_id: String) -> Result<()>;
async fn upload_attachment(&mut self, path: String) -> Result<()>; async fn upload_attachment(&mut self, path: String) -> Result<()>;
async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>; async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>;
async fn test_notification(&mut self, summary: String, body: String) -> Result<()>;
} }
struct StubDaemonInterface; struct StubDaemonInterface;
@@ -70,12 +74,11 @@ impl DaemonInterface for StubDaemonInterface {
"Daemon interface not implemented on this platform" "Daemon interface not implemented on this platform"
)) ))
} }
async fn reply(&mut self, _conversation_id: String, _text: String) -> Result<()> { async fn enqueue_outgoing_message(
Err(anyhow::anyhow!( &mut self,
"Daemon interface not implemented on this platform" _conversation_id: String,
)) _text: String,
} ) -> Result<()> {
async fn new_conversation(&mut self, _handle_ids: Vec<String>, _text: String) -> Result<()> {
Err(anyhow::anyhow!( Err(anyhow::anyhow!(
"Daemon interface not implemented on this platform" "Daemon interface not implemented on this platform"
)) ))
@@ -110,6 +113,11 @@ impl DaemonInterface for StubDaemonInterface {
"Daemon interface not implemented on this platform" "Daemon interface not implemented on this platform"
)) ))
} }
async fn test_notification(&mut self, _summary: String, _body: String) -> Result<()> {
Err(anyhow::anyhow!(
"Daemon interface not implemented on this platform"
))
}
} }
pub fn new_daemon_interface() -> Result<Box<dyn DaemonInterface>> { pub fn new_daemon_interface() -> Result<Box<dyn DaemonInterface>> {
@@ -159,20 +167,12 @@ pub enum Commands {
/// Deletes all conversations. /// Deletes all conversations.
DeleteAllConversations, DeleteAllConversations,
/// Replies to an existing conversation. /// Enqueues an outgoing message to be sent to a conversation.
#[command(alias = "send-message")] SendMessage {
Reply {
conversation_id: String, conversation_id: String,
text: String, text: String,
}, },
/// Starts a new conversation with one or more resolved handles.
New {
#[arg(long = "handle", required = true)]
handle_ids: Vec<String>,
text: String,
},
/// Downloads an attachment from the server to the attachment store. Returns the path to the attachment. /// Downloads an attachment from the server to the attachment store. Returns the path to the attachment.
DownloadAttachment { attachment_id: String }, DownloadAttachment { attachment_id: String },
@@ -181,6 +181,9 @@ pub enum Commands {
/// Marks a conversation as read. /// Marks a conversation as read.
MarkConversationAsRead { conversation_id: String }, MarkConversationAsRead { conversation_id: String },
/// Displays a test notification using the daemon.
TestNotification { summary: String, body: String },
} }
#[derive(Subcommand)] #[derive(Subcommand)]
@@ -214,11 +217,10 @@ impl Commands {
.await .await
} }
Commands::DeleteAllConversations => client.delete_all_conversations().await, Commands::DeleteAllConversations => client.delete_all_conversations().await,
Commands::Reply { Commands::SendMessage {
conversation_id, conversation_id,
text, text,
} => client.reply(conversation_id, text).await, } => client.enqueue_outgoing_message(conversation_id, text).await,
Commands::New { handle_ids, text } => client.new_conversation(handle_ids, text).await,
Commands::UploadAttachment { path } => client.upload_attachment(path).await, Commands::UploadAttachment { path } => client.upload_attachment(path).await,
Commands::DownloadAttachment { attachment_id } => { Commands::DownloadAttachment { attachment_id } => {
client.download_attachment(attachment_id).await client.download_attachment(attachment_id).await
@@ -226,6 +228,9 @@ impl Commands {
Commands::MarkConversationAsRead { conversation_id } => { Commands::MarkConversationAsRead { conversation_id } => {
client.mark_conversation_as_read(conversation_id).await client.mark_conversation_as_read(conversation_id).await
} }
Commands::TestNotification { summary, body } => {
client.test_notification(summary, body).await
}
} }
} }
} }

View File

@@ -371,7 +371,11 @@ impl DaemonInterface for XpcDaemonInterface {
_ => Err(anyhow::anyhow!("Unexpected messages payload")), _ => Err(anyhow::anyhow!("Unexpected messages payload")),
} }
} }
async fn reply(&mut self, _conversation_id: String, _text: String) -> Result<()> { async fn enqueue_outgoing_message(
&mut self,
_conversation_id: String,
_text: String,
) -> Result<()> {
let mach_port_name = Self::build_service_name()?; let mach_port_name = Self::build_service_name()?;
let mut client = XPCClient::connect(&mach_port_name); let mut client = XPCClient::connect(&mach_port_name);
let mut args = HashMap::new(); let mut args = HashMap::new();
@@ -383,34 +387,10 @@ impl DaemonInterface for XpcDaemonInterface {
Self::key("text"), Self::key("text"),
Message::String(CString::new(_text).unwrap()), Message::String(CString::new(_text).unwrap()),
); );
let response = self.call_method(&mut client, "Reply", Some(args)).await?; let reply = self
if let Some(uuid) = Self::get_string(&response, "uuid") { .call_method(&mut client, "SendMessage", Some(args))
println!("Outgoing message ID: {}", uuid.to_string_lossy());
}
Ok(())
}
async fn new_conversation(&mut self, handle_ids: Vec<String>, text: String) -> Result<()> {
let mach_port_name = Self::build_service_name()?;
let mut client = XPCClient::connect(&mach_port_name);
let mut args = HashMap::new();
args.insert(
Self::key("handle_ids"),
Message::Array(
handle_ids
.into_iter()
.map(|handle_id| Message::String(CString::new(handle_id).unwrap()))
.collect(),
),
);
args.insert(
Self::key("text"),
Message::String(CString::new(text).unwrap()),
);
let response = self
.call_method(&mut client, "NewConversation", Some(args))
.await?; .await?;
if let Some(uuid) = Self::get_string(&response, "uuid") { if let Some(uuid) = Self::get_string(&reply, "uuid") {
println!("Outgoing message ID: {}", uuid.to_string_lossy()); println!("Outgoing message ID: {}", uuid.to_string_lossy());
} }
Ok(()) Ok(())

View File

@@ -1,12 +0,0 @@
[package]
name = "kptui"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.93"
crossterm = "0.28.1"
kordophoned-client = { path = "../kordophoned-client" }
ratatui = { version = "0.29.0", features = ["unstable-rendered-line-info"] }
time = { version = "0.3.37", features = ["formatting"] }
unicode-width = "0.2.0"

View File

@@ -1,791 +0,0 @@
use kordophoned_client as daemon;
use anyhow::Result;
use crossterm::event::{Event as CEvent, KeyCode, KeyEvent, KeyEventKind, KeyModifiers};
use crossterm::terminal::{disable_raw_mode, enable_raw_mode};
use ratatui::prelude::*;
use ratatui::widgets::*;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use unicode_width::UnicodeWidthStr;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ViewMode {
List,
Chat,
Split,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Focus {
Navigation,
Input,
}
struct AppState {
conversations: Vec<daemon::ConversationSummary>,
selected_idx: usize,
selected_conversation_id: Option<String>,
messages: Vec<daemon::ChatMessage>,
active_conversation_id: Option<String>,
active_conversation_title: String,
status: String,
input: String,
focus: Focus,
transcript_scroll: u16,
pinned_to_bottom: bool,
refresh_conversations_in_flight: bool,
refresh_messages_in_flight: bool,
}
impl AppState {
fn new() -> Self {
Self {
conversations: Vec::new(),
selected_idx: 0,
selected_conversation_id: None,
messages: Vec::new(),
active_conversation_id: None,
active_conversation_title: String::new(),
status: String::new(),
input: String::new(),
focus: Focus::Navigation,
transcript_scroll: 0,
pinned_to_bottom: true,
refresh_conversations_in_flight: false,
refresh_messages_in_flight: false,
}
}
fn select_next(&mut self) {
if self.conversations.is_empty() {
self.selected_idx = 0;
self.selected_conversation_id = None;
return;
}
self.selected_idx = (self.selected_idx + 1).min(self.conversations.len() - 1);
self.selected_conversation_id = self
.conversations
.get(self.selected_idx)
.map(|c| c.id.clone());
}
fn select_prev(&mut self) {
if self.conversations.is_empty() {
self.selected_idx = 0;
self.selected_conversation_id = None;
return;
}
self.selected_idx = self.selected_idx.saturating_sub(1);
self.selected_conversation_id = self
.conversations
.get(self.selected_idx)
.map(|c| c.id.clone());
}
fn open_selected_conversation(&mut self) {
if let Some(conv) = self.conversations.get(self.selected_idx) {
self.active_conversation_id = Some(conv.id.clone());
self.active_conversation_title = conv.title.clone();
self.selected_conversation_id = Some(conv.id.clone());
self.messages.clear();
self.transcript_scroll = 0;
self.pinned_to_bottom = true;
self.focus = Focus::Input;
self.status = "Loading…".to_string();
}
}
}
fn view_mode(width: u16, has_active_conversation: bool, requested: ViewMode) -> ViewMode {
let min_conversations = 24u16;
let min_chat = 44u16;
let min_total = min_conversations + 1 + min_chat;
if width >= min_total {
return ViewMode::Split;
}
if has_active_conversation {
requested
} else {
ViewMode::List
}
}
fn ui(frame: &mut Frame, app: &AppState, requested_view: ViewMode) {
let area = frame.area();
let mode = view_mode(
area.width,
app.active_conversation_id.is_some(),
requested_view,
);
let show_input =
matches!(mode, ViewMode::Chat | ViewMode::Split) && app.active_conversation_id.is_some();
let chunks = if show_input {
Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Min(1),
Constraint::Length(3),
Constraint::Length(1),
])
.split(area)
} else {
Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Min(1), Constraint::Length(1)])
.split(area)
};
let (main_area, input_area, status_area) = if show_input {
(chunks[0], Some(chunks[1]), chunks[2])
} else {
(chunks[0], None, chunks[1])
};
match mode {
ViewMode::Split => {
let left_width = (main_area.width / 3).clamp(24, 40);
let cols = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Length(left_width), Constraint::Min(1)])
.split(main_area);
render_conversations(frame, app, cols[0], true);
render_transcript(frame, app, cols[1], true);
}
ViewMode::List => render_conversations(frame, app, main_area, false),
ViewMode::Chat => render_transcript(frame, app, main_area, false),
}
if let Some(input_area) = input_area {
let input_scroll_x = render_input(frame, app, input_area);
if app.focus == Focus::Input {
let cursor_col = visual_width_u16(app.input.as_str());
let mut x = input_area
.x
.saturating_add(1)
.saturating_add(cursor_col.saturating_sub(input_scroll_x));
let max_x = input_area
.x
.saturating_add(input_area.width.saturating_sub(2));
x = x.min(max_x);
let y = input_area.y + 1;
frame.set_cursor_position(Position { x, y });
}
}
render_status(frame, app, status_area, mode);
}
fn render_conversations(frame: &mut Frame, app: &AppState, area: Rect, _in_split: bool) {
let title = "Kordophone";
let items = app
.conversations
.iter()
.map(|c| {
let is_active = app.active_conversation_id.as_deref() == Some(c.id.as_str());
let header = Line::from(vec![
Span::styled(
if c.unread_count > 0 { "" } else { " " },
Style::default()
.fg(if c.unread_count > 0 {
Color::LightYellow
} else {
Color::DarkGray
})
.add_modifier(Modifier::BOLD),
),
Span::styled(
c.title.clone(),
Style::default().add_modifier(Modifier::BOLD),
),
]);
let preview = Line::from(Span::styled(
c.preview.clone(),
Style::default().fg(if is_active {
Color::Gray
} else {
Color::DarkGray
}),
));
ListItem::new(vec![header, preview]).style(if is_active {
Style::default().bg(Color::DarkGray)
} else {
Style::default()
})
})
.collect::<Vec<_>>();
let mut state = ListState::default();
state.select(if app.conversations.is_empty() {
None
} else {
Some(app.selected_idx)
});
let list = List::new(items)
.block(Block::default().borders(Borders::ALL).title(title))
.highlight_style(
Style::default()
.bg(Color::Blue)
.fg(Color::White)
.add_modifier(Modifier::BOLD),
)
.highlight_symbol("");
frame.render_stateful_widget(list, area, &mut state);
}
fn render_transcript(frame: &mut Frame, app: &AppState, area: Rect, in_split: bool) {
let title = if let Some(_) = app.active_conversation_id {
if in_split {
format!("{} (Esc: nav, Tab: focus)", app.active_conversation_title)
} else {
format!("{} (Esc: back)", app.active_conversation_title)
}
} else {
"Chat".to_string()
};
let lines = transcript_lines(&app.messages);
let paragraph = Paragraph::new(Text::from(lines))
.block(Block::default().borders(Borders::ALL).title(title))
.wrap(Wrap { trim: false })
.scroll((app.transcript_scroll, 0));
frame.render_widget(paragraph, area);
}
fn render_input(frame: &mut Frame, app: &AppState, area: Rect) -> u16 {
let title = if app.focus == Focus::Input {
"Reply (Enter to send)"
} else {
"Reply (press i to type)"
};
let inner_width = area.width.saturating_sub(2).max(1);
let cursor_col = visual_width_u16(app.input.as_str());
let input_scroll_x = cursor_col.saturating_sub(inner_width.saturating_sub(1));
let input = Paragraph::new(app.input.as_str())
.block(Block::default().borders(Borders::ALL).title(title))
.scroll((0, input_scroll_x));
frame.render_widget(input, area);
input_scroll_x
}
fn render_status(frame: &mut Frame, app: &AppState, area: Rect, mode: ViewMode) {
let mut parts = vec![
format!("{} convs", app.conversations.len()),
match mode {
ViewMode::Split => "split".to_string(),
ViewMode::List => "list".to_string(),
ViewMode::Chat => "chat".to_string(),
},
];
if !app.status.trim().is_empty() {
parts.push(app.status.clone());
}
let line = parts.join(" | ");
frame.render_widget(
Paragraph::new(line).block(Block::default().borders(Borders::TOP)),
area,
);
}
fn main() -> Result<()> {
enable_raw_mode()?;
let mut stdout = std::io::stdout();
crossterm::execute!(
stdout,
crossterm::terminal::EnterAlternateScreen,
crossterm::event::EnableMouseCapture
)?;
let backend = ratatui::backend::CrosstermBackend::new(stdout);
let mut terminal = ratatui::Terminal::new(backend)?;
let res = run_app(&mut terminal);
disable_raw_mode()?;
crossterm::execute!(
terminal.backend_mut(),
crossterm::event::DisableMouseCapture,
crossterm::terminal::LeaveAlternateScreen
)?;
terminal.show_cursor()?;
res
}
fn run_app(
terminal: &mut ratatui::Terminal<ratatui::backend::CrosstermBackend<std::io::Stdout>>,
) -> Result<()> {
let (request_tx, request_rx) = mpsc::channel::<daemon::Request>();
let (event_tx, event_rx) = mpsc::channel::<daemon::Event>();
let _worker = daemon::spawn_worker(request_rx, event_tx);
let tick_rate = Duration::from_millis(150);
let refresh_rate = Duration::from_secs(2);
let mut last_tick = Instant::now();
let mut last_refresh = Instant::now() - refresh_rate;
let mut requested_view = ViewMode::List;
let mut app = AppState::new();
app.status = "Connecting…".to_string();
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
loop {
let size = terminal.size()?;
while let Ok(evt) = event_rx.try_recv() {
match evt {
daemon::Event::Conversations(convs) => {
let keep_selected_id = app
.selected_conversation_id
.clone()
.or_else(|| app.active_conversation_id.clone());
app.refresh_conversations_in_flight = false;
app.status.clear();
app.conversations = convs;
if app.conversations.is_empty() {
app.selected_idx = 0;
app.selected_conversation_id = None;
} else if let Some(id) = keep_selected_id {
if let Some(idx) = app.conversations.iter().position(|c| c.id == id) {
app.selected_idx = idx;
app.selected_conversation_id = Some(id);
} else {
app.selected_idx = 0;
app.selected_conversation_id = Some(app.conversations[0].id.clone());
}
} else {
app.selected_idx = app.selected_idx.min(app.conversations.len() - 1);
app.selected_conversation_id =
Some(app.conversations[app.selected_idx].id.clone());
}
}
daemon::Event::Messages {
conversation_id,
messages,
} => {
app.refresh_messages_in_flight = false;
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
let was_pinned = app.pinned_to_bottom;
app.messages = messages;
app.pinned_to_bottom = was_pinned;
}
}
daemon::Event::MessageQueued {
conversation_id,
outgoing_id,
} => {
if let Some(conversation_id) = conversation_id {
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
app.status = outgoing_id
.as_deref()
.map(|id| format!("Sent ({id})"))
.unwrap_or_else(|| "Sent".to_string());
app.refresh_messages_in_flight = false;
request_tx
.send(daemon::Request::RefreshMessages { conversation_id })
.ok();
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::MarkedRead => {}
daemon::Event::ConversationSyncTriggered { conversation_id } => {
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
app.status = "Syncing…".to_string();
}
}
daemon::Event::ConversationsUpdated => {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if let Some(cid) = app.active_conversation_id.clone() {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::MessagesUpdated { conversation_id } => {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages { conversation_id })
.ok();
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::UpdateStreamReconnected => {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if let Some(cid) = app.active_conversation_id.clone() {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::Error(e) => {
app.refresh_conversations_in_flight = false;
app.refresh_messages_in_flight = false;
app.status = e;
}
}
}
apply_transcript_scroll_policy(&mut app, size, requested_view);
terminal.draw(|f| ui(f, &app, requested_view))?;
let timeout = tick_rate.saturating_sub(last_tick.elapsed());
if crossterm::event::poll(timeout)? {
if let CEvent::Key(key) = crossterm::event::read()? {
if key.kind != KeyEventKind::Press {
continue;
}
let ctrl = key.modifiers.contains(KeyModifiers::CONTROL);
match (key.code, ctrl) {
(KeyCode::Char('c'), true) => return Ok(()),
_ => {}
}
let screen_mode = view_mode(
size.width,
app.active_conversation_id.is_some(),
requested_view,
);
let max_scroll = max_transcript_scroll(&app, size, requested_view);
match screen_mode {
ViewMode::List => match key.code {
KeyCode::Up => app.select_prev(),
KeyCode::Down => app.select_next(),
KeyCode::Enter => {
app.open_selected_conversation();
if app.active_conversation_id.is_some() {
requested_view = ViewMode::Chat;
if let Some(cid) = app.active_conversation_id.clone() {
request_tx
.send(daemon::Request::MarkRead {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::SyncConversation {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
}
_ => {}
},
ViewMode::Chat => match key.code {
KeyCode::Esc => {
requested_view = ViewMode::List;
app.focus = Focus::Navigation;
}
KeyCode::Char('i') if app.focus != Focus::Input => app.focus = Focus::Input,
_ => {
handle_chat_keys(&mut app, &request_tx, key, max_scroll);
}
},
ViewMode::Split => match key.code {
KeyCode::Tab => {
app.focus = match app.focus {
Focus::Navigation => Focus::Input,
Focus::Input => Focus::Navigation,
}
}
KeyCode::Esc => app.focus = Focus::Navigation,
KeyCode::Char('i') if app.focus != Focus::Input => app.focus = Focus::Input,
KeyCode::Up => {
if app.focus == Focus::Navigation {
app.select_prev()
} else {
scroll_up(&mut app, 1);
}
}
KeyCode::Down => {
if app.focus == Focus::Navigation {
app.select_next()
} else {
scroll_down(&mut app, 1, max_scroll);
}
}
KeyCode::Enter => {
if app.focus == Focus::Navigation {
app.open_selected_conversation();
requested_view = ViewMode::Chat;
if let Some(cid) = app.active_conversation_id.clone() {
request_tx
.send(daemon::Request::MarkRead {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::SyncConversation {
conversation_id: cid.clone(),
})
.ok();
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
} else {
handle_chat_keys(&mut app, &request_tx, key, max_scroll);
}
}
_ => handle_chat_keys(&mut app, &request_tx, key, max_scroll),
},
}
}
}
if last_refresh.elapsed() >= refresh_rate {
if !app.refresh_conversations_in_flight {
request_tx.send(daemon::Request::RefreshConversations).ok();
app.refresh_conversations_in_flight = true;
}
if let Some(cid) = app.active_conversation_id.clone() {
if !app.refresh_messages_in_flight {
request_tx
.send(daemon::Request::RefreshMessages {
conversation_id: cid,
})
.ok();
app.refresh_messages_in_flight = true;
}
}
last_refresh = Instant::now();
}
if last_tick.elapsed() >= tick_rate {
last_tick = Instant::now();
}
}
}
fn handle_chat_keys(
app: &mut AppState,
request_tx: &mpsc::Sender<daemon::Request>,
key: KeyEvent,
max_scroll: u16,
) {
let code = key.code;
let modifiers = key.modifiers;
match code {
KeyCode::PageUp => scroll_up(app, 10),
KeyCode::PageDown => scroll_down(app, 10, max_scroll),
_ => {}
}
if app.focus != Focus::Input {
return;
}
match code {
KeyCode::Char('u') if modifiers.contains(KeyModifiers::CONTROL) => {
app.input.clear();
}
KeyCode::Backspace if modifiers.contains(KeyModifiers::ALT) => {
delete_prev_word(&mut app.input);
}
KeyCode::Char('w') if modifiers.contains(KeyModifiers::CONTROL) => {
delete_prev_word(&mut app.input);
}
KeyCode::Enter => {
let text = app.input.trim().to_string();
if text.is_empty() {
return;
}
let Some(conversation_id) = app.active_conversation_id.clone() else {
app.status = "No conversation selected".to_string();
return;
};
request_tx
.send(daemon::Request::Reply {
conversation_id,
text,
})
.ok();
app.refresh_messages_in_flight = true;
app.input.clear();
}
KeyCode::Backspace => {
app.input.pop();
}
KeyCode::Char(c) => {
if !c.is_control() {
app.input.push(c);
}
}
_ => {}
}
}
fn delete_prev_word(input: &mut String) {
while input.chars().last().is_some_and(|c| c.is_whitespace()) {
input.pop();
}
while input.chars().last().is_some_and(|c| !c.is_whitespace()) {
input.pop();
}
}
fn scroll_up(app: &mut AppState, amount: u16) {
if amount > 0 {
app.pinned_to_bottom = false;
}
app.transcript_scroll = app.transcript_scroll.saturating_sub(amount);
}
fn scroll_down(app: &mut AppState, amount: u16, max_scroll: u16) {
app.transcript_scroll = app.transcript_scroll.saturating_add(amount);
if app.transcript_scroll >= max_scroll {
app.transcript_scroll = max_scroll;
app.pinned_to_bottom = true;
}
}
fn transcript_inner_width(size: Size, app: &AppState, requested_view: ViewMode) -> u16 {
let mode = view_mode(
size.width,
app.active_conversation_id.is_some(),
requested_view,
);
let outer_width = match mode {
ViewMode::Split => {
let left_width = (size.width / 3).clamp(24, 40);
size.width.saturating_sub(left_width)
}
ViewMode::Chat => size.width,
ViewMode::List => 0,
};
outer_width.saturating_sub(2).max(1)
}
fn visual_width_u16(s: &str) -> u16 {
s.width().min(u16::MAX as usize) as u16
}
fn transcript_lines(messages: &[daemon::ChatMessage]) -> Vec<Line<'static>> {
let mut lines: Vec<Line<'static>> = Vec::new();
for message in messages {
let ts = time::OffsetDateTime::from_unix_timestamp(message.date_unix)
.unwrap_or(time::OffsetDateTime::UNIX_EPOCH)
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string());
lines.push(Line::from(vec![
Span::styled(
message.sender.clone(),
Style::default().add_modifier(Modifier::BOLD),
),
Span::raw(" "),
Span::styled(ts, Style::default().fg(Color::DarkGray)),
]));
let mut rendered_any_text = false;
for text_line in message.text.lines() {
rendered_any_text = true;
lines.push(Line::from(Span::raw(text_line.to_string())));
}
if !rendered_any_text {
lines.push(Line::from(Span::styled(
"<non-text message>",
Style::default().fg(Color::DarkGray),
)));
}
lines.push(Line::from(Span::raw("")));
}
if lines.is_empty() {
lines.push(Line::from(Span::styled(
"No messages.",
Style::default().fg(Color::DarkGray),
)));
}
lines
}
fn transcript_content_visual_lines(messages: &[daemon::ChatMessage], inner_width: u16) -> u16 {
let paragraph =
Paragraph::new(Text::from(transcript_lines(messages))).wrap(Wrap { trim: false });
paragraph.line_count(inner_width).min(u16::MAX as usize) as u16
}
fn transcript_viewport_height(size: Size, app: &AppState, requested_view: ViewMode) -> u16 {
let mode = view_mode(
size.width,
app.active_conversation_id.is_some(),
requested_view,
);
let show_input =
matches!(mode, ViewMode::Chat | ViewMode::Split) && app.active_conversation_id.is_some();
let transcript_height = if show_input {
size.height.saturating_sub(4) // input (3) + status (1)
} else {
size.height.saturating_sub(1) // status
};
match mode {
ViewMode::Chat | ViewMode::Split => transcript_height.saturating_sub(2), // borders
ViewMode::List => 0,
}
}
fn max_transcript_scroll(app: &AppState, size: Size, requested_view: ViewMode) -> u16 {
let viewport_height = transcript_viewport_height(size, app, requested_view);
let inner_width = transcript_inner_width(size, app, requested_view);
let content = transcript_content_visual_lines(&app.messages, inner_width);
content.saturating_sub(viewport_height)
}
fn apply_transcript_scroll_policy(app: &mut AppState, size: Size, requested_view: ViewMode) {
let max_scroll = max_transcript_scroll(app, size, requested_view);
if app.pinned_to_bottom {
app.transcript_scroll = max_scroll;
} else {
app.transcript_scroll = app.transcript_scroll.min(max_scroll);
}
}

View File

@@ -1,263 +0,0 @@
# GLib Bindings Plan
## Status
Proposed. Not started.
## Context
Today the GTK app talks to `kordophoned` directly over D-Bus in
[`gtk/src/service/repository.vala`](/home/buzzert/src/Kordophone/gtk/src/service/repository.vala)
and the generated interface in
[`gtk/src/service/interface/dbusservice.vala`](/home/buzzert/src/Kordophone/gtk/src/service/interface/dbusservice.vala).
At the same time, the Rust-side daemon client logic already exists in
[`core/kordophoned-client/src/worker.rs`](/home/buzzert/src/Kordophone/core/kordophoned-client/src/worker.rs)
with platform backends for D-Bus and XPC. That means protocol changes currently
have to be reflected in multiple places:
- `kordophoned` D-Bus/XPC server shims
- `kordophoned-client` Rust transport layer
- GTK/Vala D-Bus interface and proxy code
- Swift XPC client code
For GTK/Vala specifically, the goal is to stop binding the application directly
to the daemon protocol surface.
## Recommendation
Add a GTK-facing GLib/GObject wrapper on top of a small C ABI exported from the
Rust daemon client stack.
Do not expose the current `kordophoned-client` Rust API directly as raw C.
The current surface uses Rust enums, `Vec<String>`, `Option`, and a threaded
worker model, which is fine internally but not a good stable FFI boundary.
The recommended layering is:
1. Keep `core/kordophoned-client` as the Rust-native transport/domain layer.
2. Add a new FFI crate with a narrow, C-safe API.
3. Add a small GLib/GObject wrapper for GTK/Vala consumption.
4. Migrate the GTK app to that wrapper and remove its direct D-Bus binding code.
This keeps one transport implementation in Rust while giving Vala a natural
GObject-style API with methods, async operations, and signals.
## Why Not Direct Rust GObject Export?
Exporting a GObject API directly from Rust is possible in principle, but the
tooling for generating the introspection artifacts that Vala wants is still much
less straightforward than plain C/GObject.
For this repo, the lower-risk path is:
- Rust for the daemon client implementation
- C ABI as the stable binary boundary
- a thin C/GObject wrapper for GI/Vala
That gives us standard GLib ownership rules, normal `.gir` / `.typelib` /
`.vapi` generation, and a cleaner Meson integration story for the GTK app.
## Proposed Layout
Add a new crate:
- `core/kordophoned-client-c`
This crate should export a small `extern "C"` interface around the existing
daemon client logic.
Add a new Linux-focused wrapper library:
- `gtk/libkordophone-client-glib` or `gtk/src/service/glib/`
This wrapper should be written in C and expose a GObject API that Vala can use.
It should depend on the Rust C ABI library, not on D-Bus directly.
## Proposed Responsibilities
### `core/kordophoned-client`
- Own request/response/signal semantics.
- Own platform transport handling:
- D-Bus on Linux
- XPC on macOS
- Stay Rust-native.
### `core/kordophoned-client-c`
- Define opaque client handles.
- Define FFI-safe request/response structs.
- Define callback registration for async completions and daemon signals.
- Marshal Rust events onto C callbacks.
- Hide Rust enums and collections from C consumers.
### GLib Wrapper
- Expose a `KpDaemonClient` GObject.
- Convert C callbacks into `GTask` completions and GObject signals.
- Marshal all callbacks onto the GLib main context.
- Expose Vala-friendly model objects or boxed structs.
## Draft Public Surface
The GTK-facing API should look like a normal GLib client, not like a transport
binding.
Suggested primary type:
- `KpDaemonClient`
Suggested async methods:
- `get_conversations_async(limit, offset, cancellable, callback)`
- `get_messages_async(conversation_id, last_message_id, cancellable, callback)`
- `reply_async(conversation_id, text, attachment_guids, cancellable, callback)`
- `new_conversation_async(handle_ids, text, attachment_guids, cancellable, callback)`
- `mark_conversation_as_read_async(conversation_id, cancellable, callback)`
- `sync_conversation_async(conversation_id, cancellable, callback)`
- `sync_conversation_list_async(cancellable, callback)`
- `upload_attachment_async(path, cancellable, callback)`
- `download_attachment_async(attachment_id, preview, cancellable, callback)`
- `get_attachment_info_async(attachment_id, cancellable, callback)`
Suggested synchronous or utility methods:
- `open_attachment_fd(attachment_id, preview, error)`
- `start()`
- `stop()`
Suggested signals:
- `conversations-updated`
- `messages-updated(conversation-id)`
- `attachment-downloaded(attachment-id)`
- `attachment-uploaded(upload-guid, attachment-guid)`
- `reconnected`
- `error(message)`
The first pass does not need to expose every daemon event. It only needs enough
surface to replace the current GTK repository layer.
## Suggested Model Types
Avoid returning raw hash tables to Vala.
Add small typed model objects or boxed structs for:
- `KpConversationSummary`
- `KpChatMessage`
- `KpAttachmentInfo`
If send acknowledgements matter to the UI, add:
- `KpQueuedMessage`
The GTK app can keep its own higher-level `Repository` wrapper initially, but it
should be wrapping typed client results instead of raw D-Bus maps.
## Signal Handling
Signals are the main reason this should be a GLib wrapper instead of plain C
calls from Vala.
Required behavior:
- daemon signal subscriptions must stay alive for the lifetime of the client
- transport callbacks must never call into GTK from a non-main thread
- all emitted GObject signals must be marshalled onto the GLib main context
The C ABI should therefore support registration of signal callbacks plus a user
data pointer, while the GLib wrapper owns the main-context handoff.
## Migration Plan
### Phase 1: Stabilize Rust FFI Boundary
- Add FFI-safe request/response types instead of exposing the current worker
enums directly.
- Keep the Rust worker and transport code internal.
- Decide which operations are callback-based and which can be blocking.
### Phase 2: Add `kordophoned-client-c`
- Expose opaque client construction/destruction.
- Expose request entry points for the operations GTK already uses.
- Expose signal subscription hooks.
- Add explicit allocation/free helpers for returned strings and arrays.
### Phase 3: Add GLib Wrapper
- Implement `KpDaemonClient` as a GObject in C.
- Convert C callbacks into `GTask`-based async completion methods.
- Emit GObject signals for daemon events.
- Generate introspection artifacts for Vala.
### Phase 4: Migrate GTK
- Replace direct use of `DBusService.Repository` in
[`gtk/src/service/repository.vala`](/home/buzzert/src/Kordophone/gtk/src/service/repository.vala).
- Remove the generated D-Bus binding dependency from the GTK app.
- Keep the existing GTK-side repository shape initially to minimize churn.
### Phase 5: Revisit Swift
Optional.
If this turns out cleaner than the current Swift XPC wrapper, add a Swift-facing
wrapper around the same C ABI later. This is not required for the GTK migration.
## Build System Notes
This plan introduces a Cargo + Meson integration boundary.
Expected follow-up work:
- decide whether the Rust C ABI library is built via `cargo build`, `cargo-c`,
or a Meson custom target
- decide where generated headers live
- decide where `.gir`, `.typelib`, and `.vapi` artifacts are produced and
installed
The cleanest packaging story is likely:
- Cargo builds the Rust library
- Meson builds the GLib wrapper and generates introspection data
- GTK links to the GLib wrapper
## Non-Goals
- replacing D-Bus and XPC with a custom socket transport
- unifying the macOS app onto GLib
- exposing the entire daemon protocol on day one
- redesigning GTK application architecture beyond the service boundary
## Risks
- FFI ownership mistakes across Rust, C, and GLib
- callback threading bugs if signal delivery is not marshalled correctly
- build complexity from mixed Cargo and Meson workflows
- over-exposing the current daemon protocol instead of defining a cleaner client
API
## Open Questions
- Should the C ABI be Linux-only at first, or cross-platform from day one?
- Should the first GTK-facing layer expose send acknowledgements, or just fire
and rely on message update signals?
- Should handle resolution be part of the GLib client API immediately, or added
only when GTK gains compose-new-conversation UI?
- Is it worth creating a higher-level shared protocol schema before building the
C ABI, or should that wait until after the GTK migration?
## Short Version
If we do this later, the best path is probably:
- Rust daemon client stays as the implementation core
- add a small C ABI on top of it
- add a tiny C/GObject wrapper for Vala
- move GTK off direct D-Bus bindings
That removes one of the protocol surfaces we currently maintain without forcing
the GTK app to consume a Rust-native API directly.

2
gtk/.gitignore vendored
View File

@@ -1,3 +1 @@
build/ build/
flatpak-build/
.flatpak-builder/

View File

@@ -1,25 +0,0 @@
FROM debian:trixie
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
dpkg \
make \
build-essential \
python3 \
imagemagick \
meson \
ninja-build \
valac \
pkg-config \
libgtk-4-dev \
libadwaita-1-dev \
libglib2.0-dev \
libgee-0.8-dev \
libsecret-1-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /workspace
COPY . .
CMD ["make", "deb"]

View File

@@ -5,20 +5,10 @@ all: setup
setup: build/ setup: build/
meson build meson build
VER_RAW := $(shell git -C .. describe --tags --abbrev=0 2>/dev/null || git -C .. describe --tags 2>/dev/null || printf '0.0.0') VER := 1.0.2
VER := $(patsubst v%,%,$(VER_RAW))
TMP := $(shell mktemp -d) TMP := $(shell mktemp -d)
rpm: rpm:
git -C .. archive --format=tar.gz --prefix=kordophone/ -o $(TMP)/v$(VER).tar.gz HEAD git -C .. archive --format=tar.gz --prefix=kordophone/ -o $(TMP)/v$(VER).tar.gz HEAD
rpmbuild -ba dist/rpm/kordophone.spec --define "_sourcedir $(TMP)" --define "app_version $(VER)" rpmbuild -ba dist/rpm/kordophone.spec --define "_sourcedir $(TMP)"
deb:
./dist/deb/build-deb.sh $(VER)
.PHONY: flatpak
flatpak:
flatpak-builder --force-clean flatpak-build flatpak/net.buzzert.kordophone.yml
.PHONY: flatpak-install
flatpak-install:
flatpak-builder --force-clean --user --install flatpak-build flatpak/net.buzzert.kordophone.yml

View File

@@ -5,5 +5,3 @@ Libadwaita/GTK4 client for the Kordophone client daemon.
# Building # Building
Build an RPM using `rpmbuild -ba dist/rpm/kordophone.spec` Build an RPM using `rpmbuild -ba dist/rpm/kordophone.spec`
Build a DEB using `make deb`

View File

@@ -1,6 +0,0 @@
*.deb
*.buildinfo
*.changes
*.dsc
*.tar.*
*.build

View File

@@ -1,49 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
VERSION="${1:-}"
if [[ -z "$VERSION" ]]; then
VERSION="$(sed -n "s/.*version[[:space:]]*:[[:space:]]*'\\([^']*\\)'.*/\\1/p" meson.build | head -n1)"
fi
if [[ -z "$VERSION" ]]; then
echo "Could not determine version (pass as first arg)" >&2
exit 1
fi
ARCH="$(dpkg --print-architecture)"
PKG="kordophone"
STAGE="$(mktemp -d)"
trap 'rm -rf "$STAGE"' EXIT
PKGROOT="$STAGE/pkgroot"
mkdir -p "$PKGROOT/DEBIAN"
BUILD_DIR="$STAGE/build"
meson setup "$BUILD_DIR" --prefix=/usr
ninja -C "$BUILD_DIR"
DESTDIR="$PKGROOT" ninja -C "$BUILD_DIR" install
INSTALLED_SIZE_KB="$(du -sk "$PKGROOT/usr" | awk '{print $1}')"
cat >"$PKGROOT/DEBIAN/control" <<EOF
Package: ${PKG}
Version: ${VERSION}
Section: net
Priority: optional
Architecture: ${ARCH}
Maintainer: James Magahern <james@magahern.com>
Installed-Size: ${INSTALLED_SIZE_KB}
Depends: libgtk-4-1, libadwaita-1-0, libglib2.0-0, libgee-0.8-2, libsecret-1-0, kordophoned (>= 1.3.0)
Description: GTK4/Libadwaita client for Kordophone
A GTK4/Libadwaita Linux client for the Kordophone client daemon.
EOF
OUT_DIR="$ROOT_DIR/dist/deb"
mkdir -p "$OUT_DIR"
OUT_DEB="${OUT_DIR}/${PKG}_${VERSION}_${ARCH}.deb"
dpkg-deb --root-owner-group --build "$PKGROOT" "$OUT_DEB"
echo "$OUT_DEB"

View File

@@ -1,5 +1,5 @@
Name: kordophone Name: kordophone
Version: %{?app_version}%{!?app_version:1.3.0} Version: 1.0.2
Release: 1%{?dist} Release: 1%{?dist}
Summary: GTK4/Libadwaita client for Kordophone Summary: GTK4/Libadwaita client for Kordophone
@@ -22,7 +22,7 @@ Requires: libadwaita
Requires: glib2 Requires: glib2
Requires: libgee Requires: libgee
Requires: libsecret Requires: libsecret
Requires: kordophoned >= 1.3.0 Requires: kordophoned >= 1.0.0
%description %description
A GTK4/Libadwaita Linux Client for the Kordophone client daemon. A GTK4/Libadwaita Linux Client for the Kordophone client daemon.
@@ -49,3 +49,4 @@ popd
%changelog %changelog
* Fri Aug 8 2025 James Magahern <james@magahern.com> * Fri Aug 8 2025 James Magahern <james@magahern.com>
- Updated rpmspec - Updated rpmspec

View File

@@ -1,18 +0,0 @@
# Flatpak (GTK client)
This builds the GTK client as a Flatpak **assuming `kordophoned` is installed on the host**
and reachable on the **session bus** as `net.buzzert.kordophonecd`.
## Build
```bash
cd gtk
make flatpak
```
## Install (user)
```bash
cd gtk
make flatpak-install
```

View File

@@ -1,25 +0,0 @@
app-id: net.buzzert.kordophone
runtime: org.gnome.Platform
runtime-version: "48"
sdk: org.gnome.Sdk
command: kordophone
finish-args:
- --share=ipc
- --socket=wayland
- --socket=fallback-x11
- --device=dri
# Talk to the host-installed daemon (option A).
- --socket=session-bus
- --talk-name=net.buzzert.kordophonecd
# libsecret (Secret Service) access for stored credentials.
- --talk-name=org.freedesktop.secrets
modules:
- name: kordophone
buildsystem: meson
config-opts:
- --prefix=/app
sources:
- type: dir
path: ..

View File

@@ -1,5 +1,5 @@
project('kordophone', 'vala', project('kordophone', 'vala',
version : '1.0.2', version : '1.0.1',
meson_version : '>=0.56.0', meson_version : '>=0.56.0',
default_options : ['warning_level=2'] default_options : ['warning_level=2']
) )

View File

@@ -5,7 +5,6 @@ public class MainWindow : Adw.ApplicationWindow
{ {
private ConversationListView conversation_list_view; private ConversationListView conversation_list_view;
private TranscriptContainerView transcript_container_view; private TranscriptContainerView transcript_container_view;
private NavigationSplitView split_view;
private EventControllerMotion _motion_controller = new EventControllerMotion(); private EventControllerMotion _motion_controller = new EventControllerMotion();
private bool _motion_queued = false; private bool _motion_queued = false;
@@ -13,15 +12,10 @@ public class MainWindow : Adw.ApplicationWindow
public MainWindow () { public MainWindow () {
Object (title: "Kordophone"); Object (title: "Kordophone");
split_view = new NavigationSplitView (); var split_view = new NavigationSplitView ();
split_view.set_min_sidebar_width (400); split_view.set_min_sidebar_width (400);
split_view.show_content = false;
set_content (split_view); set_content (split_view);
var breakpoint = new Breakpoint (BreakpointCondition.parse ("max-width: 750sp"));
breakpoint.add_setter (split_view, "collapsed", true);
add_breakpoint (breakpoint);
conversation_list_view = new ConversationListView (); conversation_list_view = new ConversationListView ();
conversation_list_view.conversation_selected.connect (conversation_selected); conversation_list_view.conversation_selected.connect (conversation_selected);
conversation_list_view.conversation_activated.connect (open_conversation_in_new_window); conversation_list_view.conversation_activated.connect (open_conversation_in_new_window);
@@ -106,10 +100,6 @@ public class MainWindow : Adw.ApplicationWindow
GLib.warning("Failed to sync conversation: %s", e.message); GLib.warning("Failed to sync conversation: %s", e.message);
} }
} }
if (split_view.collapsed) {
split_view.show_content = true;
}
} }
} }

View File

@@ -14,7 +14,6 @@ public class ConversationListView : Adw.Bin
private string? selected_conversation_guid = null; private string? selected_conversation_guid = null;
private bool selection_update_queued = false; private bool selection_update_queued = false;
private bool suppress_row_selected = false;
public ConversationListView () { public ConversationListView () {
container = new Adw.ToolbarView (); container = new Adw.ToolbarView ();
@@ -30,10 +29,6 @@ public class ConversationListView : Adw.Bin
scrolled_window.set_child (list_box); scrolled_window.set_child (list_box);
list_box.row_selected.connect ((row) => { list_box.row_selected.connect ((row) => {
if (suppress_row_selected) {
return;
}
var conversation_row = (ConversationRow?) row; var conversation_row = (ConversationRow?) row;
if (conversation_row != null) { if (conversation_row != null) {
selected_conversation_guid = conversation_row.conversation.guid; selected_conversation_guid = conversation_row.conversation.guid;
@@ -117,9 +112,7 @@ public class ConversationListView : Adw.Bin
if (conversation.guid == selected_conversation_guid) { if (conversation.guid == selected_conversation_guid) {
var row = list_box.get_row_at_index((int)i); var row = list_box.get_row_at_index((int)i);
if (row != null) { if (row != null) {
suppress_row_selected = true;
list_box.select_row(row); list_box.select_row(row);
suppress_row_selected = false;
} }
} }
} }

View File

@@ -44,11 +44,11 @@ public class AttachmentInfo : Object {
} }
public class Attachment : Object { public class Attachment : Object {
public string guid = ""; public string guid;
public string path = ""; public string path;
public string preview_path = ""; public string preview_path;
public bool downloaded = false; public bool downloaded;
public bool preview_downloaded = false; public bool preview_downloaded;
public AttachmentMetadata? metadata; public AttachmentMetadata? metadata;
public Attachment(string guid, AttachmentMetadata? metadata) { public Attachment(string guid, AttachmentMetadata? metadata) {

View File

@@ -50,11 +50,8 @@ namespace DBusService {
[DBus (name = "GetMessages")] [DBus (name = "GetMessages")]
public abstract GLib.HashTable<string, GLib.Variant>[] get_messages(string conversation_id, string last_message_id) throws DBusError, IOError; public abstract GLib.HashTable<string, GLib.Variant>[] get_messages(string conversation_id, string last_message_id) throws DBusError, IOError;
[DBus (name = "Reply")] [DBus (name = "SendMessage")]
public abstract string reply(string conversation_id, string text, string[] attachment_guids) throws DBusError, IOError; public abstract string send_message(string conversation_id, string text, string[] attachment_guids) throws DBusError, IOError;
[DBus (name = "NewConversation")]
public abstract string new_conversation(string[] handle_ids, string text, string[] attachment_guids) throws DBusError, IOError;
[DBus (name = "MessagesUpdated")] [DBus (name = "MessagesUpdated")]
public signal void messages_updated(string conversation_id); public signal void messages_updated(string conversation_id);

View File

@@ -73,17 +73,18 @@
'sender' (string): Sender display name 'sender' (string): Sender display name
'attachments' (array of dictionaries): List of attachments 'attachments' (array of dictionaries): List of attachments
'guid' (string): Attachment GUID 'guid' (string): Attachment GUID
'path' (string): Attachment path
'preview_path' (string): Preview attachment path
'downloaded' (boolean): Whether the attachment is downloaded 'downloaded' (boolean): Whether the attachment is downloaded
'preview_downloaded' (boolean): Whether the preview is downloaded 'preview_downloaded' (boolean): Whether the preview is downloaded
'metadata' (dictionary, optional): Attachment metadata 'metadata' (dictionary, optional): Attachment metadata
'attribution_info' (dictionary, optional): Attribution info 'attribution_info' (dictionary, optional): Attribution info
'width' (int32, optional): Width 'width' (int32, optional): Width
'height' (int32, optional): Height 'height' (int32, optional): Height"/>
Use GetAttachmentInfo for full/preview paths."/>
</arg> </arg>
</method> </method>
<method name="Reply"> <method name="SendMessage">
<arg type="s" name="conversation_id" direction="in"/> <arg type="s" name="conversation_id" direction="in"/>
<arg type="s" name="text" direction="in"/> <arg type="s" name="text" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/> <arg type="as" name="attachment_guids" direction="in"/>
@@ -91,28 +92,9 @@
<arg type="s" name="outgoing_message_id" direction="out"/> <arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString" <annotation name="org.freedesktop.DBus.DocString"
value="Replies to an existing conversation. Returns the outgoing message ID. value="Sends a message to the server. Returns the outgoing message ID.
Arguments: Arguments:
- conversation_id: The ID of the conversation to reply to. - conversation_id: The ID of the conversation to send the message to.
- text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send.
Returns:
- outgoing_message_id: The ID of the outgoing message.
"/>
</method>
<method name="NewConversation">
<arg type="as" name="handle_ids" direction="in"/>
<arg type="s" name="text" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/>
<arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Sends a message to a new conversation identified by resolved handles.
Arguments:
- handle_ids: The resolved handles for the new conversation.
- text: The text of the message to send. - text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send. - attachment_guids: The GUIDs of the attachments to send.

View File

@@ -96,20 +96,12 @@ public class Repository : DBusServiceProxy {
return returned_messages; return returned_messages;
} }
public string reply(string conversation_guid, string message, string[] attachment_guids) throws DBusServiceProxyError, GLib.Error { public string send_message(string conversation_guid, string message, string[] attachment_guids) throws DBusServiceProxyError, GLib.Error {
if (dbus_repository == null) { if (dbus_repository == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Repository not connected"); throw new DBusServiceProxyError.NOT_CONNECTED("Repository not connected");
} }
return dbus_repository.reply(conversation_guid, message, attachment_guids); return dbus_repository.send_message(conversation_guid, message, attachment_guids);
}
public string new_conversation(string[] handle_ids, string message, string[] attachment_guids) throws DBusServiceProxyError, GLib.Error {
if (dbus_repository == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Repository not connected");
}
return dbus_repository.new_conversation(handle_ids, message, attachment_guids);
} }
public void sync_conversation(string conversation_guid) throws DBusServiceProxyError, GLib.Error { public void sync_conversation(string conversation_guid) throws DBusServiceProxyError, GLib.Error {
@@ -152,35 +144,4 @@ public class Repository : DBusServiceProxy {
var info = dbus_repository.get_attachment_info(attachment_guid); var info = dbus_repository.get_attachment_info(attachment_guid);
return new AttachmentInfo(info.attr1, info.attr2, info.attr3, info.attr4); return new AttachmentInfo(info.attr1, info.attr2, info.attr3, info.attr4);
} }
public int open_attachment_fd(string attachment_guid, bool preview) throws DBusServiceProxyError, GLib.Error {
if (dbus_repository == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Repository not connected");
}
var connection = Bus.get_sync(BusType.SESSION);
UnixFDList? out_fd_list = null;
var result = connection.call_with_unix_fd_list_sync(
DBUS_NAME,
DBUS_PATH,
"net.buzzert.kordophone.Repository",
"OpenAttachmentFd",
new Variant("(sb)", attachment_guid, preview),
new VariantType("(h)"),
DBusCallFlags.NONE,
120000,
null,
out out_fd_list,
null
);
int fd_handle = -1;
result.get("(h)", out fd_handle);
if (out_fd_list == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Missing UnixFDList from OpenAttachmentFd");
}
return out_fd_list.get(fd_handle);
}
} }

View File

@@ -13,91 +13,61 @@ private class SizeCache
return instance; return instance;
} }
public Graphene.Size? get_size(string attachment_guid) { public Graphene.Size? get_size(string image_path) {
return size_cache.get(attachment_guid); return size_cache.get(image_path);
} }
public void set_size(string attachment_guid, Graphene.Size size) { public void set_size(string image_path, Graphene.Size size) {
size_cache.set(attachment_guid, size); size_cache.set(image_path, size);
}
}
private class TextureCache
{
private static TextureCache instance = null;
private HashMap<string, Gdk.Texture> texture_cache = new HashMap<string, Gdk.Texture>();
public static TextureCache get_instance() {
if (instance == null) {
instance = new TextureCache();
}
return instance;
}
public Gdk.Texture? get_texture(string attachment_guid) {
return texture_cache.get(attachment_guid);
}
public void set_texture(string attachment_guid, Gdk.Texture texture) {
texture_cache.set(attachment_guid, texture);
} }
} }
private class ImageBubbleLayout : BubbleLayout private class ImageBubbleLayout : BubbleLayout
{ {
public string attachment_guid; public string image_path;
public bool is_downloaded; public bool is_downloaded;
public string? attachment_guid;
private Graphene.Size image_size; private Graphene.Size image_size;
private Gdk.Texture? cached_texture = null; private Gdk.Texture? cached_texture = null;
private bool preview_download_queued = false;
public ImageBubbleLayout(string attachment_guid, bool from_me, Widget parent, float max_width, Graphene.Size? image_size = null) { public ImageBubbleLayout(string image_path, bool from_me, Widget parent, float max_width, Graphene.Size? image_size = null) {
base(parent, max_width); base(parent, max_width);
this.from_me = from_me; this.from_me = from_me;
this.attachment_guid = attachment_guid; this.image_path = image_path;
this.is_downloaded = false; this.is_downloaded = false;
this.cached_texture = TextureCache.get_instance().get_texture(attachment_guid);
if (this.cached_texture != null) {
this.image_size = Graphene.Size() {
width = (float)this.cached_texture.get_width(),
height = (float)this.cached_texture.get_height()
};
SizeCache.get_instance().set_size(attachment_guid, this.image_size);
return;
}
// Calculate image dimensions for layout // Calculate image dimensions for layout
calculate_image_dimensions(image_size); calculate_image_dimensions(image_size);
} }
private void calculate_image_dimensions(Graphene.Size? image_size) { private void calculate_image_dimensions(Graphene.Size? image_size) {
var cached_size = SizeCache.get_instance().get_size(attachment_guid);
if (cached_size != null) {
this.image_size = cached_size;
return;
}
if (image_size != null) { if (image_size != null) {
this.image_size = image_size; this.image_size = image_size;
return; return;
} }
this.image_size = Graphene.Size() { width = 200.0f, height = 150.0f }; var cached_size = SizeCache.get_instance().get_size(image_path);
} if (cached_size != null) {
this.image_size = cached_size;
private void queue_preview_download_if_needed() {
if (is_downloaded || preview_download_queued || attachment_guid == "") {
return; return;
} }
// Try to load the image to get its dimensions
try { try {
Repository.get_instance().download_attachment(attachment_guid, true); warning("No image size provided, loading image to get dimensions");
preview_download_queued = true;
} catch (GLib.Error e) { var texture = Gdk.Texture.from_filename(image_path);
warning("Failed to queue preview download for %s: %s", attachment_guid, e.message); var original_width = (float)texture.get_width();
var original_height = (float)texture.get_height();
this.image_size = Graphene.Size() { width = original_width, height = original_height };
SizeCache.get_instance().set_size(image_path, this.image_size);
} catch (Error e) {
// Fallback dimensions if image can't be loaded
warning("Failed to load image %s: %s", image_path, e.message);
this.image_size = Graphene.Size() { width = 200.0f, height = 150.0f };
} }
} }
@@ -111,22 +81,9 @@ private class ImageBubbleLayout : BubbleLayout
} }
try { try {
int fd = Repository.get_instance().open_attachment_fd(attachment_guid, true); cached_texture = Gdk.Texture.from_filename(image_path);
var stream = new UnixInputStream(fd, true);
var pixbuf = new Gdk.Pixbuf.from_stream(stream, null);
cached_texture = Gdk.Texture.for_pixbuf(pixbuf);
if (cached_texture != null) {
TextureCache.get_instance().set_texture(attachment_guid, cached_texture);
this.image_size = Graphene.Size() {
width = (float)cached_texture.get_width(),
height = (float)cached_texture.get_height()
};
SizeCache.get_instance().set_size(attachment_guid, this.image_size);
parent.queue_allocate();
}
} catch (Error e) { } catch (Error e) {
warning("Failed to load preview image for %s: %s", attachment_guid, e.message); warning("Failed to load image %s: %s", image_path, e.message);
} }
} }
@@ -153,7 +110,6 @@ private class ImageBubbleLayout : BubbleLayout
} }
public override void draw_content(Snapshot snapshot) { public override void draw_content(Snapshot snapshot) {
queue_preview_download_if_needed();
load_image_if_needed(); load_image_if_needed();
snapshot.save(); snapshot.save();

View File

@@ -62,65 +62,44 @@ public class MessageListModel : Object, ListModel
} }
} }
public void load_messages(bool force_full_reload = false) { public void load_messages() {
var previous_messages = new HashSet<Message>(); var previous_messages = new HashSet<Message>();
previous_messages.add_all(_messages); previous_messages.add_all(_messages);
try { try {
bool first_load = _messages.size == 0; bool first_load = _messages.size == 0;
string last_message_id = (first_load || force_full_reload) ? "" : _messages.get(_messages.size - 1).guid;
Message[] messages = Repository.get_instance().get_messages(conversation.guid, last_message_id); Message[] messages = Repository.get_instance().get_messages(conversation.guid);
bool fallback_full_reload = first_load || force_full_reload; // Clear existing set
if (!first_load && messages.length > 0 && previous_messages.contains(messages[0])) { uint old_count = _messages.size;
fallback_full_reload = true; _messages.clear();
participants.clear();
// Notify of removal
if (old_count > 0) {
items_changed(0, old_count, 0);
} }
if (fallback_full_reload) { // Process each conversation
uint old_count = _messages.size; uint position = 0;
_messages.clear();
participants.clear();
if (old_count > 0) { for (int i = 0; i < messages.length; i++) {
items_changed(0, old_count, 0); var message = messages[i];
} participants.add(message.sender);
uint position = 0; if (!first_load && !previous_messages.contains(message)) {
for (int i = 0; i < messages.length; i++) { // This is a new message according to the UI, schedule an animation for it.
var message = messages[i];
participants.add(message.sender);
if (!first_load && !previous_messages.contains(message)) {
message.should_animate = true;
}
_messages.add(message);
position++;
}
if (position > 0) {
items_changed(0, 0, position);
}
} else {
uint old_count = _messages.size;
uint appended = 0;
for (int i = 0; i < messages.length; i++) {
var message = messages[i];
if (previous_messages.contains(message)) {
continue;
}
participants.add(message.sender);
message.should_animate = true; message.should_animate = true;
_messages.add(message);
appended++;
} }
if (appended > 0) { _messages.add(message);
items_changed(old_count, 0, appended); position++;
} }
// Notify of additions
if (position > 0) {
items_changed(0, 0, position);
} }
} catch (Error e) { } catch (Error e) {
warning("Failed to load messages: %s", e.message); warning("Failed to load messages: %s", e.message);

View File

@@ -257,7 +257,7 @@ class TranscriptContainerView : Adw.Bin
} }
try { try {
Repository.get_instance().reply(selected_conversation.guid, body, attachment_guids.to_array()); Repository.get_instance().send_message(selected_conversation.guid, body, attachment_guids.to_array());
} catch (Error e) { } catch (Error e) {
GLib.warning("Failed to send message: %s", e.message); GLib.warning("Failed to send message: %s", e.message);
} }

View File

@@ -327,8 +327,7 @@ private class TranscriptDrawingArea : Widget
private void recompute_message_layouts() { private void recompute_message_layouts() {
var container_width = get_width(); var container_width = get_width();
float max_width = container_width * 0.80f; float max_width = container_width * 0.90f;
float image_max_width = max_width * 0.70f;
DateTime? last_date = null; DateTime? last_date = null;
string? last_sender = null; string? last_sender = null;
@@ -365,15 +364,16 @@ private class TranscriptDrawingArea : Widget
// Check for attachments. For each one, add an image layout bubble // Check for attachments. For each one, add an image layout bubble
foreach (var attachment in message.attachments) { foreach (var attachment in message.attachments) {
Graphene.Size? image_size = null; Graphene.Size? image_size = null;
if (attachment.metadata != null && attachment.metadata.attribution_info != null) { if (attachment.metadata != null) {
image_size = Graphene.Size() { image_size = Graphene.Size() {
width = attachment.metadata.attribution_info.width, width = attachment.metadata.attribution_info.width,
height = attachment.metadata.attribution_info.height height = attachment.metadata.attribution_info.height
}; };
} }
var image_layout = new ImageBubbleLayout(attachment.guid, message.from_me, this, image_max_width, image_size); var image_layout = new ImageBubbleLayout(attachment.preview_path, message.from_me, this, max_width, image_size);
image_layout.id = @"image-$(attachment.guid)"; image_layout.id = @"image-$(attachment.guid)";
image_layout.attachment_guid = attachment.guid;
if (animate) { if (animate) {
start_animation(image_layout.id); start_animation(image_layout.id);
@@ -381,6 +381,16 @@ private class TranscriptDrawingArea : Widget
image_layout.is_downloaded = attachment.preview_downloaded; image_layout.is_downloaded = attachment.preview_downloaded;
items.add(image_layout); items.add(image_layout);
// If the attachment isn't downloaded, queue a download since we are going to be showing it here.
// TODO: Probably would be better if we only did this for stuff in the viewport.
if (!attachment.preview_downloaded) {
try {
Repository.get_instance().download_attachment(attachment.guid, true);
} catch (GLib.Error e) {
warning("Wasn't able to message daemon about queuing attachment download: %s", e.message);
}
}
} }
last_sender = message.sender; last_sender = message.sender;

View File

@@ -148,7 +148,7 @@ public class TranscriptView : Adw.Bin
GLib.Idle.add(() => { GLib.Idle.add(() => {
if (needs_reload) { if (needs_reload) {
debug("Reloading messages for attachment download"); debug("Reloading messages for attachment download");
model.load_messages(true); model.load_messages();
needs_reload = false; needs_reload = false;
} }
@@ -159,6 +159,7 @@ public class TranscriptView : Adw.Bin
} }
delegate void OpenPath(string path); delegate void OpenPath(string path);
private ulong attachment_downloaded_handler_id = 0;
private void open_attachment(string attachment_guid) { private void open_attachment(string attachment_guid) {
OpenPath open_path = (path) => { OpenPath open_path = (path) => {
try { try {
@@ -179,17 +180,10 @@ public class TranscriptView : Adw.Bin
// TODO: Should probably indicate progress here. // TODO: Should probably indicate progress here.
ulong handler_id = 0; attachment_downloaded_handler_id = Repository.get_instance().attachment_downloaded.connect((guid) => {
handler_id = Repository.get_instance().attachment_downloaded.connect((guid) => {
if (guid == attachment_guid) { if (guid == attachment_guid) {
try { open_path(attachment_info.path);
var updated_attachment_info = Repository.get_instance().get_attachment_info(attachment_guid); Repository.get_instance().disconnect(attachment_downloaded_handler_id);
open_path(updated_attachment_info.path);
} catch (GLib.Error e) {
warning("Failed to get attachment info after download: %s", e.message);
}
Repository.get_instance().disconnect(handler_id);
} }
}); });
} }

View File

@@ -119,7 +119,7 @@ struct MessageEntryView: View
Task { Task {
do { do {
try await client.reply( try await client.sendMessage(
conversationId: convo.id, conversationId: convo.id,
message: messageText, message: messageText,
transferGuids: transferGuids transferGuids: transferGuids

View File

@@ -133,7 +133,7 @@ final class XPCClient
return results return results
} }
public func reply(conversationId: String, message: String, transferGuids: Set<String>) async throws { public func sendMessage(conversationId: String, message: String, transferGuids: Set<String>) async throws {
var args: [String: xpc_object_t] = [:] var args: [String: xpc_object_t] = [:]
args["conversation_id"] = xpcString(conversationId) args["conversation_id"] = xpcString(conversationId)
args["text"] = xpcString(message) args["text"] = xpcString(message)
@@ -142,20 +142,7 @@ final class XPCClient
args["attachment_guids"] = xpcStringArray(transferGuids) args["attachment_guids"] = xpcStringArray(transferGuids)
} }
let req = makeRequest(method: "Reply", arguments: args) let req = makeRequest(method: "SendMessage", arguments: args)
guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else { throw Error.typeError }
}
public func newConversation(handleIds: Set<String>, message: String, transferGuids: Set<String>) async throws {
var args: [String: xpc_object_t] = [:]
args["handle_ids"] = xpcStringArray(handleIds)
args["text"] = xpcString(message)
if !transferGuids.isEmpty {
args["attachment_guids"] = xpcStringArray(transferGuids)
}
let req = makeRequest(method: "NewConversation", arguments: args)
guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else { throw Error.typeError } guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else { throw Error.typeError }
} }
@@ -424,3 +411,4 @@ extension xpc_object_t
) )
} }
} }

View File

@@ -1,3 +1,3 @@
[submodule "CocoaHTTPServer"] [submodule "CocoaHTTPServer"]
path = server/CocoaHTTPServer path = CocoaHTTPServer
url = https://github.com/robbiehanson/CocoaHTTPServer.git url = https://github.com/robbiehanson/CocoaHTTPServer.git

View File

@@ -14,7 +14,6 @@
#import "MBIMAuthToken.h" #import "MBIMAuthToken.h"
#import "MBIMUpdateQueue.h" #import "MBIMUpdateQueue.h"
#import "MBIMURLUtilities.h" #import "MBIMURLUtilities.h"
#import "MBIMLogging.h"
#import <Security/Security.h> #import <Security/Security.h>
#import "HTTPMessage.h" #import "HTTPMessage.h"
@@ -99,10 +98,6 @@
__block NSObject<HTTPResponse> *response = nil; __block NSObject<HTTPResponse> *response = nil;
dispatch_semaphore_t sema = dispatch_semaphore_create(0); dispatch_semaphore_t sema = dispatch_semaphore_create(0);
MBIMBridgeOperationCompletionBlock completion = ^(NSObject<HTTPResponse> *incomingResponse) { MBIMBridgeOperationCompletionBlock completion = ^(NSObject<HTTPResponse> *incomingResponse) {
if (incomingResponse == nil) {
MBIMLogError(@"Operation for %@ %@ completed with a nil response.", method, path);
}
response = incomingResponse; response = incomingResponse;
dispatch_semaphore_signal(sema); dispatch_semaphore_signal(sema);
}; };
@@ -130,11 +125,6 @@
response = [_currentOperation cancelAndReturnTimeoutResponse]; response = [_currentOperation cancelAndReturnTimeoutResponse];
} }
if (response == nil) {
MBIMLogError(@"Returning fallback 500 for %@ %@ because the operation produced no response.", method, path);
response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
}
return response; return response;
} }

View File

@@ -10,7 +10,6 @@
#import "IMCore_ClassDump.h" #import "IMCore_ClassDump.h"
#import "IMMessageItem+Encoded.h" #import "IMMessageItem+Encoded.h"
#import "MBIMErrorResponse.h"
@implementation MBIMSendMessageOperation @implementation MBIMSendMessageOperation
@@ -21,226 +20,38 @@
return @"sendMessage"; return @"sendMessage";
} }
- (nullable IMChat *)_existingSingleChatForHandle:(IMHandle *)handle registry:(IMChatRegistry *)registry - (IMMessage *)_sendMessage:(NSString *)messageBody toChatWithGUID:(NSString *)chatGUID attachmentGUIDs:(NSArray<NSString *> *)guids
{ {
if ([registry respondsToSelector:@selector(existingChatWithHandle:allowAlternativeService:)]) { __block IMMessage *result = nil;
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandle:allowAlternativeService:");
return [registry existingChatWithHandle:handle allowAlternativeService:NO];
}
if ([registry respondsToSelector:@selector(existingChatWithHandle:)]) { dispatch_sync([[self class] sharedIMAccessQueue], ^{
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandle:"); IMChat *chat = [[IMChatRegistry sharedInstance] existingChatWithGUID:chatGUID];
return [registry existingChatWithHandle:handle];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandle:allowRetargeting:)]) { // TODO: chat might not be an iMessage chat!
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandle:allowRetargeting:"); IMAccount *iMessageAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]];
return [registry existingChatForIMHandle:handle allowRetargeting:NO]; IMHandle *senderHandle = [iMessageAccount loginIMHandle];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandle:)]) { NSAttributedString *replyAttrString = [[NSAttributedString alloc] initWithString:messageBody];
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandle:"); NSAttributedString *attrStringWithFileTransfers = IMCreateSuperFormatStringWithAppendedFileTransfers(replyAttrString, guids);
return [registry existingChatForIMHandle:handle];
}
MBIMLogError(@"IMChatRegistry does not support any known single-handle existing chat lookup selector."); IMMessage *reply = [IMMessage fromMeIMHandle:senderHandle
return nil; withText:attrStringWithFileTransfers
} fileTransferGUIDs:guids
flags:(kIMMessageFinished | kIMMessageIsFromMe)];
- (nullable IMChat *)_createSingleChatForHandle:(IMHandle *)handle registry:(IMChatRegistry *)registry for (NSString *guid in [reply fileTransferGUIDs]) {
{ [[IMFileTransferCenter sharedInstance] assignTransfer:guid toHandle:chat.recipient];
if ([registry respondsToSelector:@selector(chatWithHandle:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatWithHandle:");
return [registry chatWithHandle:handle];
}
if ([registry respondsToSelector:@selector(chatForIMHandle:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatForIMHandle:");
return [registry chatForIMHandle:handle];
}
MBIMLogError(@"IMChatRegistry does not support any known single-handle chat creation selector.");
return nil;
}
- (nullable IMChat *)_existingGroupChatForHandles:(NSArray<IMHandle *> *)handles registry:(IMChatRegistry *)registry
{
if ([registry respondsToSelector:@selector(existingChatWithHandles:allowAlternativeService:groupID:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandles:allowAlternativeService:groupID:displayName:joinedChatsOnly:");
return [registry existingChatWithHandles:handles
allowAlternativeService:NO
groupID:nil
displayName:nil
joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(existingChatWithHandles:allowAlternativeService:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandles:allowAlternativeService:");
return [registry existingChatWithHandles:handles allowAlternativeService:NO];
}
if ([registry respondsToSelector:@selector(existingChatWithHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandles:");
return [registry existingChatWithHandles:handles];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandles:allowRetargeting:groupID:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandles:allowRetargeting:groupID:displayName:joinedChatsOnly:");
return [registry existingChatForIMHandles:handles
allowRetargeting:NO
groupID:nil
displayName:nil
joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandles:allowRetargeting:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandles:allowRetargeting:");
return [registry existingChatForIMHandles:handles allowRetargeting:NO];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandles:");
return [registry existingChatForIMHandles:handles];
}
MBIMLogError(@"IMChatRegistry does not support any known multi-handle existing chat lookup selector.");
return nil;
}
- (nullable IMChat *)_createGroupChatForHandles:(NSArray<IMHandle *> *)handles registry:(IMChatRegistry *)registry
{
if ([registry respondsToSelector:@selector(chatWithHandles:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatWithHandles:displayName:joinedChatsOnly:");
return [registry chatWithHandles:handles displayName:nil joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(chatWithHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatWithHandles:");
return [registry chatWithHandles:handles];
}
if ([registry respondsToSelector:@selector(chatForIMHandles:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatForIMHandles:displayName:joinedChatsOnly:");
return [registry chatForIMHandles:handles displayName:nil joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(chatForIMHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatForIMHandles:");
return [registry chatForIMHandles:handles];
}
MBIMLogError(@"IMChatRegistry does not support any known multi-handle chat creation selector.");
return nil;
}
- (nullable IMChat *)_chatForHandleIDs:(NSArray<NSString *> *)handleIDs registry:(IMChatRegistry *)registry
{
MBIMLogInfo(@"Resolving send target for handles: %@", handleIDs);
IMAccount *iMessageAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]];
if (!iMessageAccount) {
MBIMLogError(@"Unable to find an iMessage account for message send.");
return nil;
}
NSMutableArray<IMHandle *> *handles = [NSMutableArray arrayWithCapacity:[handleIDs count]];
for (NSString *handleID in handleIDs) {
IMHandle *handle = [iMessageAccount imHandleWithID:handleID];
if (!handle) {
MBIMLogError(@"Couldn't resolve IMHandle for id %@", handleID);
return nil;
} }
[handles addObject:handle];
}
if ([handles count] == 1) {
IMHandle *handle = [handles firstObject];
IMChat *chat = [self _existingSingleChatForHandle:handle registry:registry];
if (!chat) { if (!chat) {
chat = [self _createSingleChatForHandle:handle registry:registry]; MBIMLogInfo(@"Chat does not exist: %@", chatGUID);
}
if (chat) {
MBIMLogInfo(@"Resolved send target %@ to chat %@", [handle ID], [chat guid] ?: @"<unknown>");
} else { } else {
MBIMLogError(@"Unable to locate or create chat for handle %@", [handle ID]); result = reply;
dispatch_async(dispatch_get_main_queue(), ^{
[chat sendMessage:reply];
});
} }
return chat;
}
IMChat *chat = [self _existingGroupChatForHandles:handles registry:registry];
if (!chat) {
chat = [self _createGroupChatForHandles:handles registry:registry];
}
if (chat) {
MBIMLogInfo(@"Resolved handles %@ to chat %@", handleIDs, [chat guid] ?: @"<unknown>");
} else {
MBIMLogError(@"Unable to locate or create chat for handles %@", handleIDs);
}
return chat;
}
- (nullable NSDictionary *)_sendMessage:(NSString *)messageBody toChat:(IMChat *)chat attachmentGUIDs:(NSArray<NSString *> *)guids includeConversationGUID:(BOOL)includeConversationGUID
{
if (!chat) {
return nil;
}
NSString *chatGUID = [chat guid];
if (!chatGUID) {
chatGUID = [[[IMChatRegistry sharedInstance] allGUIDsForChat:chat] firstObject];
}
MBIMLogInfo(@"Preparing sendMessage for chat %@ (bodyLength=%lu attachmentCount=%lu)", chatGUID ?: @"<unknown>", (unsigned long)[messageBody length], (unsigned long)[guids count]);
IMAccount *sendingAccount = [chat account];
if (!sendingAccount) {
sendingAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]];
}
IMHandle *senderHandle = [sendingAccount loginIMHandle];
if (!senderHandle) {
MBIMLogError(@"Unable to determine sender handle for message send.");
return nil;
}
NSAttributedString *replyAttrString = [[NSAttributedString alloc] initWithString:messageBody];
NSAttributedString *attrStringWithFileTransfers = IMCreateSuperFormatStringWithAppendedFileTransfers(replyAttrString, guids);
IMMessage *reply = [IMMessage fromMeIMHandle:senderHandle
withText:attrStringWithFileTransfers
fileTransferGUIDs:guids
flags:(kIMMessageFinished | kIMMessageIsFromMe)];
for (NSString *guid in [reply fileTransferGUIDs]) {
[[IMFileTransferCenter sharedInstance] assignTransfer:guid toMessage:reply account:sendingAccount];
}
NSDictionary *replyRepresentation = [reply mbim_dictionaryRepresentation];
if (![replyRepresentation isKindOfClass:[NSDictionary class]]) {
MBIMLogError(@"Unable to encode sent message for chat %@", chatGUID ?: @"<unknown>");
return nil;
}
NSMutableDictionary *result = [replyRepresentation mutableCopy];
if (includeConversationGUID) {
NSString *conversationGUID = chatGUID;
if (!conversationGUID) {
conversationGUID = [[[IMChatRegistry sharedInstance] allGUIDsForChat:chat] firstObject];
}
if (conversationGUID) {
result[@"conversationGUID"] = conversationGUID;
}
}
MBIMLogInfo(@"Dispatching IMCore send for chat %@", chatGUID ?: @"<unknown>");
dispatch_async(dispatch_get_main_queue(), ^{
[chat sendMessage:reply];
}); });
return result; return result;
@@ -268,112 +79,41 @@
- (void)main - (void)main
{ {
__block NSObject<HTTPResponse> *response = [[HTTPErrorResponse alloc] initWithErrorCode:500]; NSObject<HTTPResponse> *response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
NSError *error = nil; NSError *error = nil;
NSDictionary *args = [NSJSONSerialization JSONObjectWithData:self.requestBodyData options:0 error:&error]; NSDictionary *args = [NSJSONSerialization JSONObjectWithData:self.requestBodyData options:0 error:&error];
if (error || args.count == 0) { if (error || args.count == 0) {
MBIMLogError(@"Unable to parse sendMessage request body: %@", error);
self.serverCompletionBlock(response); self.serverCompletionBlock(response);
return; return;
} }
NSString *guid = [args objectForKey:@"guid"]; NSString *guid = [args objectForKey:@"guid"];
NSString *messageBody = [args objectForKey:@"body"]; NSString *messageBody = [args objectForKey:@"body"];
NSArray *rawHandleIDs = [args objectForKey:@"handleIDs"]; if (!guid || !messageBody) {
BOOL hasGUID = [guid isKindOfClass:[NSString class]] && [guid length] > 0;
BOOL hasHandleIDs = [rawHandleIDs isKindOfClass:[NSArray class]] && [rawHandleIDs count] > 0;
if (![messageBody isKindOfClass:[NSString class]] || (!hasGUID && !hasHandleIDs) || (hasGUID && hasHandleIDs)) {
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"sendMessage requires body and exactly one of guid or handleIDs."];
self.serverCompletionBlock(response); self.serverCompletionBlock(response);
return; return;
} }
NSMutableArray<NSString *> *handleIDs = [NSMutableArray array]; // tapbacks
if (hasHandleIDs) { #if 0
for (id handleID in rawHandleIDs) { IMMessage *acknowledgment = [IMMessage instantMessageWithAssociatedMessageContent: /* [NSString stringWithFormat:@"%@ \"%%@\"", tapbackAction] */
if ([handleID isKindOfClass:[NSString class]] && [handleID length] > 0) { flags:0
[handleIDs addObject:handleID]; associatedMessageGUID:guid
} associatedMessageType:IMAssociatedMessageTypeAcknowledgmentHeart
} associatedMessageRange:[imMessage messagePartRange]
messageSummaryInfo:[self adjustMessageSummaryInfoForSending:message]
threadIdentifier:[imMessage threadIdentifier]];
#endif
handleIDs = [[[NSOrderedSet orderedSetWithArray:handleIDs] array] mutableCopy]; NSArray *transferGUIDs = [args objectForKey:@"fileTransferGUIDs"];
if ([handleIDs count] == 0) { if (!transferGUIDs) {
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"No valid handle IDs provided."]; transferGUIDs = @[];
self.serverCompletionBlock(response);
return;
}
} }
NSArray *rawTransferGUIDs = [args objectForKey:@"fileTransferGUIDs"]; IMMessage *result = [self _sendMessage:messageBody toChatWithGUID:guid attachmentGUIDs:transferGUIDs];
NSMutableArray<NSString *> *transferGUIDs = [NSMutableArray array]; if (result) {
if ([rawTransferGUIDs isKindOfClass:[NSArray class]]) { response = [MBIMJSONDataResponse responseWithJSONObject:[result mbim_dictionaryRepresentation]];
for (id transferGUID in rawTransferGUIDs) {
if ([transferGUID isKindOfClass:[NSString class]] && [transferGUID length] > 0) {
[transferGUIDs addObject:transferGUID];
}
}
}
MBIMLogInfo(@"sendMessage request received. guid=%@ handleIDs=%@ bodyLength=%lu attachmentGUIDs=%@", hasGUID ? guid : @"<none>", handleIDs, (unsigned long)[messageBody length], transferGUIDs);
@try {
dispatch_sync([[self class] sharedIMAccessQueue], ^{
IMChatRegistry *registry = [IMChatRegistry sharedInstance];
IMChat *chat = nil;
BOOL includeConversationGUID = NO;
if (hasGUID) {
MBIMLogInfo(@"sendMessage targeting existing conversation %@", guid);
chat = [registry existingChatWithGUID:guid];
if (!chat) {
MBIMLogError(@"Chat does not exist for guid %@", guid);
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Chat does not exist for the provided guid."];
return;
}
} else {
MBIMLogInfo(@"sendMessage targeting handles %@", handleIDs);
chat = [self _chatForHandleIDs:handleIDs registry:registry];
includeConversationGUID = YES;
if (!chat) {
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unable to create or locate a chat for the provided handles."];
return;
}
}
NSString *resolvedChatGUID = [chat guid];
if (!resolvedChatGUID) {
resolvedChatGUID = [[[IMChatRegistry sharedInstance] allGUIDsForChat:chat] firstObject];
}
MBIMLogInfo(@"sendMessage resolved target chat %@", resolvedChatGUID ?: @"<unknown>");
NSDictionary *result = [self _sendMessage:messageBody
toChat:chat
attachmentGUIDs:transferGUIDs
includeConversationGUID:includeConversationGUID];
if (!result) {
MBIMLogError(@"sendMessage failed before a response payload could be encoded for chat %@", resolvedChatGUID ?: @"<unknown>");
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unable to construct sent message response."];
return;
}
NSObject<HTTPResponse> *jsonResponse = [MBIMJSONDataResponse responseWithJSONObject:result];
if (jsonResponse) {
response = jsonResponse;
} else {
MBIMLogError(@"Unable to encode sendMessage JSON response for chat %@", resolvedChatGUID ?: @"<unknown>");
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unable to encode sendMessage response."];
}
});
} @catch (NSException *exception) {
MBIMLogError(@"Unhandled exception during sendMessage. name=%@ reason=%@ userInfo=%@", exception.name, exception.reason, exception.userInfo);
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unhandled exception while sending message. Check server logs."];
}
if (response == nil) {
MBIMLogError(@"sendMessage completed without producing a response. guid=%@ handleIDs=%@", hasGUID ? guid : @"<none>", handleIDs);
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"sendMessage did not produce a response. Check server logs."];
} }
self.serverCompletionBlock(response); self.serverCompletionBlock(response);