Private
Public Access
1
0

daemon: implements post office

This commit is contained in:
2025-05-02 14:22:43 -07:00
parent 07b55f8615
commit 2519bc05ad
13 changed files with 234 additions and 48 deletions

View File

@@ -18,6 +18,8 @@ kordophone-db = { path = "../kordophone-db" }
log = "0.4.25"
thiserror = "2.0.12"
tokio = { version = "1", features = ["full"] }
tokio-condvar = "0.3.0"
uuid = "1.16.0"
[build-dependencies]
dbus-codegen = "0.10.0"

View File

@@ -58,6 +58,15 @@
<arg type="aa{sv}" direction="out" name="messages"/>
</method>
<method name="SendMessage">
<arg type="s" name="conversation_id" direction="in"/>
<arg type="s" name="text" direction="in"/>
<arg type="s" name="outgoing_message_id" direction="out"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Sends a message to the server. Returns the outgoing message ID."/>
</method>
<signal name="MessagesUpdated">
<arg type="s" name="conversation_id" direction="in"/>
<annotation name="org.freedesktop.DBus.DocString"

View File

@@ -1,4 +1,6 @@
use tokio::sync::oneshot;
use uuid::Uuid;
use kordophone_db::models::{Conversation, Message};
use crate::daemon::settings::Settings;
@@ -33,6 +35,13 @@ pub enum Event {
/// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned.
GetMessages(String, Option<String>, Reply<Vec<Message>>),
/// Enqueues a message to be sent to the server.
/// Parameters:
/// - conversation_id: The ID of the conversation to send the message to.
/// - text: The text of the message to send.
/// - reply: The outgoing message ID (not the server-assigned message ID).
SendMessage(String, String, Reply<Uuid>),
/// Delete all conversations from the database.
DeleteAllConversations(Reply<()>),
}

View File

