diff --git a/kordophone-db/src/models/db/message.rs b/kordophone-db/src/models/db/message.rs index 67c7392..7f94262 100644 --- a/kordophone-db/src/models/db/message.rs +++ b/kordophone-db/src/models/db/message.rs @@ -2,7 +2,7 @@ use diesel::prelude::*; use chrono::NaiveDateTime; use crate::models::{Message, Participant}; -#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone, Identifiable)] +#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone, Identifiable, Debug)] #[diesel(table_name = crate::schema::messages)] #[diesel(check_for_backend(diesel::sqlite::Sqlite))] pub struct Record { diff --git a/kordophone-db/src/models/message.rs b/kordophone-db/src/models/message.rs index d16f620..076ea52 100644 --- a/kordophone-db/src/models/message.rs +++ b/kordophone-db/src/models/message.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, NaiveDateTime}; use uuid::Uuid; use crate::models::participant::Participant; +use kordophone::model::outgoing_message::OutgoingMessage; #[derive(Clone, Debug)] pub struct Message { @@ -40,6 +41,17 @@ impl From for Message { } } +impl From<&OutgoingMessage> for Message { + fn from(value: &OutgoingMessage) -> Self { + Self { + id: value.guid.to_string(), + sender: Participant::Me, + text: value.text.clone(), + date: value.date, + } + } +} + pub struct MessageBuilder { id: Option, sender: Option, diff --git a/kordophone-db/src/repository.rs b/kordophone-db/src/repository.rs index d630810..e200feb 100644 --- a/kordophone-db/src/repository.rs +++ b/kordophone-db/src/repository.rs @@ -246,6 +246,7 @@ impl<'a> Repository<'a> { fn update_conversation_metadata(&mut self, conversation_guid: &str, last_message: &MessageRecord) -> Result<()> { let conversation = self.get_conversation_by_guid(conversation_guid)?; if let Some(mut conversation) = conversation { + log::debug!("Updating conversation metadata: {} message: {:?}", conversation_guid, last_message); conversation.date = last_message.date; conversation.last_message_preview = Some(last_message.text.clone()); self.insert_conversation(conversation)?; diff --git a/kordophone-db/src/tests/mod.rs b/kordophone-db/src/tests/mod.rs index 1023b9e..c8c64d3 100644 --- a/kordophone-db/src/tests/mod.rs +++ b/kordophone-db/src/tests/mod.rs @@ -54,7 +54,7 @@ async fn test_add_conversation() { repository.insert_conversation(modified_conversation.clone()).unwrap(); // Make sure we still only have one conversation. - let all_conversations = repository.all_conversations().unwrap(); + let all_conversations = repository.all_conversations(i32::MAX, 0).unwrap(); assert_eq!(all_conversations.len(), 1); // And make sure the display name was updated @@ -125,7 +125,7 @@ async fn test_all_conversations_with_participants() { repository.insert_conversation(conversation2).unwrap(); // Get all conversations and verify the results - let all_conversations = repository.all_conversations().unwrap(); + let all_conversations = repository.all_conversations(i32::MAX, 0).unwrap(); assert_eq!(all_conversations.len(), 2); // Find and verify each conversation's participants diff --git a/kordophone/src/model/outgoing_message.rs b/kordophone/src/model/outgoing_message.rs index 530b741..d3a4123 100644 --- a/kordophone/src/model/outgoing_message.rs +++ b/kordophone/src/model/outgoing_message.rs @@ -1,5 +1,6 @@ use serde::Serialize; use super::conversation::ConversationID; +use chrono::NaiveDateTime; use uuid::Uuid; #[derive(Debug, Clone, Serialize)] @@ -7,6 +8,9 @@ pub struct OutgoingMessage { #[serde(skip)] pub guid: Uuid, + #[serde(skip)] + pub date: NaiveDateTime, + #[serde(rename = "body")] pub text: String, @@ -62,6 +66,7 @@ impl OutgoingMessageBuilder { text: self.text.unwrap(), conversation_id: self.conversation_id.unwrap(), file_transfer_guids: self.file_transfer_guids.unwrap_or_default(), + date: chrono::Utc::now().naive_utc(), } } } \ No newline at end of file diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index b21107f..2b20d6b 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -2,6 +2,9 @@ use tokio::sync::oneshot; use uuid::Uuid; use kordophone_db::models::{Conversation, Message}; +use kordophone::model::ConversationID; +use kordophone::model::OutgoingMessage; + use crate::daemon::settings::Settings; pub type Reply = oneshot::Sender; @@ -45,6 +48,13 @@ pub enum Event { /// - reply: The outgoing message ID (not the server-assigned message ID). SendMessage(String, String, Reply), + /// Notifies the daemon that a message has been sent. + /// Parameters: + /// - message: The message that was sent. + /// - outgoing_message: The outgoing message that was sent. + /// - conversation_id: The ID of the conversation that the message was sent to. + MessageSent(Message, OutgoingMessage, ConversationID), + /// Delete all conversations from the database. DeleteAllConversations(Reply<()>), } diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 5bd4e75..42059ba 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -10,11 +10,14 @@ use signals::*; use anyhow::Result; use directories::ProjectDirs; + use std::error::Error; use std::path::PathBuf; +use std::collections::HashMap; +use std::sync::Arc; + use thiserror::Error; use tokio::sync::mpsc::{Sender, Receiver}; -use std::sync::Arc; use tokio::sync::Mutex; use uuid::Uuid; @@ -26,6 +29,7 @@ use kordophone_db::{ use kordophone::api::APIInterface; use kordophone::api::http_client::HTTPAPIClient; use kordophone::model::outgoing_message::OutgoingMessage; +use kordophone::model::ConversationID; mod update_monitor; use update_monitor::UpdateMonitor; @@ -62,6 +66,8 @@ pub struct Daemon { post_office_sink: Sender, post_office_source: Option>, + outgoing_messages: HashMap>, + version: String, database: Arc>, runtime: tokio::runtime::Runtime, @@ -80,6 +86,7 @@ impl Daemon { let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100); + // Create background task runtime let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -97,6 +104,7 @@ impl Daemon { signal_sender, post_office_sink, post_office_source: Some(post_office_source), + outgoing_messages: HashMap::new(), runtime }) } @@ -214,8 +222,31 @@ impl Daemon { }, Event::SendMessage(conversation_id, text, reply) => { - let uuid = self.enqueue_outgoing_message(text, conversation_id).await; + let conversation_id = conversation_id.clone(); + let uuid = self.enqueue_outgoing_message(text, conversation_id.clone()).await; reply.send(uuid).unwrap(); + + // Send message updated signal, we have a placeholder message we will return. + self.signal_sender.send(Signal::MessagesUpdated(conversation_id.clone())).await.unwrap(); + }, + + Event::MessageSent(message, outgoing_message, conversation_id) => { + log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id); + + // Insert the message into the database. + log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id); + self.database.lock().await + .with_repository(|r| + r.insert_message( &conversation_id, message) + ).await.unwrap(); + + // Remove from outgoing messages. + log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid); + self.outgoing_messages.get_mut(&conversation_id) + .map(|messages| messages.retain(|m| m.guid != outgoing_message.guid)); + + // Send message updated signal. + self.signal_sender.send(Signal::MessagesUpdated(conversation_id)).await.unwrap(); }, } } @@ -234,15 +265,34 @@ impl Daemon { } async fn get_messages(&mut self, conversation_id: String, last_message_id: Option) -> Vec { - self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await + // Get outgoing messages for this conversation. + let empty_vec: Vec = vec![]; + let outgoing_messages: &Vec = self.outgoing_messages.get(&conversation_id) + .unwrap_or(&empty_vec); + + self.database.lock().await + .with_repository(|r| + r.get_messages_for_conversation(&conversation_id) + .unwrap() + .into_iter() + .chain(outgoing_messages.into_iter().map(|m| m.into())) + .collect() + ) + .await } async fn enqueue_outgoing_message(&mut self, text: String, conversation_id: String) -> Uuid { + let conversation_id = conversation_id.clone(); let outgoing_message = OutgoingMessage::builder() .text(text) - .conversation_id(conversation_id) + .conversation_id(conversation_id.clone()) .build(); + // Keep a record of this so we can provide a consistent model to the client. + self.outgoing_messages.entry(conversation_id) + .or_insert(vec![]) + .push(outgoing_message.clone()); + let guid = outgoing_message.guid.clone(); self.post_office_sink.send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message)).await.unwrap(); diff --git a/kordophoned/src/daemon/post_office.rs b/kordophoned/src/daemon/post_office.rs index 88ebe30..55d626e 100644 --- a/kordophoned/src/daemon/post_office.rs +++ b/kordophoned/src/daemon/post_office.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::time::Duration; use tokio::sync::mpsc::{Sender, Receiver}; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::Mutex; use tokio_condvar::Condvar; use crate::daemon::events::Event as DaemonEvent; @@ -49,9 +49,6 @@ impl Result> PostOffice { loop { let mut retry_messages = Vec::new(); - - log::debug!(target: target::POST_OFFICE, "Waiting for event"); - tokio::select! { // Incoming events Some(event) = self.event_source.recv() => { @@ -67,7 +64,14 @@ impl Result> PostOffice { // Message queue mut lock = self.message_available.wait(self.message_queue.lock().await) => { log::debug!(target: target::POST_OFFICE, "Message available in queue"); - retry_messages = Self::try_send_message_impl(&mut lock, &mut self.make_client).await; + + // Get the next message to send, if any + let message = lock.pop_front(); + drop(lock); // Release the lock before sending, we dont want to remain locked while sending. + + if let Some(message) = message { + retry_messages = Self::try_send_message(&mut self.make_client, &self.event_sink, message).await; + } } } @@ -80,33 +84,40 @@ impl Result> PostOffice { } } - async fn try_send_message_impl(message_queue: &mut MutexGuard<'_, VecDeque>, make_client: &mut F) -> Vec { - log::debug!(target: target::POST_OFFICE, "Trying to send enqueued messages"); - + async fn try_send_message( + make_client: &mut F, + event_sink: &Sender, + message: OutgoingMessage + ) -> Vec + { let mut retry_messages = Vec::new(); - while let Some(message) = message_queue.pop_front() { - match (make_client)().await { - Ok(mut client) => { - log::debug!(target: target::POST_OFFICE, "Obtained client, sending message."); - match client.send_message(&message).await { - Ok(message) => { - log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid); - // TODO: Notify the daemon via the event sink. - } - Err(e) => { - log::error!(target: target::POST_OFFICE, "Error sending message: {:?}", e); - log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds"); - tokio::time::sleep(Duration::from_secs(5)).await; - retry_messages.push(message); - } + + match (make_client)().await { + Ok(mut client) => { + log::debug!(target: target::POST_OFFICE, "Obtained client, sending message."); + match client.send_message(&message).await { + Ok(sent_message) => { + log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid); + + let conversation_id = message.conversation_id.clone(); + let event = DaemonEvent::MessageSent(sent_message.into(), message, conversation_id); + event_sink.send(event).await.unwrap(); + } + + Err(e) => { + log::error!(target: target::POST_OFFICE, "Error sending message: {:?}", e); + log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds"); + tokio::time::sleep(Duration::from_secs(5)).await; + retry_messages.push(message); } } + } - Err(e) => { - log::error!(target: target::POST_OFFICE, "Error creating client: {:?}", e); - log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds"); - tokio::time::sleep(Duration::from_secs(5)).await; - } + Err(e) => { + log::error!(target: target::POST_OFFICE, "Error creating client: {:?}", e); + log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds"); + tokio::time::sleep(Duration::from_secs(5)).await; + retry_messages.push(message); } }