Add plumbing for new message/reply through core, gtk, and osx
This commit is contained in:
@@ -53,13 +53,21 @@ pub enum Event {
|
||||
/// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned.
|
||||
GetMessages(String, Option<String>, Reply<Vec<Message>>),
|
||||
|
||||
/// Enqueues a message to be sent to the server.
|
||||
/// Enqueues a reply to an existing conversation.
|
||||
/// Parameters:
|
||||
/// - conversation_id: The ID of the conversation to send the message to.
|
||||
/// - text: The text of the message to send.
|
||||
/// - attachment_guids: The GUIDs of the attachments to send.
|
||||
/// - reply: The outgoing message ID (not the server-assigned message ID).
|
||||
SendMessage(String, String, Vec<String>, Reply<Uuid>),
|
||||
Reply(String, String, Vec<String>, Reply<Uuid>),
|
||||
|
||||
/// Enqueues a message to one or more resolved handles.
|
||||
/// Parameters:
|
||||
/// - handle_ids: The resolved handle IDs for the new conversation.
|
||||
/// - text: The text of the message to send.
|
||||
/// - attachment_guids: The GUIDs of the attachments to send.
|
||||
/// - reply: The outgoing message ID (not the server-assigned message ID).
|
||||
NewConversation(Vec<String>, String, Vec<String>, Reply<Uuid>),
|
||||
|
||||
/// Notifies the daemon that a message has been sent.
|
||||
/// Parameters:
|
||||
|
||||
@@ -29,7 +29,7 @@ use kordophone_db::{
|
||||
|
||||
use kordophone::api::http_client::HTTPAPIClient;
|
||||
use kordophone::api::APIInterface;
|
||||
use kordophone::model::outgoing_message::OutgoingMessage;
|
||||
use kordophone::model::outgoing_message::{OutgoingMessage, OutgoingMessageTarget};
|
||||
use kordophone::model::{ConversationID, MessageID};
|
||||
|
||||
mod update_monitor;
|
||||
@@ -330,10 +330,14 @@ impl Daemon {
|
||||
let _ = reply.send(());
|
||||
}
|
||||
|
||||
Event::SendMessage(conversation_id, text, attachment_guids, reply) => {
|
||||
Event::Reply(conversation_id, text, attachment_guids, reply) => {
|
||||
let conversation_id = conversation_id.clone();
|
||||
let uuid = self
|
||||
.enqueue_outgoing_message(text, conversation_id.clone(), attachment_guids)
|
||||
.enqueue_outgoing_message(
|
||||
text,
|
||||
OutgoingMessageTarget::Conversation(conversation_id.clone()),
|
||||
attachment_guids,
|
||||
)
|
||||
.await;
|
||||
let _ = reply.send(uuid);
|
||||
|
||||
@@ -344,12 +348,52 @@ impl Daemon {
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
Event::NewConversation(handle_ids, text, attachment_guids, reply) => {
|
||||
let uuid = self
|
||||
.enqueue_outgoing_message(
|
||||
text,
|
||||
OutgoingMessageTarget::Handles(handle_ids),
|
||||
attachment_guids,
|
||||
)
|
||||
.await;
|
||||
let _ = reply.send(uuid);
|
||||
}
|
||||
|
||||
Event::MessageSent(message, outgoing_message, conversation_id) => {
|
||||
log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id);
|
||||
|
||||
let conversation_created = match self
|
||||
.ensure_conversation_exists_for_sent_message(
|
||||
&conversation_id,
|
||||
&outgoing_message,
|
||||
&message,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(created) => created,
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
target: target::EVENT,
|
||||
"Failed to ensure conversation {} exists for sent message {}: {}",
|
||||
conversation_id,
|
||||
message.id,
|
||||
e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if conversation_created {
|
||||
self.signal_sender
|
||||
.send(Signal::ConversationsUpdated)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Insert the message into the database.
|
||||
log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id);
|
||||
self.database
|
||||
if let Err(e) = self
|
||||
.database
|
||||
.lock()
|
||||
.await
|
||||
.with_repository(|r| {
|
||||
@@ -363,13 +407,24 @@ impl Daemon {
|
||||
)
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
log::error!(
|
||||
target: target::EVENT,
|
||||
"Failed to persist sent message {} for conversation {}: {}",
|
||||
message.id,
|
||||
conversation_id,
|
||||
e
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Remove from outgoing messages.
|
||||
log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid);
|
||||
for messages in self.outgoing_messages.values_mut() {
|
||||
messages.retain(|m| m.guid != outgoing_message.guid);
|
||||
}
|
||||
self.outgoing_messages
|
||||
.get_mut(&conversation_id)
|
||||
.map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));
|
||||
.retain(|_, messages| !messages.is_empty());
|
||||
|
||||
// Send message updated signal.
|
||||
self.signal_sender
|
||||
@@ -517,24 +572,87 @@ impl Daemon {
|
||||
result
|
||||
}
|
||||
|
||||
async fn ensure_conversation_exists_for_sent_message(
|
||||
&mut self,
|
||||
conversation_id: &ConversationID,
|
||||
outgoing_message: &OutgoingMessage,
|
||||
message: &Message,
|
||||
) -> Result<bool> {
|
||||
let conversation_exists = self
|
||||
.database
|
||||
.lock()
|
||||
.await
|
||||
.with_repository(|r| r.get_conversation_by_guid(conversation_id))
|
||||
.await?
|
||||
.is_some();
|
||||
|
||||
if conversation_exists {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let participants = Self::participants_for_outgoing_message(outgoing_message);
|
||||
let mut builder = Conversation::builder()
|
||||
.guid(conversation_id)
|
||||
.date(message.date)
|
||||
.unread_count(0)
|
||||
.participants(participants);
|
||||
|
||||
if !message.text.trim().is_empty() {
|
||||
builder = builder.last_message_preview(&message.text);
|
||||
}
|
||||
|
||||
let conversation = builder.build();
|
||||
log::info!(
|
||||
target: target::EVENT,
|
||||
"Creating local conversation {} from sent message {}",
|
||||
conversation_id,
|
||||
message.id
|
||||
);
|
||||
|
||||
self.database
|
||||
.lock()
|
||||
.await
|
||||
.with_repository(|r| r.insert_conversation(conversation))
|
||||
.await?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn participants_for_outgoing_message(outgoing_message: &OutgoingMessage) -> Vec<DbParticipant> {
|
||||
let handle_ids = match &outgoing_message.target {
|
||||
OutgoingMessageTarget::Conversation(_) => return Vec::new(),
|
||||
OutgoingMessageTarget::Handles(handle_ids) => handle_ids,
|
||||
};
|
||||
|
||||
let mut contact_resolver = ContactResolver::new(DefaultContactResolverBackend::default());
|
||||
handle_ids
|
||||
.iter()
|
||||
.map(|handle| DbParticipant::Remote {
|
||||
handle: handle.clone(),
|
||||
contact_id: contact_resolver.resolve_contact_id(handle),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn enqueue_outgoing_message(
|
||||
&mut self,
|
||||
text: String,
|
||||
conversation_id: String,
|
||||
target: OutgoingMessageTarget,
|
||||
attachment_guids: Vec<String>,
|
||||
) -> Uuid {
|
||||
let conversation_id = conversation_id.clone();
|
||||
let outgoing_message = OutgoingMessage::builder()
|
||||
.text(text)
|
||||
.conversation_id(conversation_id.clone())
|
||||
.target(target)
|
||||
.file_transfer_guids(attachment_guids)
|
||||
.build();
|
||||
|
||||
// Keep a record of this so we can provide a consistent model to the client.
|
||||
self.outgoing_messages
|
||||
.entry(conversation_id)
|
||||
.or_insert(vec![])
|
||||
.push(outgoing_message.clone());
|
||||
if let Some(conversation_id) = outgoing_message.conversation_id().cloned() {
|
||||
// Keep a record of replies so we can provide a consistent model to the client.
|
||||
self.outgoing_messages
|
||||
.entry(conversation_id)
|
||||
.or_insert(vec![])
|
||||
.push(outgoing_message.clone());
|
||||
}
|
||||
|
||||
let guid = outgoing_message.guid.clone();
|
||||
self.post_office_sink
|
||||
|
||||
@@ -388,13 +388,23 @@ impl DbusRepository for DBusAgent {
|
||||
self.send_event_sync(Event::DeleteAllConversations)
|
||||
}
|
||||
|
||||
fn send_message(
|
||||
fn reply(
|
||||
&mut self,
|
||||
conversation_id: String,
|
||||
text: String,
|
||||
attachment_guids: Vec<String>,
|
||||
) -> Result<String, MethodErr> {
|
||||
self.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
|
||||
self.send_event_sync(|r| Event::Reply(conversation_id, text, attachment_guids, r))
|
||||
.map(|uuid| uuid.to_string())
|
||||
}
|
||||
|
||||
fn new_conversation(
|
||||
&mut self,
|
||||
handle_ids: Vec<String>,
|
||||
text: String,
|
||||
attachment_guids: Vec<String>,
|
||||
) -> Result<String, MethodErr> {
|
||||
self.send_event_sync(|r| Event::NewConversation(handle_ids, text, attachment_guids, r))
|
||||
.map(|uuid| uuid.to_string())
|
||||
}
|
||||
|
||||
|
||||
@@ -127,7 +127,7 @@ impl XpcAgent {
|
||||
|
||||
// Drop any cleanup resource now that payload is constructed and sent.
|
||||
drop(result.cleanup);
|
||||
|
||||
|
||||
log::trace!(target: LOG_TARGET, "XPC reply sent for method: {}", method);
|
||||
} else {
|
||||
log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);
|
||||
|
||||
@@ -254,8 +254,8 @@ pub async fn dispatch(
|
||||
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
|
||||
},
|
||||
|
||||
// SendMessage
|
||||
"SendMessage" => {
|
||||
// Reply
|
||||
"Reply" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => {
|
||||
@@ -286,12 +286,64 @@ pub async fn dispatch(
|
||||
_ => Vec::new(),
|
||||
};
|
||||
match agent
|
||||
.send_event(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
|
||||
.send_event(|r| Event::Reply(conversation_id, text, attachment_guids, r))
|
||||
.await
|
||||
{
|
||||
Ok(uuid) => {
|
||||
let mut reply: XpcMap = HashMap::new();
|
||||
dict_put_str(&mut reply, "type", "SendMessageResponse");
|
||||
dict_put_str(&mut reply, "type", "ReplyResponse");
|
||||
dict_put_str(&mut reply, "uuid", &uuid.to_string());
|
||||
DispatchResult::new(Message::Dictionary(reply))
|
||||
}
|
||||
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
|
||||
}
|
||||
}
|
||||
|
||||
// NewConversation
|
||||
"NewConversation" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let handle_ids: Vec<String> = match args.get(&cstr("handle_ids")) {
|
||||
Some(Message::Array(arr)) => arr
|
||||
.iter()
|
||||
.filter_map(|m| match m {
|
||||
Message::String(s) => Some(s.to_string_lossy().into_owned()),
|
||||
_ => None,
|
||||
})
|
||||
.collect(),
|
||||
_ => Vec::new(),
|
||||
};
|
||||
if handle_ids.is_empty() {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing handle_ids",
|
||||
));
|
||||
}
|
||||
let text = dict_get_str(args, "text").unwrap_or_default();
|
||||
let attachment_guids: Vec<String> = match args.get(&cstr("attachment_guids")) {
|
||||
Some(Message::Array(arr)) => arr
|
||||
.iter()
|
||||
.filter_map(|m| match m {
|
||||
Message::String(s) => Some(s.to_string_lossy().into_owned()),
|
||||
_ => None,
|
||||
})
|
||||
.collect(),
|
||||
_ => Vec::new(),
|
||||
};
|
||||
match agent
|
||||
.send_event(|r| Event::NewConversation(handle_ids, text, attachment_guids, r))
|
||||
.await
|
||||
{
|
||||
Ok(uuid) => {
|
||||
let mut reply: XpcMap = HashMap::new();
|
||||
dict_put_str(&mut reply, "type", "NewConversationResponse");
|
||||
dict_put_str(&mut reply, "uuid", &uuid.to_string());
|
||||
DispatchResult::new(Message::Dictionary(reply))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user