@@ -16,6 +16,7 @@ use thiserror::Error;
use tokio::sync::mpsc::{Sender, Receiver};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
use kordophone_db::{
database::{Database, DatabaseAccess},
@@ -24,6 +25,7 @@ use kordophone_db::{
use kordophone::api::APIInterface;
use kordophone::api::http_client::HTTPAPIClient;
use kordophone::model::outgoing_message::OutgoingMessage;
mod update_monitor;
use update_monitor::UpdateMonitor;
@@ -31,6 +33,10 @@ use update_monitor::UpdateMonitor;
mod auth_store;
use auth_store::DatabaseAuthenticationStore;
mod post_office;
use post_office::PostOffice;
use post_office::Event as PostOfficeEvent;
#[derive(Debug, Error)]
pub enum DaemonError {
#[error("Client Not Configured")]
@@ -52,6 +58,9 @@ pub struct Daemon {
signal_receiver: Option<Receiver<Signal>>,
signal_sender: Sender<Signal>,
post_office_sink: Sender<PostOfficeEvent>,
post_office_source: Option<Receiver<PostOfficeEvent>>,
version: String,
database: Arc<Mutex<Database>>,
runtime: tokio::runtime::Runtime,
@@ -69,6 +78,7 @@ impl Daemon {
// Create event channels
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100);
let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100);
// Create background task runtime
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
@@ -84,6 +94,8 @@ impl Daemon {
event_sender,
signal_receiver: Some(signal_receiver),
signal_sender,
post_office_sink,
post_office_source: Some(post_office_source),
runtime
})
}
@@ -92,12 +104,23 @@ impl Daemon {
log::info!("Starting daemon version {}", self.version);
log::debug!("Debug logging enabled.");
// Update monitor
let mut update_monitor = UpdateMonitor::new(self.database.clone(), self.event_sender.clone());
tokio::spawn(async move {
update_monitor.run().await; // should run indefinitely
});
// Post office
{
let mut database = self.database.clone();
let event_sender = self.event_sender.clone();
let post_office_source = self.post_office_source.take().unwrap();
tokio::spawn(async move {
let mut post_office = PostOffice::new(post_office_source, event_sender, async move || Self::get_client_impl(&mut database).await );
post_office.run().await;
});
}
while let Some(event) = self.event_receiver.recv().await {
log::debug!(target: target::EVENT, "Received event: {:?}", event);
self.handle_event(event).await;
@@ -188,6 +211,11 @@ impl Daemon {
reply.send(()).unwrap();
},
Event::SendMessage(conversation_id, text, reply) => {
let uuid = self.enqueue_outgoing_message(text, conversation_id).await;
reply.send(uuid).unwrap();
},
}
}
@@ -204,6 +232,18 @@ impl Daemon {
self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await
}
async fn enqueue_outgoing_message(&mut self, text: String, conversation_id: String) -> Uuid {
let outgoing_message = OutgoingMessage::builder()
.text(text)
.conversation_id(conversation_id)
.build();
let guid = outgoing_message.guid.clone();
self.post_office_sink.send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message)).await.unwrap();
guid
}
async fn sync_conversation_list(database: &mut Arc<Mutex<Database>>, signal_sender: &Sender<Signal>) -> Result<()> {
log::info!(target: target::SYNC, "Starting list conversation sync");

View File

@@ -0,0 +1,115 @@
use std::collections::VecDeque;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, Receiver};
use tokio::sync::{Mutex, MutexGuard};
use tokio_condvar::Condvar;
use crate::daemon::events::Event as DaemonEvent;
use kordophone::model::outgoing_message::OutgoingMessage;
use kordophone::api::APIInterface;
use anyhow::Result;
mod target {
pub static POST_OFFICE: &str = "post_office";
}
#[derive(Debug)]
pub enum Event {
EnqueueOutgoingMessage(OutgoingMessage),
}
pub struct PostOffice<C: APIInterface, F: AsyncFnMut() -> Result<C>> {
event_source: Receiver<Event>,
event_sink: Sender<DaemonEvent>,
make_client: F,
message_queue: Mutex<VecDeque<OutgoingMessage>>,
message_available: Condvar,
}
impl<C: APIInterface, F: AsyncFnMut() -> Result<C>> PostOffice<C, F> {
pub fn new(event_source: Receiver<Event>, event_sink: Sender<DaemonEvent>, make_client: F) -> Self {
Self {
event_source,
event_sink,
make_client,
message_queue: Mutex::new(VecDeque::new()),
message_available: Condvar::new(),
}
}
pub async fn queue_message(&mut self, message: &OutgoingMessage) {
self.message_queue.lock().await.push_back(message.clone());
self.message_available.notify_one();
}
pub async fn run(&mut self) {
log::info!(target: target::POST_OFFICE, "Starting post office");
loop {
let mut retry_messages = Vec::new();
log::debug!(target: target::POST_OFFICE, "Waiting for event");
tokio::select! {
// Incoming events
Some(event) = self.event_source.recv() => {
match event {
Event::EnqueueOutgoingMessage(message) => {
log::debug!(target: target::POST_OFFICE, "Received enqueue outgoing message event");
self.message_queue.lock().await.push_back(message);
self.message_available.notify_one();
}
}
}
// Message queue
mut lock = self.message_available.wait(self.message_queue.lock().await) => {
log::debug!(target: target::POST_OFFICE, "Message available in queue");
retry_messages = Self::try_send_message_impl(&mut lock, &mut self.make_client).await;
}
}
if !retry_messages.is_empty() {
log::debug!(target: target::POST_OFFICE, "Queueing {} messages for retry", retry_messages.len());
for message in retry_messages {
self.queue_message(&message).await;
}
}
}
}
async fn try_send_message_impl(message_queue: &mut MutexGuard<'_, VecDeque<OutgoingMessage>>, make_client: &mut F) -> Vec<OutgoingMessage> {
log::debug!(target: target::POST_OFFICE, "Trying to send enqueued messages");
let mut retry_messages = Vec::new();
while let Some(message) = message_queue.pop_front() {
match (make_client)().await {
Ok(mut client) => {
log::debug!(target: target::POST_OFFICE, "Obtained client, sending message.");
match client.send_message(&message).await {
Ok(message) => {
log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid);
// TODO: Notify the daemon via the event sink.
}
Err(e) => {
log::error!(target: target::POST_OFFICE, "Error sending message: {:?}", e);
log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
tokio::time::sleep(Duration::from_secs(5)).await;
retry_messages.push(message);
}
}
}
Err(e) => {
log::error!(target: target::POST_OFFICE, "Error creating client: {:?}", e);
log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
retry_messages
}
}

View File

@@ -102,6 +102,11 @@ impl DbusRepository for ServerImpl {
fn delete_all_conversations(&mut self) -> Result<(), dbus::MethodErr> {
self.send_event_sync(Event::DeleteAllConversations)
}
fn send_message(&mut self, conversation_id: String, text: String) -> Result<String, dbus::MethodErr> {
self.send_event_sync(|r| Event::SendMessage(conversation_id, text, r))
.map(|uuid| uuid.to_string())
}
}
impl DbusSettings for ServerImpl {