daemon: maintain outgoing message reference so model is consistent
This commit is contained in:
@@ -2,6 +2,9 @@ use tokio::sync::oneshot;
|
||||
use uuid::Uuid;
|
||||
|
||||
use kordophone_db::models::{Conversation, Message};
|
||||
use kordophone::model::ConversationID;
|
||||
use kordophone::model::OutgoingMessage;
|
||||
|
||||
use crate::daemon::settings::Settings;
|
||||
|
||||
pub type Reply<T> = oneshot::Sender<T>;
|
||||
@@ -45,6 +48,13 @@ pub enum Event {
|
||||
/// - reply: The outgoing message ID (not the server-assigned message ID).
|
||||
SendMessage(String, String, Reply<Uuid>),
|
||||
|
||||
/// Notifies the daemon that a message has been sent.
|
||||
/// Parameters:
|
||||
/// - message: The message that was sent.
|
||||
/// - outgoing_message: The outgoing message that was sent.
|
||||
/// - conversation_id: The ID of the conversation that the message was sent to.
|
||||
MessageSent(Message, OutgoingMessage, ConversationID),
|
||||
|
||||
/// Delete all conversations from the database.
|
||||
DeleteAllConversations(Reply<()>),
|
||||
}
|
||||
|
||||
@@ -10,11 +10,14 @@ use signals::*;
|
||||
|
||||
use anyhow::Result;
|
||||
use directories::ProjectDirs;
|
||||
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc::{Sender, Receiver};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -26,6 +29,7 @@ use kordophone_db::{
|
||||
use kordophone::api::APIInterface;
|
||||
use kordophone::api::http_client::HTTPAPIClient;
|
||||
use kordophone::model::outgoing_message::OutgoingMessage;
|
||||
use kordophone::model::ConversationID;
|
||||
|
||||
mod update_monitor;
|
||||
use update_monitor::UpdateMonitor;
|
||||
@@ -62,6 +66,8 @@ pub struct Daemon {
|
||||
post_office_sink: Sender<PostOfficeEvent>,
|
||||
post_office_source: Option<Receiver<PostOfficeEvent>>,
|
||||
|
||||
outgoing_messages: HashMap<ConversationID, Vec<OutgoingMessage>>,
|
||||
|
||||
version: String,
|
||||
database: Arc<Mutex<Database>>,
|
||||
runtime: tokio::runtime::Runtime,
|
||||
@@ -80,6 +86,7 @@ impl Daemon {
|
||||
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
|
||||
let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100);
|
||||
let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100);
|
||||
|
||||
// Create background task runtime
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
@@ -97,6 +104,7 @@ impl Daemon {
|
||||
signal_sender,
|
||||
post_office_sink,
|
||||
post_office_source: Some(post_office_source),
|
||||
outgoing_messages: HashMap::new(),
|
||||
runtime
|
||||
})
|
||||
}
|
||||
@@ -214,8 +222,31 @@ impl Daemon {
|
||||
},
|
||||
|
||||
Event::SendMessage(conversation_id, text, reply) => {
|
||||
let uuid = self.enqueue_outgoing_message(text, conversation_id).await;
|
||||
let conversation_id = conversation_id.clone();
|
||||
let uuid = self.enqueue_outgoing_message(text, conversation_id.clone()).await;
|
||||
reply.send(uuid).unwrap();
|
||||
|
||||
// Send message updated signal, we have a placeholder message we will return.
|
||||
self.signal_sender.send(Signal::MessagesUpdated(conversation_id.clone())).await.unwrap();
|
||||
},
|
||||
|
||||
Event::MessageSent(message, outgoing_message, conversation_id) => {
|
||||
log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id);
|
||||
|
||||
// Insert the message into the database.
|
||||
log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id);
|
||||
self.database.lock().await
|
||||
.with_repository(|r|
|
||||
r.insert_message( &conversation_id, message)
|
||||
).await.unwrap();
|
||||
|
||||
// Remove from outgoing messages.
|
||||
log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid);
|
||||
self.outgoing_messages.get_mut(&conversation_id)
|
||||
.map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));
|
||||
|
||||
// Send message updated signal.
|
||||
self.signal_sender.send(Signal::MessagesUpdated(conversation_id)).await.unwrap();
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -234,15 +265,34 @@ impl Daemon {
|
||||
}
|
||||
|
||||
async fn get_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Vec<Message> {
|
||||
self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await
|
||||
// Get outgoing messages for this conversation.
|
||||
let empty_vec: Vec<OutgoingMessage> = vec![];
|
||||
let outgoing_messages: &Vec<OutgoingMessage> = self.outgoing_messages.get(&conversation_id)
|
||||
.unwrap_or(&empty_vec);
|
||||
|
||||
self.database.lock().await
|
||||
.with_repository(|r|
|
||||
r.get_messages_for_conversation(&conversation_id)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.chain(outgoing_messages.into_iter().map(|m| m.into()))
|
||||
.collect()
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn enqueue_outgoing_message(&mut self, text: String, conversation_id: String) -> Uuid {
|
||||
let conversation_id = conversation_id.clone();
|
||||
let outgoing_message = OutgoingMessage::builder()
|
||||
.text(text)
|
||||
.conversation_id(conversation_id)
|
||||
.conversation_id(conversation_id.clone())
|
||||
.build();
|
||||
|
||||
// Keep a record of this so we can provide a consistent model to the client.
|
||||
self.outgoing_messages.entry(conversation_id)
|
||||
.or_insert(vec![])
|
||||
.push(outgoing_message.clone());
|
||||
|
||||
let guid = outgoing_message.guid.clone();
|
||||
self.post_office_sink.send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message)).await.unwrap();
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::mpsc::{Sender, Receiver};
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_condvar::Condvar;
|
||||
|
||||
use crate::daemon::events::Event as DaemonEvent;
|
||||
@@ -49,9 +49,6 @@ impl<C: APIInterface, F: AsyncFnMut() -> Result<C>> PostOffice<C, F> {
|
||||
|
||||
loop {
|
||||
let mut retry_messages = Vec::new();
|
||||
|
||||
log::debug!(target: target::POST_OFFICE, "Waiting for event");
|
||||
|
||||
tokio::select! {
|
||||
// Incoming events
|
||||
Some(event) = self.event_source.recv() => {
|
||||
@@ -67,7 +64,14 @@ impl<C: APIInterface, F: AsyncFnMut() -> Result<C>> PostOffice<C, F> {
|
||||
// Message queue
|
||||
mut lock = self.message_available.wait(self.message_queue.lock().await) => {
|
||||
log::debug!(target: target::POST_OFFICE, "Message available in queue");
|
||||
retry_messages = Self::try_send_message_impl(&mut lock, &mut self.make_client).await;
|
||||
|
||||
// Get the next message to send, if any
|
||||
let message = lock.pop_front();
|
||||
drop(lock); // Release the lock before sending, we dont want to remain locked while sending.
|
||||
|
||||
if let Some(message) = message {
|
||||
retry_messages = Self::try_send_message(&mut self.make_client, &self.event_sink, message).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,33 +84,40 @@ impl<C: APIInterface, F: AsyncFnMut() -> Result<C>> PostOffice<C, F> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_send_message_impl(message_queue: &mut MutexGuard<'_, VecDeque<OutgoingMessage>>, make_client: &mut F) -> Vec<OutgoingMessage> {
|
||||
log::debug!(target: target::POST_OFFICE, "Trying to send enqueued messages");
|
||||
|
||||
async fn try_send_message(
|
||||
make_client: &mut F,
|
||||
event_sink: &Sender<DaemonEvent>,
|
||||
message: OutgoingMessage
|
||||
) -> Vec<OutgoingMessage>
|
||||
{
|
||||
let mut retry_messages = Vec::new();
|
||||
while let Some(message) = message_queue.pop_front() {
|
||||
match (make_client)().await {
|
||||
Ok(mut client) => {
|
||||
log::debug!(target: target::POST_OFFICE, "Obtained client, sending message.");
|
||||
match client.send_message(&message).await {
|
||||
Ok(message) => {
|
||||
log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid);
|
||||
// TODO: Notify the daemon via the event sink.
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(target: target::POST_OFFICE, "Error sending message: {:?}", e);
|
||||
log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
retry_messages.push(message);
|
||||
}
|
||||
|
||||
match (make_client)().await {
|
||||
Ok(mut client) => {
|
||||
log::debug!(target: target::POST_OFFICE, "Obtained client, sending message.");
|
||||
match client.send_message(&message).await {
|
||||
Ok(sent_message) => {
|
||||
log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid);
|
||||
|
||||
let conversation_id = message.conversation_id.clone();
|
||||
let event = DaemonEvent::MessageSent(sent_message.into(), message, conversation_id);
|
||||
event_sink.send(event).await.unwrap();
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
log::error!(target: target::POST_OFFICE, "Error sending message: {:?}", e);
|
||||
log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
retry_messages.push(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
log::error!(target: target::POST_OFFICE, "Error creating client: {:?}", e);
|
||||
log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(target: target::POST_OFFICE, "Error creating client: {:?}", e);
|
||||
log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
retry_messages.push(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user