Private
Public Access
1
0

Compare commits

..

9 Commits

44 changed files with 1767 additions and 361 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
ext/
target/

View File

@@ -1,3 +1,3 @@
[submodule "CocoaHTTPServer"]
path = CocoaHTTPServer
path = server/CocoaHTTPServer
url = https://github.com/robbiehanson/CocoaHTTPServer.git

View File

@@ -24,7 +24,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use crate::{
model::{
Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage,
UpdateItem,
OutgoingMessageTarget, ResolveHandleResponse, SendMessageResponse, UpdateItem,
},
APIInterface,
};
@@ -65,7 +65,15 @@ impl std::error::Error for Error {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
match self {
Error::ClientError(message) => write!(f, "{}", message),
Error::HTTPError(err) => write!(f, "HTTP transport error: {}", err),
Error::SerdeError(err) => write!(f, "JSON error: {}", err),
Error::DecodeError(message) => write!(f, "Decode error: {}", message),
Error::PongError(err) => write!(f, "WebSocket error: {}", err),
Error::URLError => write!(f, "Invalid URL"),
Error::Unauthorized => write!(f, "Unauthorized"),
}
}
}
@@ -284,6 +292,17 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
Ok(())
}
async fn delete_conversation(
&mut self,
conversation_id: &ConversationID,
) -> Result<(), Self::Error> {
// SERVER JANK: This should be DELETE or POST, but it's GET for some reason.
let endpoint = format!("delete?guid={}", conversation_id);
self.response_with_body_retry(&endpoint, Method::GET, Body::empty, true)
.await?;
Ok(())
}
async fn get_messages(
&mut self,
conversation_id: &ConversationID,
@@ -312,16 +331,46 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
async fn send_message(
&mut self,
outgoing_message: &OutgoingMessage,
) -> Result<Message, Self::Error> {
let message: Message = self
) -> Result<SendMessageResponse, Self::Error> {
match &outgoing_message.target {
OutgoingMessageTarget::Conversation(conversation_id) => {
log::debug!(
"Sending message to conversation {} (body_length={}, attachment_count={})",
conversation_id,
outgoing_message.text.len(),
outgoing_message.file_transfer_guids.len()
);
}
OutgoingMessageTarget::Handles(handle_ids) => {
log::debug!(
"Sending message to resolved handles {:?} (body_length={}, attachment_count={})",
handle_ids,
outgoing_message.text.len(),
outgoing_message.file_transfer_guids.len()
);
}
}
let message: SendMessageResponse = self
.deserialized_response_with_body("sendMessage", Method::POST, || {
serde_json::to_string(&outgoing_message).unwrap().into()
Self::send_message_request_body(outgoing_message)
})
.await?;
Ok(message)
}
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error> {
log::debug!("Resolving handle {}", handle_id);
let endpoint = format!("resolveHandle?id={}", urlencoding::encode(handle_id));
let response: ResolveHandleResponse =
self.deserialized_response(&endpoint, Method::GET).await?;
Ok(response)
}
async fn fetch_attachment_data(
&mut self,
guid: &str,
@@ -394,8 +443,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
None => "updates".to_string(),
};
let uri = self
.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
let uri = self.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
loop {
log::debug!("Connecting to websocket: {:?}", uri);
@@ -426,18 +474,20 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
log::debug!("Websocket request: {:?}", request);
let mut should_retry = true; // retry once after authenticating.
let should_retry = true; // retry once after authenticating.
match connect_async(request).await.map_err(Error::from) {
Ok((socket, response)) => {
log::debug!("Websocket connected: {:?}", response.status());
break Ok(WebsocketEventSocket::new(socket))
break Ok(WebsocketEventSocket::new(socket));
}
Err(e) => match &e {
Error::ClientError(ce) => match ce.as_str() {
"HTTP error: 401 Unauthorized" | "Unauthorized" => {
// Try to authenticate
if let Some(credentials) = &self.auth_store.get_credentials().await {
log::warn!("Websocket connection failed, attempting to authenticate");
log::warn!(
"Websocket connection failed, attempting to authenticate"
);
let new_token = self.authenticate(credentials.clone()).await?;
self.auth_store.set_token(new_token.to_string()).await;
@@ -473,16 +523,44 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
.build();
let client = Client::builder().build::<_, Body>(https);
HTTPAPIClient { base_url, auth_store, client }
HTTPAPIClient {
base_url,
auth_store,
client,
}
}
fn send_message_request_body(outgoing_message: &OutgoingMessage) -> Body {
#[derive(Serialize)]
struct SendMessageRequest<'a> {
#[serde(rename = "body")]
text: &'a str,
#[serde(rename = "guid", skip_serializing_if = "Option::is_none")]
conversation_id: Option<&'a ConversationID>,
#[serde(rename = "handleIDs", skip_serializing_if = "Option::is_none")]
handle_ids: Option<&'a [String]>,
#[serde(rename = "fileTransferGUIDs")]
file_transfer_guids: &'a Vec<String>,
}
let (conversation_id, handle_ids) = match &outgoing_message.target {
OutgoingMessageTarget::Conversation(conversation_id) => (Some(conversation_id), None),
OutgoingMessageTarget::Handles(handle_ids) => (None, Some(handle_ids.as_slice())),
};
serde_json::to_string(&SendMessageRequest {
text: &outgoing_message.text,
conversation_id,
handle_ids,
file_transfer_guids: &outgoing_message.file_transfer_guids,
})
.unwrap()
.into()
}
fn uri_for_endpoint(&self, endpoint: &str, scheme: Option<&str>) -> Result<Uri, Error> {
let mut parts = self.base_url.clone().into_parts();
let root_path: PathBuf = parts
.path_and_query
.ok_or(Error::URLError)?
.path()
.into();
let root_path: PathBuf = parts.path_and_query.ok_or(Error::URLError)?.path().into();
let path = root_path.join(endpoint);
let path_str = path.to_str().ok_or(Error::URLError)?;
@@ -503,6 +581,18 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
}
}
fn log_transport_error(method: &Method, target: &str, err: &hyper::Error) {
log::error!("HTTP transport error for {} {}: {}", method, target, err);
if format!("{:?}", err).contains("IncompleteMessage") {
log::error!(
"The server closed the connection before a complete response was received for {} {}.",
method,
target
);
}
}
async fn deserialized_response<T: DeserializeOwned>(
&mut self,
endpoint: &str,
@@ -536,15 +626,26 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
T: DeserializeOwned,
{
let response = self
.response_with_body_retry(endpoint, method, body_fn, retry_auth)
.response_with_body_retry(endpoint, method.clone(), body_fn, retry_auth)
.await?;
// Read and parse response body
let body = hyper::body::to_bytes(response.into_body()).await?;
let body = match hyper::body::to_bytes(response.into_body()).await {
Ok(body) => body,
Err(err) => {
Self::log_transport_error(&method, endpoint, &err);
return Err(Error::HTTPError(err));
}
};
let parsed: T = match serde_json::from_slice(&body) {
Ok(result) => Ok(result),
Err(json_err) => {
log::error!("Error deserializing JSON: {:?}", json_err);
log::error!(
"Error deserializing JSON for {} {}: {:?}",
method,
endpoint,
json_err
);
log::error!("Body: {:?}", String::from_utf8_lossy(&body));
// If JSON deserialization fails, try to interpret it as plain text
@@ -567,7 +668,8 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
use hyper::StatusCode;
let uri = self.uri_for_endpoint(endpoint, None)?;
log::debug!("Requesting {:?} {:?}", method, uri);
let uri_string = uri.to_string();
log::debug!("Requesting {} {}", method, uri_string);
let mut build_request = |auth: &Option<String>| {
let body = body_fn();
@@ -581,13 +683,24 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
log::trace!("Obtaining token from auth store");
let token = self.auth_store.get_token().await;
log::trace!("Token: {:?}", token);
log::trace!("Token present: {}", token.is_some());
let request = build_request(&token);
log::trace!("Request: {:?}. Sending request...", request);
log::trace!(
"Sending request: method={} uri={} authenticated={}",
method,
uri_string,
token.is_some()
);
let mut response = self.client.request(request).await?;
log::debug!("-> Response: {:}", response.status());
let mut response = match self.client.request(request).await {
Ok(response) => response,
Err(err) => {
Self::log_transport_error(&method, &uri_string, &err);
return Err(Error::HTTPError(err));
}
};
log::debug!("-> Response: {}", response.status());
match response.status() {
StatusCode::OK => { /* cool */ }
@@ -606,7 +719,19 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
let new_token = self.authenticate(credentials.clone()).await?;
let request = build_request(&Some(new_token.to_string()));
response = self.client.request(request).await?;
log::trace!(
"Retrying request after authentication: method={} uri={} authenticated=true",
method,
uri_string
);
response = match self.client.request(request).await {
Ok(response) => response,
Err(err) => {
Self::log_transport_error(&method, &uri_string, &err);
return Err(Error::HTTPError(err));
}
};
log::debug!("-> Retry response: {}", response.status());
} else {
return Err(Error::ClientError(
"Unauthorized, no credentials provided".into(),

View File

@@ -1,4 +1,7 @@
pub use crate::model::{Conversation, ConversationID, Message, MessageID, OutgoingMessage};
pub use crate::model::{
Conversation, ConversationID, Message, MessageID, OutgoingMessage, ResolveHandleResponse,
SendMessageResponse,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -42,7 +45,13 @@ pub trait APIInterface {
async fn send_message(
&mut self,
outgoing_message: &OutgoingMessage,
) -> Result<Message, Self::Error>;
) -> Result<SendMessageResponse, Self::Error>;
// (GET) /resolveHandle
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error>;
// (GET) /attachment
async fn fetch_attachment_data(
@@ -70,6 +79,12 @@ pub trait APIInterface {
conversation_id: &ConversationID,
) -> Result<(), Self::Error>;
// (GET) /delete
async fn delete_conversation(
&mut self,
conversation_id: &ConversationID,
) -> Result<(), Self::Error>;
// (WS) /updates
async fn open_event_socket(
&mut self,

View File

@@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
use super::conversation::ConversationID;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedHandle {
pub id: String,
pub name: Option<String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum HandleResolutionStatus {
Valid,
Invalid,
Unknown,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolveHandleResponse {
#[serde(rename = "resolvedHandle")]
pub resolved_handle: ResolvedHandle,
pub status: HandleResolutionStatus,
#[serde(rename = "existingChat")]
pub existing_chat: Option<ConversationID>,
}

View File

@@ -1,7 +1,9 @@
pub mod conversation;
pub mod event;
pub mod handle;
pub mod message;
pub mod outgoing_message;
pub mod send_message_response;
pub mod update;
pub use conversation::Conversation;
@@ -10,8 +12,15 @@ pub use conversation::ConversationID;
pub use message::Message;
pub use message::MessageID;
pub use handle::HandleResolutionStatus;
pub use handle::ResolveHandleResponse;
pub use handle::ResolvedHandle;
pub use outgoing_message::OutgoingMessage;
pub use outgoing_message::OutgoingMessageBuilder;
pub use outgoing_message::OutgoingMessageTarget;
pub use send_message_response::SendMessageResponse;
pub use update::UpdateItem;

View File

@@ -1,23 +1,23 @@
use super::conversation::ConversationID;
use chrono::NaiveDateTime;
use serde::Serialize;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutgoingMessageTarget {
Conversation(ConversationID),
Handles(Vec<String>),
}
#[derive(Debug, Clone)]
pub struct OutgoingMessage {
#[serde(skip)]
pub guid: Uuid,
#[serde(skip)]
pub date: NaiveDateTime,
#[serde(rename = "body")]
pub text: String,
#[serde(rename = "guid")]
pub conversation_id: ConversationID,
pub target: OutgoingMessageTarget,
#[serde(rename = "fileTransferGUIDs")]
pub file_transfer_guids: Vec<String>,
}
@@ -25,13 +25,27 @@ impl OutgoingMessage {
pub fn builder() -> OutgoingMessageBuilder {
OutgoingMessageBuilder::new()
}
pub fn conversation_id(&self) -> Option<&ConversationID> {
match &self.target {
OutgoingMessageTarget::Conversation(conversation_id) => Some(conversation_id),
OutgoingMessageTarget::Handles(_) => None,
}
}
pub fn handle_ids(&self) -> Option<&[String]> {
match &self.target {
OutgoingMessageTarget::Conversation(_) => None,
OutgoingMessageTarget::Handles(handle_ids) => Some(handle_ids.as_slice()),
}
}
}
#[derive(Default)]
pub struct OutgoingMessageBuilder {
guid: Option<Uuid>,
text: Option<String>,
conversation_id: Option<ConversationID>,
target: Option<OutgoingMessageTarget>,
file_transfer_guids: Option<Vec<String>>,
}
@@ -50,8 +64,18 @@ impl OutgoingMessageBuilder {
self
}
pub fn target(mut self, target: OutgoingMessageTarget) -> Self {
self.target = Some(target);
self
}
pub fn conversation_id(mut self, conversation_id: ConversationID) -> Self {
self.conversation_id = Some(conversation_id);
self.target = Some(OutgoingMessageTarget::Conversation(conversation_id));
self
}
pub fn handle_ids(mut self, handle_ids: Vec<String>) -> Self {
self.target = Some(OutgoingMessageTarget::Handles(handle_ids));
self
}
@@ -64,7 +88,7 @@ impl OutgoingMessageBuilder {
OutgoingMessage {
guid: self.guid.unwrap_or_else(Uuid::new_v4),
text: self.text.unwrap(),
conversation_id: self.conversation_id.unwrap(),
target: self.target.unwrap(),
file_transfer_guids: self.file_transfer_guids.unwrap_or_default(),
date: chrono::Utc::now().naive_utc(),
}

View File

@@ -0,0 +1,12 @@
use serde::Deserialize;
use super::{conversation::ConversationID, message::Message};
#[derive(Debug, Clone, Deserialize)]
pub struct SendMessageResponse {
#[serde(flatten)]
pub message: Message,
#[serde(rename = "conversationGUID")]
pub conversation_id: Option<ConversationID>,
}

View File

@@ -3,7 +3,7 @@ use self::test_client::TestClient;
use crate::APIInterface;
pub mod api_interface {
use crate::model::Conversation;
use crate::model::{Conversation, HandleResolutionStatus, OutgoingMessage};
use super::*;
@@ -28,4 +28,42 @@ pub mod api_interface {
assert_eq!(conversations.len(), 1);
assert_eq!(conversations[0].display_name, test_convo.display_name);
}
#[tokio::test]
async fn test_resolve_handle() {
let mut client = TestClient::new();
let resolved = client.resolve_handle("user@example.com").await.unwrap();
assert_eq!(resolved.resolved_handle.id, "user@example.com");
assert_eq!(resolved.status, HandleResolutionStatus::Valid);
assert_eq!(resolved.existing_chat, None);
}
#[tokio::test]
async fn test_send_message_with_handles() {
let mut client = TestClient::new();
let outgoing_message = OutgoingMessage::builder()
.text("hello".to_string())
.handle_ids(vec!["user@example.com".to_string()])
.build();
let sent = client.send_message(&outgoing_message).await.unwrap();
assert_eq!(sent.message.text, "hello");
assert_eq!(sent.conversation_id, None);
}
#[tokio::test]
async fn test_delete_conversation() {
let mut client = TestClient::new();
let test_convo = Conversation::builder().display_name("Delete Me").build();
client.conversations.push(test_convo.clone());
client.delete_conversation(&test_convo.guid).await.unwrap();
let conversations = client.get_conversations().await.unwrap();
assert!(conversations.is_empty());
}
}

View File

@@ -9,14 +9,17 @@ use crate::{
api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpdate},
api::http_client::Credentials,
model::{
Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage,
UpdateItem,
Conversation, ConversationID, Event, HandleResolutionStatus, JwtToken, Message, MessageID,
OutgoingMessage, OutgoingMessageTarget, ResolveHandleResponse, ResolvedHandle,
SendMessageResponse,
},
};
use bytes::Bytes;
use futures_util::sink::drain;
use futures_util::stream::BoxStream;
use futures_util::Sink;
use futures_util::SinkExt;
use futures_util::StreamExt;
pub struct TestClient {
@@ -63,13 +66,18 @@ impl EventSocket for TestEventSocket {
impl Sink<SinkMessage, Error = Self::Error>,
) {
(
futures_util::stream::iter(self.events.into_iter().map(Ok)).boxed(),
futures_util::sink::sink(),
futures_util::stream::iter(
self.events
.into_iter()
.map(|event| Ok(SocketEvent::Update(event))),
)
.boxed(),
drain().sink_map_err(|err| match err {}),
)
}
async fn raw_updates(self) -> Self::UpdateStream {
let results: Vec<Result<Vec<UpdateItem>, TestError>> = vec![];
let results: Vec<Result<SocketUpdate, TestError>> = vec![];
futures_util::stream::iter(results.into_iter()).boxed()
}
}
@@ -94,9 +102,9 @@ impl APIInterface for TestClient {
async fn get_messages(
&mut self,
conversation_id: &ConversationID,
limit: Option<u32>,
before: Option<MessageID>,
after: Option<MessageID>,
_limit: Option<u32>,
_before: Option<MessageID>,
_after: Option<MessageID>,
) -> Result<Vec<Message>, Self::Error> {
if let Some(messages) = self.messages.get(conversation_id) {
return Ok(messages.clone());
@@ -108,18 +116,42 @@ impl APIInterface for TestClient {
async fn send_message(
&mut self,
outgoing_message: &OutgoingMessage,
) -> Result<Message, Self::Error> {
) -> Result<SendMessageResponse, Self::Error> {
let message = Message::builder()
.guid(Uuid::new_v4().to_string())
.text(outgoing_message.text.clone())
.date(OffsetDateTime::now_utc())
.build();
let conversation_id = match &outgoing_message.target {
OutgoingMessageTarget::Conversation(conversation_id) => {
self.messages
.entry(outgoing_message.conversation_id.clone())
.entry(conversation_id.clone())
.or_insert(vec![])
.push(message.clone());
Ok(message)
None
}
OutgoingMessageTarget::Handles(_) => None,
};
Ok(SendMessageResponse {
message,
conversation_id,
})
}
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error> {
Ok(ResolveHandleResponse {
resolved_handle: ResolvedHandle {
id: handle_id.to_string(),
name: None,
},
status: HandleResolutionStatus::Valid,
existing_chat: None,
})
}
async fn open_event_socket(
@@ -131,17 +163,17 @@ impl APIInterface for TestClient {
async fn fetch_attachment_data(
&mut self,
guid: &str,
preview: bool,
_guid: &str,
_preview: bool,
) -> Result<Self::ResponseStream, Self::Error> {
Ok(futures_util::stream::iter(vec![Ok(Bytes::from_static(b"test"))]).boxed())
}
async fn upload_attachment<R>(
&mut self,
data: tokio::io::BufReader<R>,
filename: &str,
size: u64,
_data: tokio::io::BufReader<R>,
_filename: &str,
_size: u64,
) -> Result<String, Self::Error>
where
R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -151,8 +183,23 @@ impl APIInterface for TestClient {
async fn mark_conversation_as_read(
&mut self,
conversation_id: &ConversationID,
_conversation_id: &ConversationID,
) -> Result<(), Self::Error> {
Ok(())
}
async fn delete_conversation(
&mut self,
conversation_id: &ConversationID,
) -> Result<(), Self::Error> {
let previous_len = self.conversations.len();
self.conversations.retain(|c| &c.guid != conversation_id);
self.messages.remove(conversation_id);
if self.conversations.len() == previous_len {
return Err(TestError::ConversationNotFound);
}
Ok(())
}
}

View File

@@ -23,4 +23,3 @@ fn main() {
println!("cargo:rerun-if-changed={}", KORDOPHONE_XML);
}

View File

@@ -2,4 +2,3 @@ mod platform;
mod worker;
pub use worker::{spawn_worker, ChatMessage, ConversationSummary, Event, Request};

View File

@@ -114,11 +114,23 @@ impl DaemonClient for DBusClient {
.collect())
}
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
fn reply(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let attachment_guids: Vec<&str> = vec![];
let outgoing_id = KordophoneRepository::send_message(
let outgoing_id =
KordophoneRepository::reply(&self.proxy(), &conversation_id, &text, attachment_guids)?;
Ok(Some(outgoing_id))
}
fn new_conversation(
&mut self,
handle_ids: Vec<String>,
text: String,
) -> Result<Option<String>> {
let attachment_guids: Vec<&str> = vec![];
let handle_ids: Vec<&str> = handle_ids.iter().map(String::as_str).collect();
let outgoing_id = KordophoneRepository::new_conversation(
&self.proxy(),
&conversation_id,
handle_ids,
&text,
attachment_guids,
)?;
@@ -186,4 +198,3 @@ impl DaemonClient for DBusClient {
Ok(())
}
}

View File

@@ -95,8 +95,14 @@ impl XpcClient {
impl DaemonClient for XpcClient {
fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<ConversationSummary>> {
let mut args = HashMap::new();
args.insert(Self::key("limit"), Message::String(Self::key(&limit.to_string())));
args.insert(Self::key("offset"), Message::String(Self::key(&offset.to_string())));
args.insert(
Self::key("limit"),
Message::String(Self::key(&limit.to_string())),
);
args.insert(
Self::key("offset"),
Message::String(Self::key(&offset.to_string())),
);
let reply = self
.transport
@@ -112,7 +118,9 @@ impl DaemonClient for XpcClient {
let mut conversations = Vec::new();
for item in items {
let Message::Dictionary(conv) = item else { continue };
let Message::Dictionary(conv) = item else {
continue;
};
let id = Self::get_string(conv, "guid").unwrap_or_default();
let display_name = Self::get_string(conv, "display_name").unwrap_or_default();
let preview = Self::get_string(conv, "last_message_preview").unwrap_or_default();
@@ -162,7 +170,10 @@ impl DaemonClient for XpcClient {
Message::String(Self::key(&conversation_id)),
);
if let Some(last) = last_message_id {
args.insert(Self::key("last_message_id"), Message::String(Self::key(&last)));
args.insert(
Self::key("last_message_id"),
Message::String(Self::key(&last)),
);
}
let reply = self
@@ -178,7 +189,9 @@ impl DaemonClient for XpcClient {
let mut messages = Vec::new();
for item in items {
let Message::Dictionary(msg) = item else { continue };
let Message::Dictionary(msg) = item else {
continue;
};
messages.push(ChatMessage {
sender: Self::get_string(msg, "sender").unwrap_or_default(),
text: Self::get_string(msg, "text").unwrap_or_default(),
@@ -188,7 +201,7 @@ impl DaemonClient for XpcClient {
Ok(messages)
}
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
fn reply(&mut self, conversation_id: String, text: String) -> Result<Option<String>> {
let mut args = HashMap::new();
args.insert(
Self::key("conversation_id"),
@@ -198,7 +211,34 @@ impl DaemonClient for XpcClient {
let reply = self
.transport
.send_with_reply(Self::request("SendMessage", Some(args)));
.send_with_reply(Self::request("Reply", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected send response");
};
Ok(Self::get_string(&map, "uuid"))
}
fn new_conversation(
&mut self,
handle_ids: Vec<String>,
text: String,
) -> Result<Option<String>> {
let mut args = HashMap::new();
args.insert(
Self::key("handle_ids"),
Message::Array(
handle_ids
.into_iter()
.map(|handle_id| Message::String(Self::key(&handle_id)))
.collect(),
),
);
args.insert(Self::key("text"), Message::String(Self::key(&text)));
let reply = self
.transport
.send_with_reply(Self::request("NewConversation", Some(args)));
let Message::Dictionary(map) = reply else {
anyhow::bail!("Unexpected send response");
};
@@ -230,4 +270,3 @@ impl DaemonClient for XpcClient {
Ok(())
}
}

View File

@@ -21,4 +21,3 @@ pub(crate) fn new_daemon_client() -> Result<Box<dyn DaemonClient>> {
anyhow::bail!("Unsupported platform")
}
}

View File

@@ -21,10 +21,23 @@ pub struct ChatMessage {
pub enum Request {
RefreshConversations,
RefreshMessages { conversation_id: String },
SendMessage { conversation_id: String, text: String },
MarkRead { conversation_id: String },
SyncConversation { conversation_id: String },
RefreshMessages {
conversation_id: String,
},
Reply {
conversation_id: String,
text: String,
},
NewConversation {
handle_ids: Vec<String>,
text: String,
},
MarkRead {
conversation_id: String,
},
SyncConversation {
conversation_id: String,
},
}
pub enum Event {
@@ -33,14 +46,18 @@ pub enum Event {
conversation_id: String,
messages: Vec<ChatMessage>,
},
MessageSent {
conversation_id: String,
MessageQueued {
conversation_id: Option<String>,
outgoing_id: Option<String>,
},
MarkedRead,
ConversationSyncTriggered { conversation_id: String },
ConversationSyncTriggered {
conversation_id: String,
},
ConversationsUpdated,
MessagesUpdated { conversation_id: String },
MessagesUpdated {
conversation_id: String,
},
UpdateStreamReconnected,
Error(String),
}
@@ -59,29 +76,37 @@ pub fn spawn_worker(
};
if let Err(e) = client.install_signal_handlers(event_tx.clone()) {
let _ = event_tx.send(Event::Error(format!("Failed to install daemon signals: {e}")));
let _ = event_tx.send(Event::Error(format!(
"Failed to install daemon signals: {e}"
)));
}
loop {
match request_rx.recv_timeout(Duration::from_millis(100)) {
Ok(req) => {
let res = match req {
Request::RefreshConversations => client
.get_conversations(200, 0)
.map(Event::Conversations),
Request::RefreshConversations => {
client.get_conversations(200, 0).map(Event::Conversations)
}
Request::RefreshMessages { conversation_id } => client
.get_messages(conversation_id.clone(), None)
.map(|messages| Event::Messages {
conversation_id,
messages,
}),
Request::SendMessage {
Request::Reply {
conversation_id,
text,
} => client
.send_message(conversation_id.clone(), text)
.map(|outgoing_id| Event::MessageSent {
conversation_id,
.reply(conversation_id.clone(), text)
.map(|outgoing_id| Event::MessageQueued {
conversation_id: Some(conversation_id),
outgoing_id,
}),
Request::NewConversation { handle_ids, text } => client
.new_conversation(handle_ids, text)
.map(|outgoing_id| Event::MessageQueued {
conversation_id: None,
outgoing_id,
}),
Request::MarkRead { conversation_id } => client
@@ -119,7 +144,9 @@ pub(crate) trait DaemonClient {
conversation_id: String,
last_message_id: Option<String>,
) -> Result<Vec<ChatMessage>>;
fn send_message(&mut self, conversation_id: String, text: String) -> Result<Option<String>>;
fn reply(&mut self, conversation_id: String, text: String) -> Result<Option<String>>;
fn new_conversation(&mut self, handle_ids: Vec<String>, text: String)
-> Result<Option<String>>;
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>;
fn sync_conversation(&mut self, conversation_id: String) -> Result<()>;
fn install_signal_handlers(&mut self, _event_tx: mpsc::Sender<Event>) -> Result<()> {
@@ -130,4 +157,3 @@ pub(crate) trait DaemonClient {
Ok(())
}
}

View File

@@ -27,4 +27,3 @@ fn main() {
println!("cargo:rerun-if-changed={}", KORDOPHONE_XML);
}

View File

@@ -83,7 +83,7 @@
</arg>
</method>
<method name="SendMessage">
<method name="Reply">
<arg type="s" name="conversation_id" direction="in"/>
<arg type="s" name="text" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/>
@@ -91,9 +91,28 @@
<arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Sends a message to the server. Returns the outgoing message ID.
value="Replies to an existing conversation. Returns the outgoing message ID.
Arguments:
- conversation_id: The ID of the conversation to send the message to.
- conversation_id: The ID of the conversation to reply to.
- text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send.
Returns:
- outgoing_message_id: The ID of the outgoing message.
"/>
</method>
<method name="NewConversation">
<arg type="as" name="handle_ids" direction="in"/>
<arg type="s" name="text" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/>
<arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Sends a message to a new conversation identified by resolved handles.
Arguments:
- handle_ids: The resolved handles for the new conversation.
- text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send.

View File

@@ -53,13 +53,21 @@ pub enum Event {
/// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned.
GetMessages(String, Option<String>, Reply<Vec<Message>>),
/// Enqueues a message to be sent to the server.
/// Enqueues a reply to an existing conversation.
/// Parameters:
/// - conversation_id: The ID of the conversation to send the message to.
/// - text: The text of the message to send.
/// - attachment_guids: The GUIDs of the attachments to send.
/// - reply: The outgoing message ID (not the server-assigned message ID).
SendMessage(String, String, Vec<String>, Reply<Uuid>),
Reply(String, String, Vec<String>, Reply<Uuid>),
/// Enqueues a message to one or more resolved handles.
/// Parameters:
/// - handle_ids: The resolved handle IDs for the new conversation.
/// - text: The text of the message to send.
/// - attachment_guids: The GUIDs of the attachments to send.
/// - reply: The outgoing message ID (not the server-assigned message ID).
NewConversation(Vec<String>, String, Vec<String>, Reply<Uuid>),
/// Notifies the daemon that a message has been sent.
/// Parameters:

View File

@@ -29,7 +29,7 @@ use kordophone_db::{
use kordophone::api::http_client::HTTPAPIClient;
use kordophone::api::APIInterface;
use kordophone::model::outgoing_message::OutgoingMessage;
use kordophone::model::outgoing_message::{OutgoingMessage, OutgoingMessageTarget};
use kordophone::model::{ConversationID, MessageID};
mod update_monitor;
@@ -330,10 +330,14 @@ impl Daemon {
let _ = reply.send(());
}
Event::SendMessage(conversation_id, text, attachment_guids, reply) => {
Event::Reply(conversation_id, text, attachment_guids, reply) => {
let conversation_id = conversation_id.clone();
let uuid = self
.enqueue_outgoing_message(text, conversation_id.clone(), attachment_guids)
.enqueue_outgoing_message(
text,
OutgoingMessageTarget::Conversation(conversation_id.clone()),
attachment_guids,
)
.await;
let _ = reply.send(uuid);
@@ -344,12 +348,52 @@ impl Daemon {
.unwrap();
}
Event::NewConversation(handle_ids, text, attachment_guids, reply) => {
let uuid = self
.enqueue_outgoing_message(
text,
OutgoingMessageTarget::Handles(handle_ids),
attachment_guids,
)
.await;
let _ = reply.send(uuid);
}
Event::MessageSent(message, outgoing_message, conversation_id) => {
log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id);
let conversation_created = match self
.ensure_conversation_exists_for_sent_message(
&conversation_id,
&outgoing_message,
&message,
)
.await
{
Ok(created) => created,
Err(e) => {
log::error!(
target: target::EVENT,
"Failed to ensure conversation {} exists for sent message {}: {}",
conversation_id,
message.id,
e
);
return;
}
};
if conversation_created {
self.signal_sender
.send(Signal::ConversationsUpdated)
.await
.unwrap();
}
// Insert the message into the database.
log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id);
self.database
if let Err(e) = self
.database
.lock()
.await
.with_repository(|r| {
@@ -363,13 +407,24 @@ impl Daemon {
)
})
.await
.unwrap();
{
log::error!(
target: target::EVENT,
"Failed to persist sent message {} for conversation {}: {}",
message.id,
conversation_id,
e
);
return;
}
// Remove from outgoing messages.
log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid);
for messages in self.outgoing_messages.values_mut() {
messages.retain(|m| m.guid != outgoing_message.guid);
}
self.outgoing_messages
.get_mut(&conversation_id)
.map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));
.retain(|_, messages| !messages.is_empty());
// Send message updated signal.
self.signal_sender
@@ -477,9 +532,8 @@ impl Daemon {
.await;
// Convert DB messages to daemon model, substituting local_id when an alias exists.
let mut result: Vec<Message> = Vec::with_capacity(
db_messages.len() + outgoing_messages.len(),
);
let mut result: Vec<Message> =
Vec::with_capacity(db_messages.len() + outgoing_messages.len());
for m in db_messages.into_iter() {
let server_id = m.id.clone();
let mut dm: Message = m.into();
@@ -518,24 +572,87 @@ impl Daemon {
result
}
async fn ensure_conversation_exists_for_sent_message(
&mut self,
conversation_id: &ConversationID,
outgoing_message: &OutgoingMessage,
message: &Message,
) -> Result<bool> {
let conversation_exists = self
.database
.lock()
.await
.with_repository(|r| r.get_conversation_by_guid(conversation_id))
.await?
.is_some();
if conversation_exists {
return Ok(false);
}
let participants = Self::participants_for_outgoing_message(outgoing_message);
let mut builder = Conversation::builder()
.guid(conversation_id)
.date(message.date)
.unread_count(0)
.participants(participants);
if !message.text.trim().is_empty() {
builder = builder.last_message_preview(&message.text);
}
let conversation = builder.build();
log::info!(
target: target::EVENT,
"Creating local conversation {} from sent message {}",
conversation_id,
message.id
);
self.database
.lock()
.await
.with_repository(|r| r.insert_conversation(conversation))
.await?;
Ok(true)
}
fn participants_for_outgoing_message(outgoing_message: &OutgoingMessage) -> Vec<DbParticipant> {
let handle_ids = match &outgoing_message.target {
OutgoingMessageTarget::Conversation(_) => return Vec::new(),
OutgoingMessageTarget::Handles(handle_ids) => handle_ids,
};
let mut contact_resolver = ContactResolver::new(DefaultContactResolverBackend::default());
handle_ids
.iter()
.map(|handle| DbParticipant::Remote {
handle: handle.clone(),
contact_id: contact_resolver.resolve_contact_id(handle),
})
.collect()
}
async fn enqueue_outgoing_message(
&mut self,
text: String,
conversation_id: String,
target: OutgoingMessageTarget,
attachment_guids: Vec<String>,
) -> Uuid {
let conversation_id = conversation_id.clone();
let outgoing_message = OutgoingMessage::builder()
.text(text)
.conversation_id(conversation_id.clone())
.target(target)
.file_transfer_guids(attachment_guids)
.build();
// Keep a record of this so we can provide a consistent model to the client.
if let Some(conversation_id) = outgoing_message.conversation_id().cloned() {
// Keep a record of replies so we can provide a consistent model to the client.
self.outgoing_messages
.entry(conversation_id)
.or_insert(vec![])
.push(outgoing_message.clone());
}
let guid = outgoing_message.guid.clone();
self.post_office_sink

View File

@@ -8,6 +8,7 @@ use tokio_condvar::Condvar;
use crate::daemon::events::Event as DaemonEvent;
use kordophone::api::APIInterface;
use kordophone::model::outgoing_message::OutgoingMessage;
use kordophone::model::OutgoingMessageTarget;
use anyhow::Result;
@@ -102,10 +103,29 @@ impl<C: APIInterface, F: AsyncFnMut() -> Result<C>> PostOffice<C, F> {
Ok(sent_message) => {
log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid);
let conversation_id = message.conversation_id.clone();
let event =
DaemonEvent::MessageSent(sent_message.into(), message, conversation_id);
let conversation_id = sent_message.conversation_id.clone().or_else(|| {
match &message.target {
OutgoingMessageTarget::Conversation(conversation_id) => {
Some(conversation_id.clone())
}
OutgoingMessageTarget::Handles(_) => None,
}
});
if let Some(conversation_id) = conversation_id {
let event = DaemonEvent::MessageSent(
sent_message.message.into(),
message,
conversation_id,
);
event_sink.send(event).await.unwrap();
} else {
log::error!(
target: target::POST_OFFICE,
"Message sent but no conversation id was available for {}",
message.guid
);
}
}
Err(e) => {

View File

@@ -388,13 +388,23 @@ impl DbusRepository for DBusAgent {
self.send_event_sync(Event::DeleteAllConversations)
}
fn send_message(
fn reply(
&mut self,
conversation_id: String,
text: String,
attachment_guids: Vec<String>,
) -> Result<String, MethodErr> {
self.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
self.send_event_sync(|r| Event::Reply(conversation_id, text, attachment_guids, r))
.map(|uuid| uuid.to_string())
}
fn new_conversation(
&mut self,
handle_ids: Vec<String>,
text: String,
attachment_guids: Vec<String>,
) -> Result<String, MethodErr> {
self.send_event_sync(|r| Event::NewConversation(handle_ids, text, attachment_guids, r))
.map(|uuid| uuid.to_string())
}

View File

@@ -15,10 +15,16 @@ pub struct DispatchResult {
impl DispatchResult {
pub fn new(message: Message) -> Self {
Self { message, cleanup: None }
Self {
message,
cleanup: None,
}
}
pub fn with_cleanup<T: Any + Send + 'static>(message: Message, cleanup: T) -> Self {
Self { message, cleanup: Some(Box::new(cleanup)) }
Self {
message,
cleanup: Some(Box::new(cleanup)),
}
}
}

View File

@@ -105,7 +105,12 @@ pub async fn dispatch(
.and_then(|m| dict_get_str(m, "conversation_id"))
{
Some(id) => id,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
match agent
.send_event(|r| Event::SyncConversation(conversation_id, r))
@@ -122,7 +127,12 @@ pub async fn dispatch(
.and_then(|m| dict_get_str(m, "conversation_id"))
{
Some(id) => id,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
match agent
.send_event(|r| Event::MarkConversationAsRead(conversation_id, r))
@@ -137,11 +147,21 @@ pub async fn dispatch(
"GetMessages" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let conversation_id = match dict_get_str(args, "conversation_id") {
Some(id) => id,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
let last_message_id = dict_get_str(args, "last_message_id");
match agent
@@ -158,11 +178,8 @@ pub async fn dispatch(
dict_put_str(&mut m, "sender", &msg.sender.display_name());
// Include attachment GUIDs for the client to resolve/download
let attachment_guids: Vec<String> = msg
.attachments
.iter()
.map(|a| a.guid.clone())
.collect();
let attachment_guids: Vec<String> =
msg.attachments.iter().map(|a| a.guid.clone()).collect();
m.insert(cstr("attachment_guids"), array_from_strs(attachment_guids));
// Full attachments array with metadata (mirrors DBus fields)
@@ -193,12 +210,23 @@ pub async fn dispatch(
if let Some(attribution_info) = &metadata.attribution_info {
let mut attribution_map: XpcMap = HashMap::new();
if let Some(width) = attribution_info.width {
dict_put_i64_as_str(&mut attribution_map, "width", width as i64);
dict_put_i64_as_str(
&mut attribution_map,
"width",
width as i64,
);
}
if let Some(height) = attribution_info.height {
dict_put_i64_as_str(&mut attribution_map, "height", height as i64);
dict_put_i64_as_str(
&mut attribution_map,
"height",
height as i64,
);
}
metadata_map.insert(cstr("attribution_info"), Message::Dictionary(attribution_map));
metadata_map.insert(
cstr("attribution_info"),
Message::Dictionary(attribution_map),
);
}
if !metadata_map.is_empty() {
a.insert(cstr("metadata"), Message::Dictionary(metadata_map));
@@ -226,15 +254,25 @@ pub async fn dispatch(
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
},
// SendMessage
"SendMessage" => {
// Reply
"Reply" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let conversation_id = match dict_get_str(args, "conversation_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing conversation_id",
))
}
};
let text = dict_get_str(args, "text").unwrap_or_default();
let attachment_guids: Vec<String> = match args.get(&cstr("attachment_guids")) {
@@ -248,12 +286,64 @@ pub async fn dispatch(
_ => Vec::new(),
};
match agent
.send_event(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
.send_event(|r| Event::Reply(conversation_id, text, attachment_guids, r))
.await
{
Ok(uuid) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "SendMessageResponse");
dict_put_str(&mut reply, "type", "ReplyResponse");
dict_put_str(&mut reply, "uuid", &uuid.to_string());
DispatchResult::new(Message::Dictionary(reply))
}
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
}
}
// NewConversation
"NewConversation" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let handle_ids: Vec<String> = match args.get(&cstr("handle_ids")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect(),
_ => Vec::new(),
};
if handle_ids.is_empty() {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing handle_ids",
));
}
let text = dict_get_str(args, "text").unwrap_or_default();
let attachment_guids: Vec<String> = match args.get(&cstr("attachment_guids")) {
Some(Message::Array(arr)) => arr
.iter()
.filter_map(|m| match m {
Message::String(s) => Some(s.to_string_lossy().into_owned()),
_ => None,
})
.collect(),
_ => Vec::new(),
};
match agent
.send_event(|r| Event::NewConversation(handle_ids, text, attachment_guids, r))
.await
{
Ok(uuid) => {
let mut reply: XpcMap = HashMap::new();
dict_put_str(&mut reply, "type", "NewConversationResponse");
dict_put_str(&mut reply, "uuid", &uuid.to_string());
DispatchResult::new(Message::Dictionary(reply))
}
@@ -265,11 +355,21 @@ pub async fn dispatch(
"GetAttachmentInfo" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing attachment_id",
))
}
};
match agent
.send_event(|r| Event::GetAttachment(attachment_id, r))
@@ -308,11 +408,21 @@ pub async fn dispatch(
"OpenAttachmentFd" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing attachment_id",
))
}
};
let preview = dict_get_str(args, "preview")
.map(|s| s == "true")
@@ -335,9 +445,14 @@ pub async fn dispatch(
dict_put_str(&mut reply, "type", "OpenAttachmentFdResponse");
reply.insert(cstr("fd"), Message::Fd(fd));
DispatchResult { message: Message::Dictionary(reply), cleanup: Some(Box::new(file)) }
DispatchResult {
message: Message::Dictionary(reply),
cleanup: Some(Box::new(file)),
}
}
Err(e) => {
DispatchResult::new(make_error_reply("OpenFailed", &format!("{}", e)))
}
Err(e) => DispatchResult::new(make_error_reply("OpenFailed", &format!("{}", e))),
}
}
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
@@ -348,11 +463,21 @@ pub async fn dispatch(
"DownloadAttachment" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let attachment_id = match dict_get_str(args, "attachment_id") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing attachment_id",
))
}
};
let preview = dict_get_str(args, "preview")
.map(|s| s == "true")
@@ -371,11 +496,18 @@ pub async fn dispatch(
use std::path::PathBuf;
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let path = match dict_get_str(args, "path") {
Some(v) => v,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing path")),
None => {
return DispatchResult::new(make_error_reply("InvalidRequest", "Missing path"))
}
};
match agent
.send_event(|r| Event::UploadAttachment(PathBuf::from(path), r))
@@ -413,7 +545,12 @@ pub async fn dispatch(
"UpdateSettings" => {
let args = match get_dictionary_field(root, "arguments") {
Some(a) => a,
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
None => {
return DispatchResult::new(make_error_reply(
"InvalidRequest",
"Missing arguments",
))
}
};
let server_url = dict_get_str(args, "server_url");
let username = dict_get_str(args, "username");

View File

@@ -5,10 +5,10 @@ use kordophone::api::InMemoryAuthenticationStore;
use kordophone::APIInterface;
use crate::printers::{ConversationPrinter, MessagePrinter};
use anyhow::Result;
use anyhow::{bail, Result};
use clap::Subcommand;
use kordophone::model::event::EventData;
use kordophone::model::outgoing_message::OutgoingMessage;
use kordophone::model::{HandleResolutionStatus, OutgoingMessage, OutgoingMessageTarget};
use futures_util::StreamExt;
@@ -47,14 +47,29 @@ pub enum Commands {
/// Prints all raw updates from the server.
RawUpdates,
/// Sends a message to the server.
SendMessage {
/// Resolves an address to a canonical handle.
#[command(alias = "resolve")]
ResolveHandle { address: String },
/// Replies to an existing conversation.
#[command(alias = "send-message")]
Reply {
conversation_id: String,
message: String,
},
/// Starts a new message to one or more resolved handles.
New {
#[arg(long = "handle", required = true)]
handle_ids: Vec<String>,
message: String,
},
/// Marks a conversation as read.
Mark { conversation_id: String },
/// Deletes a conversation from the server.
Delete { conversation_id: String },
}
impl Commands {
@@ -66,13 +81,21 @@ impl Commands {
Commands::Messages { conversation_id } => client.print_messages(conversation_id).await,
Commands::RawUpdates => client.print_raw_updates().await,
Commands::Events => client.print_events().await,
Commands::SendMessage {
Commands::ResolveHandle { address } => client.resolve_handle(address).await,
Commands::Reply {
conversation_id,
message,
} => client.send_message(conversation_id, message).await,
} => client.reply(conversation_id, message).await,
Commands::New {
handle_ids,
message,
} => client.new_message(handle_ids, message).await,
Commands::Mark { conversation_id } => {
client.mark_conversation_as_read(conversation_id).await
}
Commands::Delete { conversation_id } => {
client.delete_conversation(conversation_id).await
}
}
}
}
@@ -146,8 +169,7 @@ impl ClientCli {
loop {
match stream.next().await.unwrap() {
Ok(update) => {
match update {
Ok(update) => match update {
SocketUpdate::Update(updates) => {
for update in updates {
println!("Got update: {:?}", update);
@@ -156,7 +178,6 @@ impl ClientCli {
SocketUpdate::Pong => {
println!("Pong");
}
}
},
Err(e) => {
@@ -169,20 +190,91 @@ impl ClientCli {
Ok(())
}
pub async fn send_message(&mut self, conversation_id: String, message: String) -> Result<()> {
pub async fn resolve_handle(&mut self, address: String) -> Result<()> {
let response = self.api.resolve_handle(&address).await?;
let status = match response.status {
HandleResolutionStatus::Valid => "valid",
HandleResolutionStatus::Invalid => "invalid",
HandleResolutionStatus::Unknown => "unknown",
};
println!("Resolved handle: {}", response.resolved_handle.id);
if let Some(name) = response.resolved_handle.name {
println!("Name: {}", name);
}
println!("Status: {}", status);
if let Some(conversation_id) = response.existing_chat {
println!("Existing conversation: {}", conversation_id);
}
Ok(())
}
async fn send_message(&mut self, target: OutgoingMessageTarget, message: String) -> Result<()> {
let outgoing_message = OutgoingMessage::builder()
.conversation_id(conversation_id)
.target(target)
.text(message)
.build();
let message = self.api.send_message(&outgoing_message).await?;
println!("Message sent: {}", message.guid);
let response = self.api.send_message(&outgoing_message).await?;
if let Some(conversation_id) = response.conversation_id {
println!(
"Message sent: {} conversation: {}",
response.message.guid, conversation_id
);
} else {
println!("Message sent: {}", response.message.guid);
}
Ok(())
}
async fn resolve_handle_ids(&mut self, handle_ids: Vec<String>) -> Result<Vec<String>> {
let mut resolved_handle_ids = Vec::with_capacity(handle_ids.len());
for handle_id in handle_ids {
let response = self.api.resolve_handle(&handle_id).await?;
match response.status {
HandleResolutionStatus::Valid => {
resolved_handle_ids.push(response.resolved_handle.id);
}
HandleResolutionStatus::Invalid => {
bail!("Handle '{}' is not iMessage-capable.", handle_id);
}
HandleResolutionStatus::Unknown => {
bail!("Handle '{}' could not be resolved.", handle_id);
}
}
}
Ok(resolved_handle_ids)
}
pub async fn reply(&mut self, conversation_id: String, message: String) -> Result<()> {
self.send_message(
OutgoingMessageTarget::Conversation(conversation_id),
message,
)
.await
}
pub async fn new_message(&mut self, handle_ids: Vec<String>, message: String) -> Result<()> {
let resolved_handle_ids = self.resolve_handle_ids(handle_ids).await?;
self.send_message(OutgoingMessageTarget::Handles(resolved_handle_ids), message)
.await
}
pub async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
self.api.mark_conversation_as_read(&conversation_id).await?;
println!("Conversation marked as read: {}", conversation_id);
Ok(())
}
pub async fn delete_conversation(&mut self, conversation_id: String) -> Result<()> {
self.api.delete_conversation(&conversation_id).await?;
println!("Conversation deleted: {}", conversation_id);
Ok(())
}
}

View File

@@ -109,15 +109,20 @@ impl DaemonInterface for DBusDaemonInterface {
Ok(())
}
async fn enqueue_outgoing_message(
&mut self,
conversation_id: String,
text: String,
) -> Result<()> {
async fn reply(&mut self, conversation_id: String, text: String) -> Result<()> {
let attachment_guids: Vec<&str> = vec![];
let outgoing_message_id = KordophoneRepository::send_message(
let outgoing_message_id =
KordophoneRepository::reply(&self.proxy(), &conversation_id, &text, attachment_guids)?;
println!("Outgoing message ID: {}", outgoing_message_id);
Ok(())
}
async fn new_conversation(&mut self, handle_ids: Vec<String>, text: String) -> Result<()> {
let attachment_guids: Vec<&str> = vec![];
let handle_ids: Vec<&str> = handle_ids.iter().map(String::as_str).collect();
let outgoing_message_id = KordophoneRepository::new_conversation(
&self.proxy(),
&conversation_id,
handle_ids,
&text,
attachment_guids,
)?;

View File

@@ -21,11 +21,8 @@ pub trait DaemonInterface {
conversation_id: String,
last_message_id: Option<String>,
) -> Result<()>;
async fn enqueue_outgoing_message(
&mut self,
conversation_id: String,
text: String,
) -> Result<()>;
async fn reply(&mut self, conversation_id: String, text: String) -> Result<()>;
async fn new_conversation(&mut self, handle_ids: Vec<String>, text: String) -> Result<()>;
async fn wait_for_signals(&mut self) -> Result<()>;
async fn config(&mut self, cmd: ConfigCommands) -> Result<()>;
async fn delete_all_conversations(&mut self) -> Result<()>;
@@ -73,11 +70,12 @@ impl DaemonInterface for StubDaemonInterface {
"Daemon interface not implemented on this platform"
))
}
async fn enqueue_outgoing_message(
&mut self,
_conversation_id: String,
_text: String,
) -> Result<()> {
async fn reply(&mut self, _conversation_id: String, _text: String) -> Result<()> {
Err(anyhow::anyhow!(
"Daemon interface not implemented on this platform"
))
}
async fn new_conversation(&mut self, _handle_ids: Vec<String>, _text: String) -> Result<()> {
Err(anyhow::anyhow!(
"Daemon interface not implemented on this platform"
))
@@ -161,12 +159,20 @@ pub enum Commands {
/// Deletes all conversations.
DeleteAllConversations,
/// Enqueues an outgoing message to be sent to a conversation.
SendMessage {
/// Replies to an existing conversation.
#[command(alias = "send-message")]
Reply {
conversation_id: String,
text: String,
},
/// Starts a new conversation with one or more resolved handles.
New {
#[arg(long = "handle", required = true)]
handle_ids: Vec<String>,
text: String,
},
/// Downloads an attachment from the server to the attachment store. Returns the path to the attachment.
DownloadAttachment { attachment_id: String },
@@ -208,10 +214,11 @@ impl Commands {
.await
}
Commands::DeleteAllConversations => client.delete_all_conversations().await,
Commands::SendMessage {
Commands::Reply {
conversation_id,
text,
} => client.enqueue_outgoing_message(conversation_id, text).await,
} => client.reply(conversation_id, text).await,
Commands::New { handle_ids, text } => client.new_conversation(handle_ids, text).await,
Commands::UploadAttachment { path } => client.upload_attachment(path).await,
Commands::DownloadAttachment { attachment_id } => {
client.download_attachment(attachment_id).await

View File

@@ -371,11 +371,7 @@ impl DaemonInterface for XpcDaemonInterface {
_ => Err(anyhow::anyhow!("Unexpected messages payload")),
}
}
async fn enqueue_outgoing_message(
&mut self,
_conversation_id: String,
_text: String,
) -> Result<()> {
async fn reply(&mut self, _conversation_id: String, _text: String) -> Result<()> {
let mach_port_name = Self::build_service_name()?;
let mut client = XPCClient::connect(&mach_port_name);
let mut args = HashMap::new();
@@ -387,10 +383,34 @@ impl DaemonInterface for XpcDaemonInterface {
Self::key("text"),
Message::String(CString::new(_text).unwrap()),
);
let reply = self
.call_method(&mut client, "SendMessage", Some(args))
let response = self.call_method(&mut client, "Reply", Some(args)).await?;
if let Some(uuid) = Self::get_string(&response, "uuid") {
println!("Outgoing message ID: {}", uuid.to_string_lossy());
}
Ok(())
}
async fn new_conversation(&mut self, handle_ids: Vec<String>, text: String) -> Result<()> {
let mach_port_name = Self::build_service_name()?;
let mut client = XPCClient::connect(&mach_port_name);
let mut args = HashMap::new();
args.insert(
Self::key("handle_ids"),
Message::Array(
handle_ids
.into_iter()
.map(|handle_id| Message::String(CString::new(handle_id).unwrap()))
.collect(),
),
);
args.insert(
Self::key("text"),
Message::String(CString::new(text).unwrap()),
);
let response = self
.call_method(&mut client, "NewConversation", Some(args))
.await?;
if let Some(uuid) = Self::get_string(&reply, "uuid") {
if let Some(uuid) = Self::get_string(&response, "uuid") {
println!("Outgoing message ID: {}", uuid.to_string_lossy());
}
Ok(())

View File

@@ -378,10 +378,11 @@ fn run_app(
app.pinned_to_bottom = was_pinned;
}
}
daemon::Event::MessageSent {
daemon::Event::MessageQueued {
conversation_id,
outgoing_id,
} => {
if let Some(conversation_id) = conversation_id {
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
app.status = outgoing_id
.as_deref()
@@ -394,6 +395,7 @@ fn run_app(
app.refresh_messages_in_flight = true;
}
}
}
daemon::Event::MarkedRead => {}
daemon::Event::ConversationSyncTriggered { conversation_id } => {
if app.active_conversation_id.as_deref() == Some(conversation_id.as_str()) {
@@ -638,7 +640,7 @@ fn handle_chat_keys(
return;
};
request_tx
.send(daemon::Request::SendMessage {
.send(daemon::Request::Reply {
conversation_id,
text,
})

View File

@@ -1,13 +1,13 @@
use std::env;
use std::process;
use kordophone::{
api::{HTTPAPIClient, InMemoryAuthenticationStore, EventSocket},
model::{ConversationID, event::EventData},
APIInterface,
};
use kordophone::api::http_client::Credentials;
use kordophone::api::AuthenticationStore;
use kordophone::api::http_client::Credentials;
use kordophone::{
APIInterface,
api::{EventSocket, HTTPAPIClient, InMemoryAuthenticationStore},
model::{ConversationID, event::EventData},
};
use futures_util::StreamExt;
use hyper::Uri;
@@ -18,7 +18,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} <conversation_id1> [conversation_id2] [conversation_id3] ...", args[0]);
eprintln!(
"Usage: {} <conversation_id1> [conversation_id2] [conversation_id3] ...",
args[0]
);
eprintln!("Environment variables required:");
eprintln!(" KORDOPHONE_API_URL - Server URL");
eprintln!(" KORDOPHONE_USERNAME - Username for authentication");
@@ -40,12 +43,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let credentials = Credentials { username, password };
// Collect all conversation IDs from command line arguments
let target_conversation_ids: Vec<ConversationID> = args[1..].iter()
.map(|id| id.clone())
.collect();
let target_conversation_ids: Vec<ConversationID> =
args[1..].iter().map(|id| id.clone()).collect();
println!("Monitoring {} conversation(s) for updates: {:?}",
target_conversation_ids.len(), target_conversation_ids);
println!(
"Monitoring {} conversation(s) for updates: {:?}",
target_conversation_ids.len(),
target_conversation_ids
);
let auth_store = InMemoryAuthenticationStore::new(Some(credentials.clone()));
let mut client = HTTPAPIClient::new(server_url, auth_store);
@@ -62,26 +67,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match event_result {
Ok(socket_event) => {
match socket_event {
kordophone::api::event_socket::SocketEvent::Update(event) => {
match event.data {
kordophone::api::event_socket::SocketEvent::Update(event) => match event.data {
EventData::MessageReceived(conversation, _message) => {
if target_conversation_ids.contains(&conversation.guid) {
println!("Message update detected for conversation {}, marking as read...", conversation.guid);
println!(
"Message update detected for conversation {}, marking as read...",
conversation.guid
);
match client.mark_conversation_as_read(&conversation.guid).await {
Ok(_) => println!("Successfully marked conversation {} as read", conversation.guid),
Err(e) => eprintln!("Failed to mark conversation {} as read: {:?}", conversation.guid, e),
Ok(_) => println!(
"Successfully marked conversation {} as read",
conversation.guid
),
Err(e) => eprintln!(
"Failed to mark conversation {} as read: {:?}",
conversation.guid, e
),
}
}
}
},
_ => {}
}
},
kordophone::api::event_socket::SocketEvent::Pong => {
// Ignore pong messages
}
}
},
}
Err(e) => {
eprintln!("Error receiving event: {:?}", e);
break;

263
docs/plans/GLIB_BINDINGS.md Normal file
View File

@@ -0,0 +1,263 @@
# GLib Bindings Plan
## Status
Proposed. Not started.
## Context
Today the GTK app talks to `kordophoned` directly over D-Bus in
[`gtk/src/service/repository.vala`](/home/buzzert/src/Kordophone/gtk/src/service/repository.vala)
and the generated interface in
[`gtk/src/service/interface/dbusservice.vala`](/home/buzzert/src/Kordophone/gtk/src/service/interface/dbusservice.vala).
At the same time, the Rust-side daemon client logic already exists in
[`core/kordophoned-client/src/worker.rs`](/home/buzzert/src/Kordophone/core/kordophoned-client/src/worker.rs)
with platform backends for D-Bus and XPC. That means protocol changes currently
have to be reflected in multiple places:
- `kordophoned` D-Bus/XPC server shims
- `kordophoned-client` Rust transport layer
- GTK/Vala D-Bus interface and proxy code
- Swift XPC client code
For GTK/Vala specifically, the goal is to stop binding the application directly
to the daemon protocol surface.
## Recommendation
Add a GTK-facing GLib/GObject wrapper on top of a small C ABI exported from the
Rust daemon client stack.
Do not expose the current `kordophoned-client` Rust API directly as raw C.
The current surface uses Rust enums, `Vec<String>`, `Option`, and a threaded
worker model, which is fine internally but not a good stable FFI boundary.
The recommended layering is:
1. Keep `core/kordophoned-client` as the Rust-native transport/domain layer.
2. Add a new FFI crate with a narrow, C-safe API.
3. Add a small GLib/GObject wrapper for GTK/Vala consumption.
4. Migrate the GTK app to that wrapper and remove its direct D-Bus binding code.
This keeps one transport implementation in Rust while giving Vala a natural
GObject-style API with methods, async operations, and signals.
## Why Not Direct Rust GObject Export?
Exporting a GObject API directly from Rust is possible in principle, but the
tooling for generating the introspection artifacts that Vala wants is still much
less straightforward than plain C/GObject.
For this repo, the lower-risk path is:
- Rust for the daemon client implementation
- C ABI as the stable binary boundary
- a thin C/GObject wrapper for GI/Vala
That gives us standard GLib ownership rules, normal `.gir` / `.typelib` /
`.vapi` generation, and a cleaner Meson integration story for the GTK app.
## Proposed Layout
Add a new crate:
- `core/kordophoned-client-c`
This crate should export a small `extern "C"` interface around the existing
daemon client logic.
Add a new Linux-focused wrapper library:
- `gtk/libkordophone-client-glib` or `gtk/src/service/glib/`
This wrapper should be written in C and expose a GObject API that Vala can use.
It should depend on the Rust C ABI library, not on D-Bus directly.
## Proposed Responsibilities
### `core/kordophoned-client`
- Own request/response/signal semantics.
- Own platform transport handling:
- D-Bus on Linux
- XPC on macOS
- Stay Rust-native.
### `core/kordophoned-client-c`
- Define opaque client handles.
- Define FFI-safe request/response structs.
- Define callback registration for async completions and daemon signals.
- Marshal Rust events onto C callbacks.
- Hide Rust enums and collections from C consumers.
### GLib Wrapper
- Expose a `KpDaemonClient` GObject.
- Convert C callbacks into `GTask` completions and GObject signals.
- Marshal all callbacks onto the GLib main context.
- Expose Vala-friendly model objects or boxed structs.
## Draft Public Surface
The GTK-facing API should look like a normal GLib client, not like a transport
binding.
Suggested primary type:
- `KpDaemonClient`
Suggested async methods:
- `get_conversations_async(limit, offset, cancellable, callback)`
- `get_messages_async(conversation_id, last_message_id, cancellable, callback)`
- `reply_async(conversation_id, text, attachment_guids, cancellable, callback)`
- `new_conversation_async(handle_ids, text, attachment_guids, cancellable, callback)`
- `mark_conversation_as_read_async(conversation_id, cancellable, callback)`
- `sync_conversation_async(conversation_id, cancellable, callback)`
- `sync_conversation_list_async(cancellable, callback)`
- `upload_attachment_async(path, cancellable, callback)`
- `download_attachment_async(attachment_id, preview, cancellable, callback)`
- `get_attachment_info_async(attachment_id, cancellable, callback)`
Suggested synchronous or utility methods:
- `open_attachment_fd(attachment_id, preview, error)`
- `start()`
- `stop()`
Suggested signals:
- `conversations-updated`
- `messages-updated(conversation-id)`
- `attachment-downloaded(attachment-id)`
- `attachment-uploaded(upload-guid, attachment-guid)`
- `reconnected`
- `error(message)`
The first pass does not need to expose every daemon event. It only needs enough
surface to replace the current GTK repository layer.
## Suggested Model Types
Avoid returning raw hash tables to Vala.
Add small typed model objects or boxed structs for:
- `KpConversationSummary`
- `KpChatMessage`
- `KpAttachmentInfo`
If send acknowledgements matter to the UI, add:
- `KpQueuedMessage`
The GTK app can keep its own higher-level `Repository` wrapper initially, but it
should be wrapping typed client results instead of raw D-Bus maps.
## Signal Handling
Signals are the main reason this should be a GLib wrapper instead of plain C
calls from Vala.
Required behavior:
- daemon signal subscriptions must stay alive for the lifetime of the client
- transport callbacks must never call into GTK from a non-main thread
- all emitted GObject signals must be marshalled onto the GLib main context
The C ABI should therefore support registration of signal callbacks plus a user
data pointer, while the GLib wrapper owns the main-context handoff.
## Migration Plan
### Phase 1: Stabilize Rust FFI Boundary
- Add FFI-safe request/response types instead of exposing the current worker
enums directly.
- Keep the Rust worker and transport code internal.
- Decide which operations are callback-based and which can be blocking.
### Phase 2: Add `kordophoned-client-c`
- Expose opaque client construction/destruction.
- Expose request entry points for the operations GTK already uses.
- Expose signal subscription hooks.
- Add explicit allocation/free helpers for returned strings and arrays.
### Phase 3: Add GLib Wrapper
- Implement `KpDaemonClient` as a GObject in C.
- Convert C callbacks into `GTask`-based async completion methods.
- Emit GObject signals for daemon events.
- Generate introspection artifacts for Vala.
### Phase 4: Migrate GTK
- Replace direct use of `DBusService.Repository` in
[`gtk/src/service/repository.vala`](/home/buzzert/src/Kordophone/gtk/src/service/repository.vala).
- Remove the generated D-Bus binding dependency from the GTK app.
- Keep the existing GTK-side repository shape initially to minimize churn.
### Phase 5: Revisit Swift
Optional.
If this turns out cleaner than the current Swift XPC wrapper, add a Swift-facing
wrapper around the same C ABI later. This is not required for the GTK migration.
## Build System Notes
This plan introduces a Cargo + Meson integration boundary.
Expected follow-up work:
- decide whether the Rust C ABI library is built via `cargo build`, `cargo-c`,
or a Meson custom target
- decide where generated headers live
- decide where `.gir`, `.typelib`, and `.vapi` artifacts are produced and
installed
The cleanest packaging story is likely:
- Cargo builds the Rust library
- Meson builds the GLib wrapper and generates introspection data
- GTK links to the GLib wrapper
## Non-Goals
- replacing D-Bus and XPC with a custom socket transport
- unifying the macOS app onto GLib
- exposing the entire daemon protocol on day one
- redesigning GTK application architecture beyond the service boundary
## Risks
- FFI ownership mistakes across Rust, C, and GLib
- callback threading bugs if signal delivery is not marshalled correctly
- build complexity from mixed Cargo and Meson workflows
- over-exposing the current daemon protocol instead of defining a cleaner client
API
## Open Questions
- Should the C ABI be Linux-only at first, or cross-platform from day one?
- Should the first GTK-facing layer expose send acknowledgements, or just fire
and rely on message update signals?
- Should handle resolution be part of the GLib client API immediately, or added
only when GTK gains compose-new-conversation UI?
- Is it worth creating a higher-level shared protocol schema before building the
C ABI, or should that wait until after the GTK migration?
## Short Version
If we do this later, the best path is probably:
- Rust daemon client stays as the implementation core
- add a small C ABI on top of it
- add a tiny C/GObject wrapper for Vala
- move GTK off direct D-Bus bindings
That removes one of the protocol surfaces we currently maintain without forcing
the GTK app to consume a Rust-native API directly.

View File

@@ -1,6 +1,5 @@
%global app_version %{!?app_version:1.3.0}
Name: kordophone
Version: %{app_version}
Version: %{?app_version}%{!?app_version:1.3.0}
Release: 1%{?dist}
Summary: GTK4/Libadwaita client for Kordophone

View File

@@ -50,8 +50,11 @@ namespace DBusService {
[DBus (name = "GetMessages")]
public abstract GLib.HashTable<string, GLib.Variant>[] get_messages(string conversation_id, string last_message_id) throws DBusError, IOError;
[DBus (name = "SendMessage")]
public abstract string send_message(string conversation_id, string text, string[] attachment_guids) throws DBusError, IOError;
[DBus (name = "Reply")]
public abstract string reply(string conversation_id, string text, string[] attachment_guids) throws DBusError, IOError;
[DBus (name = "NewConversation")]
public abstract string new_conversation(string[] handle_ids, string text, string[] attachment_guids) throws DBusError, IOError;
[DBus (name = "MessagesUpdated")]
public signal void messages_updated(string conversation_id);

View File

@@ -83,7 +83,7 @@
</arg>
</method>
<method name="SendMessage">
<method name="Reply">
<arg type="s" name="conversation_id" direction="in"/>
<arg type="s" name="text" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/>
@@ -91,9 +91,28 @@
<arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Sends a message to the server. Returns the outgoing message ID.
value="Replies to an existing conversation. Returns the outgoing message ID.
Arguments:
- conversation_id: The ID of the conversation to send the message to.
- conversation_id: The ID of the conversation to reply to.
- text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send.
Returns:
- outgoing_message_id: The ID of the outgoing message.
"/>
</method>
<method name="NewConversation">
<arg type="as" name="handle_ids" direction="in"/>
<arg type="s" name="text" direction="in"/>
<arg type="as" name="attachment_guids" direction="in"/>
<arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Sends a message to a new conversation identified by resolved handles.
Arguments:
- handle_ids: The resolved handles for the new conversation.
- text: The text of the message to send.
- attachment_guids: The GUIDs of the attachments to send.

View File

@@ -96,12 +96,20 @@ public class Repository : DBusServiceProxy {
return returned_messages;
}
public string send_message(string conversation_guid, string message, string[] attachment_guids) throws DBusServiceProxyError, GLib.Error {
public string reply(string conversation_guid, string message, string[] attachment_guids) throws DBusServiceProxyError, GLib.Error {
if (dbus_repository == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Repository not connected");
}
return dbus_repository.send_message(conversation_guid, message, attachment_guids);
return dbus_repository.reply(conversation_guid, message, attachment_guids);
}
public string new_conversation(string[] handle_ids, string message, string[] attachment_guids) throws DBusServiceProxyError, GLib.Error {
if (dbus_repository == null) {
throw new DBusServiceProxyError.NOT_CONNECTED("Repository not connected");
}
return dbus_repository.new_conversation(handle_ids, message, attachment_guids);
}
public void sync_conversation(string conversation_guid) throws DBusServiceProxyError, GLib.Error {

View File

@@ -257,7 +257,7 @@ class TranscriptContainerView : Adw.Bin
}
try {
Repository.get_instance().send_message(selected_conversation.guid, body, attachment_guids.to_array());
Repository.get_instance().reply(selected_conversation.guid, body, attachment_guids.to_array());
} catch (Error e) {
GLib.warning("Failed to send message: %s", e.message);
}

View File

@@ -327,8 +327,8 @@ private class TranscriptDrawingArea : Widget
private void recompute_message_layouts() {
var container_width = get_width();
float max_width = container_width * 0.90f;
float image_max_width = max_width * 0.75f;
float max_width = container_width * 0.80f;
float image_max_width = max_width * 0.70f;
DateTime? last_date = null;
string? last_sender = null;

View File

@@ -159,7 +159,6 @@ public class TranscriptView : Adw.Bin
}
delegate void OpenPath(string path);
private ulong attachment_downloaded_handler_id = 0;
private void open_attachment(string attachment_guid) {
OpenPath open_path = (path) => {
try {
@@ -180,10 +179,17 @@ public class TranscriptView : Adw.Bin
// TODO: Should probably indicate progress here.
attachment_downloaded_handler_id = Repository.get_instance().attachment_downloaded.connect((guid) => {
ulong handler_id = 0;
handler_id = Repository.get_instance().attachment_downloaded.connect((guid) => {
if (guid == attachment_guid) {
open_path(attachment_info.path);
Repository.get_instance().disconnect(attachment_downloaded_handler_id);
try {
var updated_attachment_info = Repository.get_instance().get_attachment_info(attachment_guid);
open_path(updated_attachment_info.path);
} catch (GLib.Error e) {
warning("Failed to get attachment info after download: %s", e.message);
}
Repository.get_instance().disconnect(handler_id);
}
});
}

View File

@@ -119,7 +119,7 @@ struct MessageEntryView: View
Task {
do {
try await client.sendMessage(
try await client.reply(
conversationId: convo.id,
message: messageText,
transferGuids: transferGuids

View File

@@ -133,7 +133,7 @@ final class XPCClient
return results
}
public func sendMessage(conversationId: String, message: String, transferGuids: Set<String>) async throws {
public func reply(conversationId: String, message: String, transferGuids: Set<String>) async throws {
var args: [String: xpc_object_t] = [:]
args["conversation_id"] = xpcString(conversationId)
args["text"] = xpcString(message)
@@ -142,7 +142,20 @@ final class XPCClient
args["attachment_guids"] = xpcStringArray(transferGuids)
}
let req = makeRequest(method: "SendMessage", arguments: args)
let req = makeRequest(method: "Reply", arguments: args)
guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else { throw Error.typeError }
}
public func newConversation(handleIds: Set<String>, message: String, transferGuids: Set<String>) async throws {
var args: [String: xpc_object_t] = [:]
args["handle_ids"] = xpcStringArray(handleIds)
args["text"] = xpcString(message)
if !transferGuids.isEmpty {
args["attachment_guids"] = xpcStringArray(transferGuids)
}
let req = makeRequest(method: "NewConversation", arguments: args)
guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else { throw Error.typeError }
}
@@ -411,4 +424,3 @@ extension xpc_object_t
)
}
}

View File

@@ -14,6 +14,7 @@
#import "MBIMAuthToken.h"
#import "MBIMUpdateQueue.h"
#import "MBIMURLUtilities.h"
#import "MBIMLogging.h"
#import <Security/Security.h>
#import "HTTPMessage.h"
@@ -98,6 +99,10 @@
__block NSObject<HTTPResponse> *response = nil;
dispatch_semaphore_t sema = dispatch_semaphore_create(0);
MBIMBridgeOperationCompletionBlock completion = ^(NSObject<HTTPResponse> *incomingResponse) {
if (incomingResponse == nil) {
MBIMLogError(@"Operation for %@ %@ completed with a nil response.", method, path);
}
response = incomingResponse;
dispatch_semaphore_signal(sema);
};
@@ -125,6 +130,11 @@
response = [_currentOperation cancelAndReturnTimeoutResponse];
}
if (response == nil) {
MBIMLogError(@"Returning fallback 500 for %@ %@ because the operation produced no response.", method, path);
response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
}
return response;
}

View File

@@ -10,6 +10,7 @@
#import "IMCore_ClassDump.h"
#import "IMMessageItem+Encoded.h"
#import "MBIMErrorResponse.h"
@implementation MBIMSendMessageOperation
@@ -20,16 +21,192 @@
return @"sendMessage";
}
- (IMMessage *)_sendMessage:(NSString *)messageBody toChatWithGUID:(NSString *)chatGUID attachmentGUIDs:(NSArray<NSString *> *)guids
- (nullable IMChat *)_existingSingleChatForHandle:(IMHandle *)handle registry:(IMChatRegistry *)registry
{
__block IMMessage *result = nil;
if ([registry respondsToSelector:@selector(existingChatWithHandle:allowAlternativeService:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandle:allowAlternativeService:");
return [registry existingChatWithHandle:handle allowAlternativeService:NO];
}
dispatch_sync([[self class] sharedIMAccessQueue], ^{
IMChat *chat = [[IMChatRegistry sharedInstance] existingChatWithGUID:chatGUID];
if ([registry respondsToSelector:@selector(existingChatWithHandle:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandle:");
return [registry existingChatWithHandle:handle];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandle:allowRetargeting:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandle:allowRetargeting:");
return [registry existingChatForIMHandle:handle allowRetargeting:NO];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandle:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandle:");
return [registry existingChatForIMHandle:handle];
}
MBIMLogError(@"IMChatRegistry does not support any known single-handle existing chat lookup selector.");
return nil;
}
- (nullable IMChat *)_createSingleChatForHandle:(IMHandle *)handle registry:(IMChatRegistry *)registry
{
if ([registry respondsToSelector:@selector(chatWithHandle:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatWithHandle:");
return [registry chatWithHandle:handle];
}
if ([registry respondsToSelector:@selector(chatForIMHandle:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatForIMHandle:");
return [registry chatForIMHandle:handle];
}
MBIMLogError(@"IMChatRegistry does not support any known single-handle chat creation selector.");
return nil;
}
- (nullable IMChat *)_existingGroupChatForHandles:(NSArray<IMHandle *> *)handles registry:(IMChatRegistry *)registry
{
if ([registry respondsToSelector:@selector(existingChatWithHandles:allowAlternativeService:groupID:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandles:allowAlternativeService:groupID:displayName:joinedChatsOnly:");
return [registry existingChatWithHandles:handles
allowAlternativeService:NO
groupID:nil
displayName:nil
joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(existingChatWithHandles:allowAlternativeService:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandles:allowAlternativeService:");
return [registry existingChatWithHandles:handles allowAlternativeService:NO];
}
if ([registry respondsToSelector:@selector(existingChatWithHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatWithHandles:");
return [registry existingChatWithHandles:handles];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandles:allowRetargeting:groupID:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandles:allowRetargeting:groupID:displayName:joinedChatsOnly:");
return [registry existingChatForIMHandles:handles
allowRetargeting:NO
groupID:nil
displayName:nil
joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandles:allowRetargeting:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandles:allowRetargeting:");
return [registry existingChatForIMHandles:handles allowRetargeting:NO];
}
if ([registry respondsToSelector:@selector(existingChatForIMHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry existingChatForIMHandles:");
return [registry existingChatForIMHandles:handles];
}
MBIMLogError(@"IMChatRegistry does not support any known multi-handle existing chat lookup selector.");
return nil;
}
- (nullable IMChat *)_createGroupChatForHandles:(NSArray<IMHandle *> *)handles registry:(IMChatRegistry *)registry
{
if ([registry respondsToSelector:@selector(chatWithHandles:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatWithHandles:displayName:joinedChatsOnly:");
return [registry chatWithHandles:handles displayName:nil joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(chatWithHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatWithHandles:");
return [registry chatWithHandles:handles];
}
if ([registry respondsToSelector:@selector(chatForIMHandles:displayName:joinedChatsOnly:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatForIMHandles:displayName:joinedChatsOnly:");
return [registry chatForIMHandles:handles displayName:nil joinedChatsOnly:YES];
}
if ([registry respondsToSelector:@selector(chatForIMHandles:)]) {
MBIMLogInfo(@"Using IMChatRegistry chatForIMHandles:");
return [registry chatForIMHandles:handles];
}
MBIMLogError(@"IMChatRegistry does not support any known multi-handle chat creation selector.");
return nil;
}
- (nullable IMChat *)_chatForHandleIDs:(NSArray<NSString *> *)handleIDs registry:(IMChatRegistry *)registry
{
MBIMLogInfo(@"Resolving send target for handles: %@", handleIDs);
// TODO: chat might not be an iMessage chat!
IMAccount *iMessageAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]];
IMHandle *senderHandle = [iMessageAccount loginIMHandle];
if (!iMessageAccount) {
MBIMLogError(@"Unable to find an iMessage account for message send.");
return nil;
}
NSMutableArray<IMHandle *> *handles = [NSMutableArray arrayWithCapacity:[handleIDs count]];
for (NSString *handleID in handleIDs) {
IMHandle *handle = [iMessageAccount imHandleWithID:handleID];
if (!handle) {
MBIMLogError(@"Couldn't resolve IMHandle for id %@", handleID);
return nil;
}
[handles addObject:handle];
}
if ([handles count] == 1) {
IMHandle *handle = [handles firstObject];
IMChat *chat = [self _existingSingleChatForHandle:handle registry:registry];
if (!chat) {
chat = [self _createSingleChatForHandle:handle registry:registry];
}
if (chat) {
MBIMLogInfo(@"Resolved send target %@ to chat %@", [handle ID], [chat guid] ?: @"<unknown>");
} else {
MBIMLogError(@"Unable to locate or create chat for handle %@", [handle ID]);
}
return chat;
}
IMChat *chat = [self _existingGroupChatForHandles:handles registry:registry];
if (!chat) {
chat = [self _createGroupChatForHandles:handles registry:registry];
}
if (chat) {
MBIMLogInfo(@"Resolved handles %@ to chat %@", handleIDs, [chat guid] ?: @"<unknown>");
} else {
MBIMLogError(@"Unable to locate or create chat for handles %@", handleIDs);
}
return chat;
}
- (nullable NSDictionary *)_sendMessage:(NSString *)messageBody toChat:(IMChat *)chat attachmentGUIDs:(NSArray<NSString *> *)guids includeConversationGUID:(BOOL)includeConversationGUID
{
if (!chat) {
return nil;
}
NSString *chatGUID = [chat guid];
if (!chatGUID) {
chatGUID = [[[IMChatRegistry sharedInstance] allGUIDsForChat:chat] firstObject];
}
MBIMLogInfo(@"Preparing sendMessage for chat %@ (bodyLength=%lu attachmentCount=%lu)", chatGUID ?: @"<unknown>", (unsigned long)[messageBody length], (unsigned long)[guids count]);
IMAccount *sendingAccount = [chat account];
if (!sendingAccount) {
sendingAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]];
}
IMHandle *senderHandle = [sendingAccount loginIMHandle];
if (!senderHandle) {
MBIMLogError(@"Unable to determine sender handle for message send.");
return nil;
}
NSAttributedString *replyAttrString = [[NSAttributedString alloc] initWithString:messageBody];
NSAttributedString *attrStringWithFileTransfers = IMCreateSuperFormatStringWithAppendedFileTransfers(replyAttrString, guids);
@@ -40,19 +217,31 @@
flags:(kIMMessageFinished | kIMMessageIsFromMe)];
for (NSString *guid in [reply fileTransferGUIDs]) {
[[IMFileTransferCenter sharedInstance] assignTransfer:guid toHandle:chat.recipient];
[[IMFileTransferCenter sharedInstance] assignTransfer:guid toMessage:reply account:sendingAccount];
}
if (!chat) {
MBIMLogInfo(@"Chat does not exist: %@", chatGUID);
} else {
result = reply;
NSDictionary *replyRepresentation = [reply mbim_dictionaryRepresentation];
if (![replyRepresentation isKindOfClass:[NSDictionary class]]) {
MBIMLogError(@"Unable to encode sent message for chat %@", chatGUID ?: @"<unknown>");
return nil;
}
NSMutableDictionary *result = [replyRepresentation mutableCopy];
if (includeConversationGUID) {
NSString *conversationGUID = chatGUID;
if (!conversationGUID) {
conversationGUID = [[[IMChatRegistry sharedInstance] allGUIDsForChat:chat] firstObject];
}
if (conversationGUID) {
result[@"conversationGUID"] = conversationGUID;
}
}
MBIMLogInfo(@"Dispatching IMCore send for chat %@", chatGUID ?: @"<unknown>");
dispatch_async(dispatch_get_main_queue(), ^{
[chat sendMessage:reply];
});
}
});
return result;
}
@@ -79,41 +268,112 @@
- (void)main
{
NSObject<HTTPResponse> *response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
__block NSObject<HTTPResponse> *response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
NSError *error = nil;
NSDictionary *args = [NSJSONSerialization JSONObjectWithData:self.requestBodyData options:0 error:&error];
if (error || args.count == 0) {
MBIMLogError(@"Unable to parse sendMessage request body: %@", error);
self.serverCompletionBlock(response);
return;
}
NSString *guid = [args objectForKey:@"guid"];
NSString *messageBody = [args objectForKey:@"body"];
if (!guid || !messageBody) {
NSArray *rawHandleIDs = [args objectForKey:@"handleIDs"];
BOOL hasGUID = [guid isKindOfClass:[NSString class]] && [guid length] > 0;
BOOL hasHandleIDs = [rawHandleIDs isKindOfClass:[NSArray class]] && [rawHandleIDs count] > 0;
if (![messageBody isKindOfClass:[NSString class]] || (!hasGUID && !hasHandleIDs) || (hasGUID && hasHandleIDs)) {
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"sendMessage requires body and exactly one of guid or handleIDs."];
self.serverCompletionBlock(response);
return;
}
// tapbacks
#if 0
IMMessage *acknowledgment = [IMMessage instantMessageWithAssociatedMessageContent: /* [NSString stringWithFormat:@"%@ \"%%@\"", tapbackAction] */
flags:0
associatedMessageGUID:guid
associatedMessageType:IMAssociatedMessageTypeAcknowledgmentHeart
associatedMessageRange:[imMessage messagePartRange]
messageSummaryInfo:[self adjustMessageSummaryInfoForSending:message]
threadIdentifier:[imMessage threadIdentifier]];
#endif
NSArray *transferGUIDs = [args objectForKey:@"fileTransferGUIDs"];
if (!transferGUIDs) {
transferGUIDs = @[];
NSMutableArray<NSString *> *handleIDs = [NSMutableArray array];
if (hasHandleIDs) {
for (id handleID in rawHandleIDs) {
if ([handleID isKindOfClass:[NSString class]] && [handleID length] > 0) {
[handleIDs addObject:handleID];
}
}
IMMessage *result = [self _sendMessage:messageBody toChatWithGUID:guid attachmentGUIDs:transferGUIDs];
if (result) {
response = [MBIMJSONDataResponse responseWithJSONObject:[result mbim_dictionaryRepresentation]];
handleIDs = [[[NSOrderedSet orderedSetWithArray:handleIDs] array] mutableCopy];
if ([handleIDs count] == 0) {
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"No valid handle IDs provided."];
self.serverCompletionBlock(response);
return;
}
}
NSArray *rawTransferGUIDs = [args objectForKey:@"fileTransferGUIDs"];
NSMutableArray<NSString *> *transferGUIDs = [NSMutableArray array];
if ([rawTransferGUIDs isKindOfClass:[NSArray class]]) {
for (id transferGUID in rawTransferGUIDs) {
if ([transferGUID isKindOfClass:[NSString class]] && [transferGUID length] > 0) {
[transferGUIDs addObject:transferGUID];
}
}
}
MBIMLogInfo(@"sendMessage request received. guid=%@ handleIDs=%@ bodyLength=%lu attachmentGUIDs=%@", hasGUID ? guid : @"<none>", handleIDs, (unsigned long)[messageBody length], transferGUIDs);
@try {
dispatch_sync([[self class] sharedIMAccessQueue], ^{
IMChatRegistry *registry = [IMChatRegistry sharedInstance];
IMChat *chat = nil;
BOOL includeConversationGUID = NO;
if (hasGUID) {
MBIMLogInfo(@"sendMessage targeting existing conversation %@", guid);
chat = [registry existingChatWithGUID:guid];
if (!chat) {
MBIMLogError(@"Chat does not exist for guid %@", guid);
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Chat does not exist for the provided guid."];
return;
}
} else {
MBIMLogInfo(@"sendMessage targeting handles %@", handleIDs);
chat = [self _chatForHandleIDs:handleIDs registry:registry];
includeConversationGUID = YES;
if (!chat) {
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unable to create or locate a chat for the provided handles."];
return;
}
}
NSString *resolvedChatGUID = [chat guid];
if (!resolvedChatGUID) {
resolvedChatGUID = [[[IMChatRegistry sharedInstance] allGUIDsForChat:chat] firstObject];
}
MBIMLogInfo(@"sendMessage resolved target chat %@", resolvedChatGUID ?: @"<unknown>");
NSDictionary *result = [self _sendMessage:messageBody
toChat:chat
attachmentGUIDs:transferGUIDs
includeConversationGUID:includeConversationGUID];
if (!result) {
MBIMLogError(@"sendMessage failed before a response payload could be encoded for chat %@", resolvedChatGUID ?: @"<unknown>");
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unable to construct sent message response."];
return;
}
NSObject<HTTPResponse> *jsonResponse = [MBIMJSONDataResponse responseWithJSONObject:result];
if (jsonResponse) {
response = jsonResponse;
} else {
MBIMLogError(@"Unable to encode sendMessage JSON response for chat %@", resolvedChatGUID ?: @"<unknown>");
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unable to encode sendMessage response."];
}
});
} @catch (NSException *exception) {
MBIMLogError(@"Unhandled exception during sendMessage. name=%@ reason=%@ userInfo=%@", exception.name, exception.reason, exception.userInfo);
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"Unhandled exception while sending message. Check server logs."];
}
if (response == nil) {
MBIMLogError(@"sendMessage completed without producing a response. guid=%@ handleIDs=%@", hasGUID ? guid : @"<none>", handleIDs);
response = [[MBIMErrorResponse alloc] initWithErrorCode:500 message:@"sendMessage did not produce a response. Check server logs."];
}
self.serverCompletionBlock(response);