Compare commits
3 Commits
master
...
features/c
| Author | SHA1 | Date | |
|---|---|---|---|
| 8d9251bfe2 | |||
| 0cfa5e05d4 | |||
| 717138b371 |
840
core/Cargo.lock
generated
840
core/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -307,8 +307,8 @@ impl<'a> Repository<'a> {
|
||||
}
|
||||
|
||||
pub fn delete_all_messages(&mut self) -> Result<()> {
|
||||
use crate::schema::messages::dsl as messages_dsl;
|
||||
use crate::schema::message_aliases::dsl as aliases_dsl;
|
||||
use crate::schema::messages::dsl as messages_dsl;
|
||||
|
||||
diesel::delete(messages_dsl::messages).execute(self.connection)?;
|
||||
diesel::delete(aliases_dsl::message_aliases).execute(self.connection)?;
|
||||
|
||||
@@ -394,8 +394,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
||||
None => "updates".to_string(),
|
||||
};
|
||||
|
||||
let uri = self
|
||||
.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
|
||||
let uri = self.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
|
||||
|
||||
loop {
|
||||
log::debug!("Connecting to websocket: {:?}", uri);
|
||||
@@ -430,14 +429,16 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
||||
match connect_async(request).await.map_err(Error::from) {
|
||||
Ok((socket, response)) => {
|
||||
log::debug!("Websocket connected: {:?}", response.status());
|
||||
break Ok(WebsocketEventSocket::new(socket))
|
||||
break Ok(WebsocketEventSocket::new(socket));
|
||||
}
|
||||
Err(e) => match &e {
|
||||
Error::ClientError(ce) => match ce.as_str() {
|
||||
"HTTP error: 401 Unauthorized" | "Unauthorized" => {
|
||||
// Try to authenticate
|
||||
if let Some(credentials) = &self.auth_store.get_credentials().await {
|
||||
log::warn!("Websocket connection failed, attempting to authenticate");
|
||||
log::warn!(
|
||||
"Websocket connection failed, attempting to authenticate"
|
||||
);
|
||||
let new_token = self.authenticate(credentials.clone()).await?;
|
||||
self.auth_store.set_token(new_token.to_string()).await;
|
||||
|
||||
@@ -469,16 +470,16 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
|
||||
let https = HttpsConnector::new();
|
||||
let client = Client::builder().build::<_, Body>(https);
|
||||
|
||||
HTTPAPIClient { base_url, auth_store, client }
|
||||
HTTPAPIClient {
|
||||
base_url,
|
||||
auth_store,
|
||||
client,
|
||||
}
|
||||
}
|
||||
|
||||
fn uri_for_endpoint(&self, endpoint: &str, scheme: Option<&str>) -> Result<Uri, Error> {
|
||||
let mut parts = self.base_url.clone().into_parts();
|
||||
let root_path: PathBuf = parts
|
||||
.path_and_query
|
||||
.ok_or(Error::URLError)?
|
||||
.path()
|
||||
.into();
|
||||
let root_path: PathBuf = parts.path_and_query.ok_or(Error::URLError)?.path().into();
|
||||
|
||||
let path = root_path.join(endpoint);
|
||||
let path_str = path.to_str().ok_or(Error::URLError)?;
|
||||
|
||||
@@ -23,6 +23,7 @@ tokio-condvar = "0.3.0"
|
||||
uuid = "1.16.0"
|
||||
once_cell = "1.19.0"
|
||||
mime_guess = "2.0"
|
||||
notify = { package = "notify-rust", version = "4.10.0" }
|
||||
|
||||
# D-Bus dependencies only on Linux
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
||||
@@ -103,6 +103,13 @@
|
||||
"/>
|
||||
</method>
|
||||
|
||||
<method name="TestNotification">
|
||||
<arg type="s" name="summary" direction="in"/>
|
||||
<arg type="s" name="body" direction="in"/>
|
||||
<annotation name="org.freedesktop.DBus.DocString"
|
||||
value="Displays a test desktop notification with the provided summary and body."/>
|
||||
</method>
|
||||
|
||||
<signal name="MessagesUpdated">
|
||||
<arg type="s" name="conversation_id" direction="in"/>
|
||||
<annotation name="org.freedesktop.DBus.DocString"
|
||||
|
||||
@@ -61,6 +61,9 @@ pub enum Event {
|
||||
/// - reply: The outgoing message ID (not the server-assigned message ID).
|
||||
SendMessage(String, String, Vec<String>, Reply<Uuid>),
|
||||
|
||||
/// Triggers a manual test notification.
|
||||
TestNotification(String, String, Reply<Result<(), String>>),
|
||||
|
||||
/// Notifies the daemon that a message has been sent.
|
||||
/// Parameters:
|
||||
/// - message: The message that was sent.
|
||||
|
||||
@@ -17,8 +17,11 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::{
|
||||
broadcast,
|
||||
mpsc::{Receiver, Sender},
|
||||
Mutex,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use kordophone_db::{
|
||||
@@ -41,6 +44,9 @@ mod post_office;
|
||||
use post_office::Event as PostOfficeEvent;
|
||||
use post_office::PostOffice;
|
||||
|
||||
mod notifier;
|
||||
use notifier::NotificationService;
|
||||
|
||||
mod models;
|
||||
pub use models::Attachment;
|
||||
pub use models::Message;
|
||||
@@ -76,8 +82,7 @@ pub struct Daemon {
|
||||
pub event_sender: Sender<Event>,
|
||||
event_receiver: Receiver<Event>,
|
||||
|
||||
signal_receiver: Option<Receiver<Signal>>,
|
||||
signal_sender: Sender<Signal>,
|
||||
signal_sender: broadcast::Sender<Signal>,
|
||||
|
||||
post_office_sink: Sender<PostOfficeEvent>,
|
||||
post_office_source: Option<Receiver<PostOfficeEvent>>,
|
||||
@@ -87,6 +92,7 @@ pub struct Daemon {
|
||||
attachment_store_sink: Option<Sender<AttachmentStoreEvent>>,
|
||||
update_monitor_command_tx: Option<Sender<UpdateMonitorCommand>>,
|
||||
|
||||
notifier: Arc<NotificationService>,
|
||||
version: String,
|
||||
database: Arc<Mutex<Database>>,
|
||||
runtime: tokio::runtime::Runtime,
|
||||
@@ -103,7 +109,7 @@ impl Daemon {
|
||||
|
||||
// Create event channels
|
||||
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
|
||||
let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100);
|
||||
let (signal_sender, _) = tokio::sync::broadcast::channel(100);
|
||||
let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100);
|
||||
|
||||
// Create background task runtime
|
||||
@@ -114,13 +120,14 @@ impl Daemon {
|
||||
|
||||
let database_impl = Database::new(&database_path.to_string_lossy())?;
|
||||
let database = Arc::new(Mutex::new(database_impl));
|
||||
let notifier = Arc::new(NotificationService::new());
|
||||
|
||||
Ok(Self {
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
notifier,
|
||||
database,
|
||||
event_receiver,
|
||||
event_sender,
|
||||
signal_receiver: Some(signal_receiver),
|
||||
signal_sender,
|
||||
post_office_sink,
|
||||
post_office_source: Some(post_office_source),
|
||||
@@ -165,6 +172,16 @@ impl Daemon {
|
||||
attachment_store.run().await;
|
||||
});
|
||||
|
||||
// Notification listener
|
||||
{
|
||||
let notifier = self.notifier.clone();
|
||||
let mut signal_rx = self.signal_sender.subscribe();
|
||||
let database = self.database.clone();
|
||||
tokio::spawn(async move {
|
||||
notifier.listen(signal_rx, database).await;
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(event) = self.event_receiver.recv().await {
|
||||
log::debug!(target: target::EVENT, "Received event: {:?}", event);
|
||||
self.handle_event(event).await;
|
||||
@@ -260,10 +277,11 @@ impl Daemon {
|
||||
self.spawn_conversation_list_sync();
|
||||
|
||||
// Send signal to the client that the update stream has been reconnected.
|
||||
self.signal_sender
|
||||
.send(Signal::UpdateStreamReconnected)
|
||||
.await
|
||||
.unwrap();
|
||||
Self::send_signal(
|
||||
&self.signal_sender,
|
||||
Signal::UpdateStreamReconnected,
|
||||
target::UPDATES,
|
||||
);
|
||||
}
|
||||
|
||||
Event::GetAllConversations(limit, offset, reply) => {
|
||||
@@ -326,17 +344,13 @@ impl Daemon {
|
||||
}
|
||||
|
||||
Event::SendMessage(conversation_id, text, attachment_guids, reply) => {
|
||||
let conversation_id = conversation_id.clone();
|
||||
let uuid = self
|
||||
.enqueue_outgoing_message(text, conversation_id.clone(), attachment_guids)
|
||||
.await;
|
||||
reply.send(uuid).unwrap();
|
||||
|
||||
// Send message updated signal, we have a placeholder message we will return.
|
||||
self.signal_sender
|
||||
.send(Signal::MessagesUpdated(conversation_id.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
// Notify clients that messages have changed (e.g., to refresh placeholders).
|
||||
self.emit_messages_updated(conversation_id);
|
||||
}
|
||||
|
||||
Event::MessageSent(message, outgoing_message, conversation_id) => {
|
||||
@@ -366,11 +380,20 @@ impl Daemon {
|
||||
.get_mut(&conversation_id)
|
||||
.map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));
|
||||
|
||||
// Send message updated signal.
|
||||
self.signal_sender
|
||||
.send(Signal::MessagesUpdated(conversation_id))
|
||||
.await
|
||||
.unwrap();
|
||||
// Notify clients to refresh the conversation after the final message arrives.
|
||||
self.emit_messages_updated(conversation_id);
|
||||
}
|
||||
|
||||
Event::TestNotification(summary, body, reply) => {
|
||||
let result = self
|
||||
.signal_sender
|
||||
.send(Signal::Internal(InternalSignal::TestNotification {
|
||||
summary,
|
||||
body,
|
||||
}))
|
||||
.map(|_| ())
|
||||
.map_err(|e| e.to_string());
|
||||
reply.send(result).unwrap();
|
||||
}
|
||||
|
||||
Event::GetAttachment(guid, reply) => {
|
||||
@@ -402,10 +425,11 @@ impl Daemon {
|
||||
log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id);
|
||||
|
||||
// Send signal to the client that the attachment has been downloaded.
|
||||
self.signal_sender
|
||||
.send(Signal::AttachmentDownloaded(attachment_id))
|
||||
.await
|
||||
.unwrap();
|
||||
Self::send_signal(
|
||||
&self.signal_sender,
|
||||
Signal::AttachmentDownloaded(attachment_id),
|
||||
target::ATTACHMENTS,
|
||||
);
|
||||
}
|
||||
|
||||
Event::UploadAttachment(path, reply) => {
|
||||
@@ -420,17 +444,17 @@ impl Daemon {
|
||||
Event::AttachmentUploaded(upload_guid, attachment_guid) => {
|
||||
log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid);
|
||||
|
||||
self.signal_sender
|
||||
.send(Signal::AttachmentUploaded(upload_guid, attachment_guid))
|
||||
.await
|
||||
.unwrap();
|
||||
Self::send_signal(
|
||||
&self.signal_sender,
|
||||
Signal::AttachmentUploaded(upload_guid, attachment_guid),
|
||||
target::ATTACHMENTS,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Panics if the signal receiver has already been taken.
|
||||
pub fn obtain_signal_receiver(&mut self) -> Receiver<Signal> {
|
||||
self.signal_receiver.take().unwrap()
|
||||
pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
|
||||
self.signal_sender.subscribe()
|
||||
}
|
||||
|
||||
async fn get_conversations_limit_offset(
|
||||
@@ -445,6 +469,10 @@ impl Daemon {
|
||||
.await
|
||||
}
|
||||
|
||||
fn emit_messages_updated(&self, conversation_id: String) {
|
||||
Self::send_messages_updated(&self.signal_sender, conversation_id);
|
||||
}
|
||||
|
||||
async fn get_messages(
|
||||
&mut self,
|
||||
conversation_id: String,
|
||||
@@ -471,9 +499,8 @@ impl Daemon {
|
||||
.await;
|
||||
|
||||
// Convert DB messages to daemon model, substituting local_id when an alias exists.
|
||||
let mut result: Vec<Message> = Vec::with_capacity(
|
||||
db_messages.len() + outgoing_messages.len(),
|
||||
);
|
||||
let mut result: Vec<Message> =
|
||||
Vec::with_capacity(db_messages.len() + outgoing_messages.len());
|
||||
for m in db_messages.into_iter() {
|
||||
let server_id = m.id.clone();
|
||||
let mut dm: Message = m.into();
|
||||
@@ -521,7 +548,7 @@ impl Daemon {
|
||||
|
||||
async fn sync_conversation_list(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
signal_sender: &Sender<Signal>,
|
||||
signal_sender: &broadcast::Sender<Signal>,
|
||||
) -> Result<()> {
|
||||
log::info!(target: target::SYNC, "Starting list conversation sync");
|
||||
|
||||
@@ -573,7 +600,7 @@ impl Daemon {
|
||||
}
|
||||
|
||||
// Send conversations updated signal
|
||||
signal_sender.send(Signal::ConversationsUpdated).await?;
|
||||
Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
|
||||
|
||||
log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations);
|
||||
Ok(())
|
||||
@@ -581,7 +608,7 @@ impl Daemon {
|
||||
|
||||
async fn sync_all_conversations_impl(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
signal_sender: &Sender<Signal>,
|
||||
signal_sender: &broadcast::Sender<Signal>,
|
||||
) -> Result<()> {
|
||||
log::info!(target: target::SYNC, "Starting full conversation sync");
|
||||
|
||||
@@ -609,7 +636,7 @@ impl Daemon {
|
||||
}
|
||||
|
||||
// Send conversations updated signal.
|
||||
signal_sender.send(Signal::ConversationsUpdated).await?;
|
||||
Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
|
||||
|
||||
log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations);
|
||||
Ok(())
|
||||
@@ -617,7 +644,7 @@ impl Daemon {
|
||||
|
||||
async fn sync_conversation_impl(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
signal_sender: &Sender<Signal>,
|
||||
signal_sender: &broadcast::Sender<Signal>,
|
||||
conversation_id: String,
|
||||
) -> Result<()> {
|
||||
log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id);
|
||||
@@ -675,9 +702,7 @@ impl Daemon {
|
||||
|
||||
// Send messages updated signal, if we actually inserted any messages.
|
||||
if num_messages > 0 {
|
||||
signal_sender
|
||||
.send(Signal::MessagesUpdated(conversation_id.clone()))
|
||||
.await?;
|
||||
Self::send_messages_updated(signal_sender, conversation_id.clone());
|
||||
}
|
||||
|
||||
log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id);
|
||||
@@ -698,14 +723,14 @@ impl Daemon {
|
||||
async fn update_conversation_metadata_impl(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
conversation: Conversation,
|
||||
signal_sender: &Sender<Signal>,
|
||||
signal_sender: &broadcast::Sender<Signal>,
|
||||
) -> Result<()> {
|
||||
log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
|
||||
let updated = database
|
||||
.with_repository(|r| r.merge_conversation_metadata(conversation))
|
||||
.await?;
|
||||
if updated {
|
||||
signal_sender.send(Signal::ConversationsUpdated).await?;
|
||||
Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::DAEMON);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -720,6 +745,40 @@ impl Daemon {
|
||||
self.database.with_settings(|s| settings.save(s)).await
|
||||
}
|
||||
|
||||
fn send_signal(sender: &broadcast::Sender<Signal>, signal: Signal, log_target: &str) {
|
||||
if let Err(error) = sender.send(signal) {
|
||||
log::trace!(
|
||||
target: log_target,
|
||||
"Signal delivery skipped (no listeners?): {}",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn send_internal(sender: &broadcast::Sender<Signal>, signal: InternalSignal) {
|
||||
if let Err(error) = sender.send(Signal::Internal(signal)) {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"Internal signal delivery skipped: {}",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn send_messages_updated(sender: &broadcast::Sender<Signal>, conversation_id: String) {
|
||||
Self::send_internal(
|
||||
sender,
|
||||
InternalSignal::MessagesUpdated(conversation_id.clone()),
|
||||
);
|
||||
if let Err(error) = sender.send(Signal::MessagesUpdated(conversation_id)) {
|
||||
log::warn!(
|
||||
target: target::DAEMON,
|
||||
"Failed to send MessagesUpdated signal: {}",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_client_impl(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
) -> Result<HTTPAPIClient<DatabaseAuthenticationStore>> {
|
||||
@@ -752,9 +811,11 @@ impl Daemon {
|
||||
})
|
||||
.await?;
|
||||
|
||||
self.signal_sender
|
||||
.send(Signal::ConversationsUpdated)
|
||||
.await?;
|
||||
Self::send_signal(
|
||||
&self.signal_sender,
|
||||
Signal::ConversationsUpdated,
|
||||
target::SYNC,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -25,8 +25,7 @@ impl Attachment {
|
||||
// Prefer common, user-friendly extensions over obscure ones
|
||||
match normalized {
|
||||
"image/jpeg" | "image/pjpeg" => Some("jpg"),
|
||||
_ => mime_guess::get_mime_extensions_str(normalized)
|
||||
.and_then(|list| {
|
||||
_ => mime_guess::get_mime_extensions_str(normalized).and_then(|list| {
|
||||
// If jpg is one of the candidates, prefer it
|
||||
if list.iter().any(|e| *e == "jpg") {
|
||||
Some("jpg")
|
||||
|
||||
288
core/kordophoned/src/daemon/notifier.rs
Normal file
288
core/kordophoned/src/daemon/notifier.rs
Normal file
@@ -0,0 +1,288 @@
|
||||
use super::contact_resolver::{ContactResolver, DefaultContactResolverBackend};
|
||||
use super::models::message::Participant;
|
||||
use super::signals::{InternalSignal, Signal};
|
||||
use super::{target, Message};
|
||||
|
||||
use kordophone_db::{
|
||||
database::{Database, DatabaseAccess},
|
||||
models::Conversation,
|
||||
models::Participant as DbParticipant,
|
||||
};
|
||||
use notify::Notification;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
|
||||
/// Centralised notification helper used by platform transports (D-Bus, XPC, …).
|
||||
pub struct NotificationService {
|
||||
resolver: Mutex<ContactResolver<DefaultContactResolverBackend>>,
|
||||
}
|
||||
|
||||
impl NotificationService {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
resolver: Mutex::new(ContactResolver::new(
|
||||
DefaultContactResolverBackend::default(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn listen(
|
||||
self: Arc<Self>,
|
||||
mut signal_rx: broadcast::Receiver<Signal>,
|
||||
database: Arc<Mutex<Database>>,
|
||||
) {
|
||||
log::trace!(target: target::DAEMON, "NotificationService listener started");
|
||||
loop {
|
||||
match signal_rx.recv().await {
|
||||
Ok(Signal::Internal(InternalSignal::MessagesUpdated(conversation_id))) => {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService received MessagesUpdated for {}",
|
||||
conversation_id
|
||||
);
|
||||
self.notify_new_messages(&database, &conversation_id).await;
|
||||
}
|
||||
Ok(Signal::Internal(InternalSignal::TestNotification { summary, body })) => {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService received TestNotification"
|
||||
);
|
||||
if let Err(error) = self.send_manual(&summary, &body) {
|
||||
log::warn!(
|
||||
target: target::DAEMON,
|
||||
"Failed to display test notification: {}",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(other) => {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService ignoring signal: {:?}",
|
||||
other
|
||||
);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
log::warn!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService lagged; skipped {} signals",
|
||||
skipped
|
||||
);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
log::trace!(target: target::DAEMON, "NotificationService listener exiting");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether a new user-visible notification should be shown for the
|
||||
/// given conversation and displays it if appropriate.
|
||||
pub async fn notify_new_messages(
|
||||
&self,
|
||||
database: &Arc<Mutex<Database>>,
|
||||
conversation_id: &str,
|
||||
) {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService preparing payload for {}",
|
||||
conversation_id
|
||||
);
|
||||
if let Some((summary, body)) = self.prepare_payload(database, conversation_id).await {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService displaying notification for {}",
|
||||
conversation_id
|
||||
);
|
||||
if let Err(error) = self.show_notification(&summary, &body) {
|
||||
log::warn!(
|
||||
target: target::DAEMON,
|
||||
"Failed to display notification for conversation {}: {}",
|
||||
conversation_id,
|
||||
error
|
||||
);
|
||||
}
|
||||
} else {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService skipping notification for {}",
|
||||
conversation_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Displays a manual test notification.
|
||||
pub fn send_manual(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService sending manual notification"
|
||||
);
|
||||
self.show_notification(summary, body)
|
||||
}
|
||||
|
||||
async fn prepare_payload(
|
||||
&self,
|
||||
database: &Arc<Mutex<Database>>,
|
||||
conversation_id: &str,
|
||||
) -> Option<(String, String)> {
|
||||
let conversation_opt = database
|
||||
.lock()
|
||||
.await
|
||||
.with_repository(|r| r.get_conversation_by_guid(conversation_id))
|
||||
.await;
|
||||
|
||||
let conversation = match conversation_opt {
|
||||
Ok(Some(conv)) => conv,
|
||||
Ok(None) => {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService: conversation {} not found",
|
||||
conversation_id
|
||||
);
|
||||
return None;
|
||||
}
|
||||
Err(err) => {
|
||||
log::warn!(
|
||||
target: target::DAEMON,
|
||||
"Notification lookup failed for conversation {}: {}",
|
||||
conversation_id,
|
||||
err
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
if conversation.unread_count == 0 {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService: conversation {} has no unread messages",
|
||||
conversation_id
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
let last_message_opt = database
|
||||
.lock()
|
||||
.await
|
||||
.with_repository(|r| r.get_last_message_for_conversation(conversation_id))
|
||||
.await;
|
||||
|
||||
let last_message: Message = match last_message_opt {
|
||||
Ok(Some(message)) => message.into(),
|
||||
Ok(None) => {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService: conversation {} has no messages",
|
||||
conversation_id
|
||||
);
|
||||
return None;
|
||||
}
|
||||
Err(err) => {
|
||||
log::warn!(
|
||||
target: target::DAEMON,
|
||||
"Notification lookup failed for conversation {}: {}",
|
||||
conversation_id,
|
||||
err
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
if matches!(last_message.sender, Participant::Me) {
|
||||
log::trace!(
|
||||
target: target::DAEMON,
|
||||
"NotificationService: last message in {} was sent by self",
|
||||
conversation_id
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut resolver = self.resolver.lock().await;
|
||||
let summary = self.conversation_display_name(&conversation, &mut resolver);
|
||||
let sender_display_name =
|
||||
self.resolve_participant_display_name(&last_message.sender, &mut resolver);
|
||||
|
||||
let mut message_text = last_message.text.replace('\u{FFFC}', "");
|
||||
if message_text.trim().is_empty() {
|
||||
if !last_message.attachments.is_empty() {
|
||||
message_text = "Sent an attachment".to_string();
|
||||
} else {
|
||||
message_text = "Sent a message".to_string();
|
||||
}
|
||||
}
|
||||
|
||||
let body = if sender_display_name.is_empty() {
|
||||
message_text
|
||||
} else {
|
||||
format!("{}: {}", sender_display_name, message_text)
|
||||
};
|
||||
|
||||
Some((summary, body))
|
||||
}
|
||||
|
||||
fn conversation_display_name(
|
||||
&self,
|
||||
conversation: &Conversation,
|
||||
resolver: &mut ContactResolver<DefaultContactResolverBackend>,
|
||||
) -> String {
|
||||
if let Some(display_name) = &conversation.display_name {
|
||||
if !display_name.trim().is_empty() {
|
||||
return display_name.clone();
|
||||
}
|
||||
}
|
||||
|
||||
let names: Vec<String> = conversation
|
||||
.participants
|
||||
.iter()
|
||||
.filter_map(|participant| match participant {
|
||||
DbParticipant::Me => None,
|
||||
DbParticipant::Remote { handle, contact_id } => {
|
||||
if let Some(contact_id) = contact_id {
|
||||
Some(
|
||||
resolver
|
||||
.get_contact_display_name(contact_id)
|
||||
.unwrap_or_else(|| handle.clone()),
|
||||
)
|
||||
} else {
|
||||
Some(handle.clone())
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if names.is_empty() {
|
||||
"Kordophone".to_string()
|
||||
} else {
|
||||
names.join(", ")
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_participant_display_name(
|
||||
&self,
|
||||
participant: &Participant,
|
||||
resolver: &mut ContactResolver<DefaultContactResolverBackend>,
|
||||
) -> String {
|
||||
match participant {
|
||||
Participant::Me => "".to_string(),
|
||||
Participant::Remote { handle, contact_id } => {
|
||||
if let Some(contact_id) = contact_id {
|
||||
resolver
|
||||
.get_contact_display_name(contact_id)
|
||||
.unwrap_or_else(|| handle.clone())
|
||||
} else {
|
||||
handle.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn show_notification(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> {
|
||||
Notification::new()
|
||||
.appname("Kordophone")
|
||||
.summary(summary)
|
||||
.body(body)
|
||||
.show()
|
||||
.map(|_| ())
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,12 @@
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum InternalSignal {
|
||||
/// Notification that new messages are available for a conversation.
|
||||
MessagesUpdated(String),
|
||||
|
||||
/// Manual test notification request.
|
||||
TestNotification { summary: String, body: String },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Signal {
|
||||
/// Emitted when the list of conversations is updated.
|
||||
@@ -21,4 +30,7 @@ pub enum Signal {
|
||||
|
||||
/// Emitted when the update stream is reconnected after a timeout or configuration change.
|
||||
UpdateStreamReconnected,
|
||||
|
||||
/// Internal-only signals consumed by daemon components.
|
||||
Internal(InternalSignal),
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use dbus::arg;
|
||||
use dbus_tree::MethodErr;
|
||||
use std::sync::Arc;
|
||||
use std::{future::Future, thread};
|
||||
use tokio::sync::{mpsc, oneshot, Mutex};
|
||||
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
|
||||
|
||||
use kordophoned::daemon::{
|
||||
contact_resolver::{ContactResolver, DefaultContactResolverBackend},
|
||||
@@ -22,12 +22,15 @@ use dbus_tokio::connection;
|
||||
#[derive(Clone)]
|
||||
pub struct DBusAgent {
|
||||
event_sink: mpsc::Sender<Event>,
|
||||
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>,
|
||||
signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
|
||||
contact_resolver: ContactResolver<DefaultContactResolverBackend>,
|
||||
}
|
||||
|
||||
impl DBusAgent {
|
||||
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
|
||||
pub fn new(
|
||||
event_sink: mpsc::Sender<Event>,
|
||||
signal_receiver: broadcast::Receiver<Signal>,
|
||||
) -> Self {
|
||||
Self {
|
||||
event_sink,
|
||||
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
|
||||
@@ -75,8 +78,9 @@ impl DBusAgent {
|
||||
.take()
|
||||
.expect("Signal receiver already taken");
|
||||
|
||||
while let Some(signal) = receiver.recv().await {
|
||||
match signal {
|
||||
loop {
|
||||
match receiver.recv().await {
|
||||
Ok(signal) => match signal {
|
||||
Signal::ConversationsUpdated => {
|
||||
log::debug!("Sending signal: ConversationsUpdated");
|
||||
registry
|
||||
@@ -150,6 +154,20 @@ impl DBusAgent {
|
||||
0
|
||||
});
|
||||
}
|
||||
Signal::Internal(_) => {
|
||||
log::trace!("Ignoring internal signal for D-Bus transport");
|
||||
}
|
||||
},
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
log::warn!(
|
||||
"Signal receiver lagged; skipped {} daemon signals",
|
||||
skipped
|
||||
);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
log::warn!("Signal channel closed; stopping D-Bus forwarding");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -398,6 +416,13 @@ impl DbusRepository for DBusAgent {
|
||||
.map(|uuid| uuid.to_string())
|
||||
}
|
||||
|
||||
fn test_notification(&mut self, summary: String, body: String) -> Result<(), MethodErr> {
|
||||
match self.send_event_sync(|r| Event::TestNotification(summary, body, r))? {
|
||||
Ok(()) => Ok(()),
|
||||
Err(message) => Err(MethodErr::failed(&message)),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_attachment_info(
|
||||
&mut self,
|
||||
attachment_id: String,
|
||||
|
||||
@@ -26,7 +26,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
|
||||
use dbus::agent::DBusAgent;
|
||||
|
||||
// Start the D-Bus agent (events in, signals out).
|
||||
let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
|
||||
let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
|
||||
tokio::spawn(async move {
|
||||
agent.run().await;
|
||||
});
|
||||
@@ -35,8 +35,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
|
||||
#[cfg(target_os = "macos")]
|
||||
async fn start_ipc_agent(daemon: &mut Daemon) {
|
||||
// Start the macOS XPC agent (events in, signals out) on a dedicated thread.
|
||||
let agent =
|
||||
xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
|
||||
let agent = xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
|
||||
std::thread::spawn(move || {
|
||||
// Use a single-threaded Tokio runtime for the XPC agent.
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::ffi::CString;
|
||||
use std::os::raw::c_char;
|
||||
use std::ptr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, oneshot, Mutex};
|
||||
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
|
||||
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
|
||||
use xpc_connection_sys as xpc_sys;
|
||||
|
||||
@@ -22,11 +22,14 @@ type Subscribers = Arc<std::sync::Mutex<Vec<XpcConn>>>;
|
||||
#[derive(Clone)]
|
||||
pub struct XpcAgent {
|
||||
event_sink: mpsc::Sender<Event>,
|
||||
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>,
|
||||
signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
|
||||
}
|
||||
|
||||
impl XpcAgent {
|
||||
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
|
||||
pub fn new(
|
||||
event_sink: mpsc::Sender<Event>,
|
||||
signal_receiver: broadcast::Receiver<Signal>,
|
||||
) -> Self {
|
||||
Self {
|
||||
event_sink,
|
||||
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
|
||||
@@ -71,7 +74,31 @@ impl XpcAgent {
|
||||
.await
|
||||
.take()
|
||||
.expect("Signal receiver already taken");
|
||||
while let Some(signal) = receiver.recv().await {
|
||||
loop {
|
||||
let signal = match receiver.recv().await {
|
||||
Ok(signal) => signal,
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
log::warn!(
|
||||
target: LOG_TARGET,
|
||||
"XPC agent lagged; skipped {} signals",
|
||||
skipped
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
log::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Signal channel closed; stopping XPC forwarding"
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if matches!(signal, Signal::Internal(_)) {
|
||||
log::trace!(target: LOG_TARGET, "Skipping internal signal for XPC");
|
||||
continue;
|
||||
}
|
||||
|
||||
log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
|
||||
let msg = super::util::signal_to_message(signal);
|
||||
let xobj = message_to_xpc_object(msg);
|
||||
|
||||
@@ -15,10 +15,16 @@ pub struct DispatchResult {
|
||||
|
||||
impl DispatchResult {
|
||||
pub fn new(message: Message) -> Self {
|
||||
Self { message, cleanup: None }
|
||||
Self {
|
||||
message,
|
||||
cleanup: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_cleanup<T: Any + Send + 'static>(message: Message, cleanup: T) -> Self {
|
||||
Self { message, cleanup: Some(Box::new(cleanup)) }
|
||||
Self {
|
||||
message,
|
||||
cleanup: Some(Box::new(cleanup)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,7 +105,12 @@ pub async fn dispatch(
|
||||
.and_then(|m| dict_get_str(m, "conversation_id"))
|
||||
{
|
||||
Some(id) => id,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing conversation_id",
|
||||
))
|
||||
}
|
||||
};
|
||||
match agent
|
||||
.send_event(|r| Event::SyncConversation(conversation_id, r))
|
||||
@@ -122,7 +127,12 @@ pub async fn dispatch(
|
||||
.and_then(|m| dict_get_str(m, "conversation_id"))
|
||||
{
|
||||
Some(id) => id,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing conversation_id",
|
||||
))
|
||||
}
|
||||
};
|
||||
match agent
|
||||
.send_event(|r| Event::MarkConversationAsRead(conversation_id, r))
|
||||
@@ -137,11 +147,21 @@ pub async fn dispatch(
|
||||
"GetMessages" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let conversation_id = match dict_get_str(args, "conversation_id") {
|
||||
Some(id) => id,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing conversation_id",
|
||||
))
|
||||
}
|
||||
};
|
||||
let last_message_id = dict_get_str(args, "last_message_id");
|
||||
match agent
|
||||
@@ -158,11 +178,8 @@ pub async fn dispatch(
|
||||
dict_put_str(&mut m, "sender", &msg.sender.display_name());
|
||||
|
||||
// Include attachment GUIDs for the client to resolve/download
|
||||
let attachment_guids: Vec<String> = msg
|
||||
.attachments
|
||||
.iter()
|
||||
.map(|a| a.guid.clone())
|
||||
.collect();
|
||||
let attachment_guids: Vec<String> =
|
||||
msg.attachments.iter().map(|a| a.guid.clone()).collect();
|
||||
m.insert(cstr("attachment_guids"), array_from_strs(attachment_guids));
|
||||
|
||||
// Full attachments array with metadata (mirrors DBus fields)
|
||||
@@ -193,12 +210,23 @@ pub async fn dispatch(
|
||||
if let Some(attribution_info) = &metadata.attribution_info {
|
||||
let mut attribution_map: XpcMap = HashMap::new();
|
||||
if let Some(width) = attribution_info.width {
|
||||
dict_put_i64_as_str(&mut attribution_map, "width", width as i64);
|
||||
dict_put_i64_as_str(
|
||||
&mut attribution_map,
|
||||
"width",
|
||||
width as i64,
|
||||
);
|
||||
}
|
||||
if let Some(height) = attribution_info.height {
|
||||
dict_put_i64_as_str(&mut attribution_map, "height", height as i64);
|
||||
dict_put_i64_as_str(
|
||||
&mut attribution_map,
|
||||
"height",
|
||||
height as i64,
|
||||
);
|
||||
}
|
||||
metadata_map.insert(cstr("attribution_info"), Message::Dictionary(attribution_map));
|
||||
metadata_map.insert(
|
||||
cstr("attribution_info"),
|
||||
Message::Dictionary(attribution_map),
|
||||
);
|
||||
}
|
||||
if !metadata_map.is_empty() {
|
||||
a.insert(cstr("metadata"), Message::Dictionary(metadata_map));
|
||||
@@ -230,11 +258,21 @@ pub async fn dispatch(
|
||||
"SendMessage" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let conversation_id = match dict_get_str(args, "conversation_id") {
|
||||
Some(v) => v,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing conversation_id")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing conversation_id",
|
||||
))
|
||||
}
|
||||
};
|
||||
let text = dict_get_str(args, "text").unwrap_or_default();
|
||||
let attachment_guids: Vec<String> = match args.get(&cstr("attachment_guids")) {
|
||||
@@ -265,11 +303,21 @@ pub async fn dispatch(
|
||||
"GetAttachmentInfo" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let attachment_id = match dict_get_str(args, "attachment_id") {
|
||||
Some(v) => v,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing attachment_id",
|
||||
))
|
||||
}
|
||||
};
|
||||
match agent
|
||||
.send_event(|r| Event::GetAttachment(attachment_id, r))
|
||||
@@ -308,11 +356,21 @@ pub async fn dispatch(
|
||||
"OpenAttachmentFd" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let attachment_id = match dict_get_str(args, "attachment_id") {
|
||||
Some(v) => v,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing attachment_id",
|
||||
))
|
||||
}
|
||||
};
|
||||
let preview = dict_get_str(args, "preview")
|
||||
.map(|s| s == "true")
|
||||
@@ -335,9 +393,14 @@ pub async fn dispatch(
|
||||
dict_put_str(&mut reply, "type", "OpenAttachmentFdResponse");
|
||||
reply.insert(cstr("fd"), Message::Fd(fd));
|
||||
|
||||
DispatchResult { message: Message::Dictionary(reply), cleanup: Some(Box::new(file)) }
|
||||
DispatchResult {
|
||||
message: Message::Dictionary(reply),
|
||||
cleanup: Some(Box::new(file)),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
DispatchResult::new(make_error_reply("OpenFailed", &format!("{}", e)))
|
||||
}
|
||||
Err(e) => DispatchResult::new(make_error_reply("OpenFailed", &format!("{}", e))),
|
||||
}
|
||||
}
|
||||
Err(e) => DispatchResult::new(make_error_reply("DaemonError", &format!("{}", e))),
|
||||
@@ -348,11 +411,21 @@ pub async fn dispatch(
|
||||
"DownloadAttachment" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let attachment_id = match dict_get_str(args, "attachment_id") {
|
||||
Some(v) => v,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing attachment_id")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing attachment_id",
|
||||
))
|
||||
}
|
||||
};
|
||||
let preview = dict_get_str(args, "preview")
|
||||
.map(|s| s == "true")
|
||||
@@ -371,11 +444,18 @@ pub async fn dispatch(
|
||||
use std::path::PathBuf;
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let path = match dict_get_str(args, "path") {
|
||||
Some(v) => v,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing path")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply("InvalidRequest", "Missing path"))
|
||||
}
|
||||
};
|
||||
match agent
|
||||
.send_event(|r| Event::UploadAttachment(PathBuf::from(path), r))
|
||||
@@ -413,7 +493,12 @@ pub async fn dispatch(
|
||||
"UpdateSettings" => {
|
||||
let args = match get_dictionary_field(root, "arguments") {
|
||||
Some(a) => a,
|
||||
None => return DispatchResult::new(make_error_reply("InvalidRequest", "Missing arguments")),
|
||||
None => {
|
||||
return DispatchResult::new(make_error_reply(
|
||||
"InvalidRequest",
|
||||
"Missing arguments",
|
||||
))
|
||||
}
|
||||
};
|
||||
let server_url = dict_get_str(args, "server_url");
|
||||
let username = dict_get_str(args, "username");
|
||||
|
||||
@@ -146,8 +146,7 @@ impl ClientCli {
|
||||
|
||||
loop {
|
||||
match stream.next().await.unwrap() {
|
||||
Ok(update) => {
|
||||
match update {
|
||||
Ok(update) => match update {
|
||||
SocketUpdate::Update(updates) => {
|
||||
for update in updates {
|
||||
println!("Got update: {:?}", update);
|
||||
@@ -156,7 +155,6 @@ impl ClientCli {
|
||||
SocketUpdate::Pong => {
|
||||
println!("Pong");
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
Err(e) => {
|
||||
|
||||
@@ -209,4 +209,9 @@ impl DaemonInterface for DBusDaemonInterface {
|
||||
KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {}", e))
|
||||
}
|
||||
|
||||
async fn test_notification(&mut self, summary: String, body: String) -> Result<()> {
|
||||
KordophoneRepository::test_notification(&self.proxy(), &summary, &body)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to trigger test notification: {}", e))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ pub trait DaemonInterface {
|
||||
async fn download_attachment(&mut self, attachment_id: String) -> Result<()>;
|
||||
async fn upload_attachment(&mut self, path: String) -> Result<()>;
|
||||
async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()>;
|
||||
async fn test_notification(&mut self, summary: String, body: String) -> Result<()>;
|
||||
}
|
||||
|
||||
struct StubDaemonInterface;
|
||||
@@ -112,6 +113,11 @@ impl DaemonInterface for StubDaemonInterface {
|
||||
"Daemon interface not implemented on this platform"
|
||||
))
|
||||
}
|
||||
async fn test_notification(&mut self, _summary: String, _body: String) -> Result<()> {
|
||||
Err(anyhow::anyhow!(
|
||||
"Daemon interface not implemented on this platform"
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_daemon_interface() -> Result<Box<dyn DaemonInterface>> {
|
||||
@@ -175,6 +181,9 @@ pub enum Commands {
|
||||
|
||||
/// Marks a conversation as read.
|
||||
MarkConversationAsRead { conversation_id: String },
|
||||
|
||||
/// Displays a test notification using the daemon.
|
||||
TestNotification { summary: String, body: String },
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
@@ -219,6 +228,9 @@ impl Commands {
|
||||
Commands::MarkConversationAsRead { conversation_id } => {
|
||||
client.mark_conversation_as_read(conversation_id).await
|
||||
}
|
||||
Commands::TestNotification { summary, body } => {
|
||||
client.test_notification(summary, body).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use std::env;
|
||||
use std::process;
|
||||
|
||||
use kordophone::{
|
||||
api::{HTTPAPIClient, InMemoryAuthenticationStore, EventSocket},
|
||||
model::{ConversationID, event::EventData},
|
||||
APIInterface,
|
||||
};
|
||||
use kordophone::api::http_client::Credentials;
|
||||
use kordophone::api::AuthenticationStore;
|
||||
use kordophone::api::http_client::Credentials;
|
||||
use kordophone::{
|
||||
APIInterface,
|
||||
api::{EventSocket, HTTPAPIClient, InMemoryAuthenticationStore},
|
||||
model::{ConversationID, event::EventData},
|
||||
};
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use hyper::Uri;
|
||||
@@ -18,7 +18,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
let args: Vec<String> = env::args().collect();
|
||||
if args.len() < 2 {
|
||||
eprintln!("Usage: {} <conversation_id1> [conversation_id2] [conversation_id3] ...", args[0]);
|
||||
eprintln!(
|
||||
"Usage: {} <conversation_id1> [conversation_id2] [conversation_id3] ...",
|
||||
args[0]
|
||||
);
|
||||
eprintln!("Environment variables required:");
|
||||
eprintln!(" KORDOPHONE_API_URL - Server URL");
|
||||
eprintln!(" KORDOPHONE_USERNAME - Username for authentication");
|
||||
@@ -40,12 +43,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let credentials = Credentials { username, password };
|
||||
|
||||
// Collect all conversation IDs from command line arguments
|
||||
let target_conversation_ids: Vec<ConversationID> = args[1..].iter()
|
||||
.map(|id| id.clone())
|
||||
.collect();
|
||||
let target_conversation_ids: Vec<ConversationID> =
|
||||
args[1..].iter().map(|id| id.clone()).collect();
|
||||
|
||||
println!("Monitoring {} conversation(s) for updates: {:?}",
|
||||
target_conversation_ids.len(), target_conversation_ids);
|
||||
println!(
|
||||
"Monitoring {} conversation(s) for updates: {:?}",
|
||||
target_conversation_ids.len(),
|
||||
target_conversation_ids
|
||||
);
|
||||
|
||||
let auth_store = InMemoryAuthenticationStore::new(Some(credentials.clone()));
|
||||
let mut client = HTTPAPIClient::new(server_url, auth_store);
|
||||
@@ -62,26 +67,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
match event_result {
|
||||
Ok(socket_event) => {
|
||||
match socket_event {
|
||||
kordophone::api::event_socket::SocketEvent::Update(event) => {
|
||||
match event.data {
|
||||
kordophone::api::event_socket::SocketEvent::Update(event) => match event.data {
|
||||
EventData::MessageReceived(conversation, _message) => {
|
||||
if target_conversation_ids.contains(&conversation.guid) {
|
||||
println!("Message update detected for conversation {}, marking as read...", conversation.guid);
|
||||
println!(
|
||||
"Message update detected for conversation {}, marking as read...",
|
||||
conversation.guid
|
||||
);
|
||||
match client.mark_conversation_as_read(&conversation.guid).await {
|
||||
Ok(_) => println!("Successfully marked conversation {} as read", conversation.guid),
|
||||
Err(e) => eprintln!("Failed to mark conversation {} as read: {:?}", conversation.guid, e),
|
||||
Ok(_) => println!(
|
||||
"Successfully marked conversation {} as read",
|
||||
conversation.guid
|
||||
),
|
||||
Err(e) => eprintln!(
|
||||
"Failed to mark conversation {} as read: {:?}",
|
||||
conversation.guid, e
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
kordophone::api::event_socket::SocketEvent::Pong => {
|
||||
// Ignore pong messages
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error receiving event: {:?}", e);
|
||||
break;
|
||||
|
||||
Reference in New Issue
Block a user