Implements mark as read
This commit is contained in:
@@ -7,4 +7,8 @@ pub mod settings;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub mod target {
|
||||
pub static REPOSITORY: &str = "repository";
|
||||
}
|
||||
|
||||
pub use repository::Repository;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::models::participant::Participant;
|
||||
use crate::models::{message::Message, participant::Participant};
|
||||
use chrono::{DateTime, NaiveDateTime};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -27,6 +27,36 @@ impl Conversation {
|
||||
display_name: self.display_name.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn merge(&self, other: &Conversation, last_message: Option<&Message>) -> Conversation {
|
||||
let mut new_conversation = self.clone();
|
||||
new_conversation.unread_count = other.unread_count;
|
||||
new_conversation.participants = other.participants.clone();
|
||||
new_conversation.display_name = other.display_name.clone();
|
||||
|
||||
if let Some(last_message) = last_message {
|
||||
if last_message.date > self.date {
|
||||
new_conversation.date = last_message.date;
|
||||
}
|
||||
|
||||
if !last_message.text.is_empty() && !last_message.text.trim().is_empty() {
|
||||
new_conversation.last_message_preview = Some(last_message.text.clone());
|
||||
}
|
||||
}
|
||||
|
||||
new_conversation
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Conversation {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.guid == other.guid &&
|
||||
self.unread_count == other.unread_count &&
|
||||
self.display_name == other.display_name &&
|
||||
self.last_message_preview == other.last_message_preview &&
|
||||
self.date == other.date &&
|
||||
self.participants == other.participants
|
||||
}
|
||||
}
|
||||
|
||||
impl From<kordophone::model::Conversation> for Conversation {
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::{
|
||||
Conversation, Message, Participant,
|
||||
},
|
||||
schema,
|
||||
target,
|
||||
};
|
||||
|
||||
pub struct Repository<'a> {
|
||||
@@ -323,25 +324,35 @@ impl<'a> Repository<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn merge_conversation_metadata(&mut self, in_conversation: Conversation) -> Result<bool> {
|
||||
let mut updated = false;
|
||||
let conversation = self.get_conversation_by_guid(&in_conversation.guid)?;
|
||||
if let Some(conversation) = conversation {
|
||||
let merged_conversation = conversation.merge(&in_conversation, None);
|
||||
|
||||
if merged_conversation != conversation {
|
||||
self.insert_conversation(merged_conversation)?;
|
||||
updated = true;
|
||||
}
|
||||
}
|
||||
|
||||
log::debug!(target: target::REPOSITORY, "Merged conversation metadata: {} updated: {}", in_conversation.guid, updated);
|
||||
Ok(updated)
|
||||
}
|
||||
|
||||
fn update_conversation_metadata(&mut self, conversation_guid: &str) -> Result<()> {
|
||||
let conversation = self.get_conversation_by_guid(conversation_guid)?;
|
||||
if let Some(mut conversation) = conversation {
|
||||
if let Some(conversation) = conversation {
|
||||
if let Some(last_message) = self.get_last_message_for_conversation(conversation_guid)? {
|
||||
log::debug!(
|
||||
target: target::REPOSITORY,
|
||||
"Updating conversation metadata: {} message: {:?}",
|
||||
conversation_guid,
|
||||
last_message
|
||||
);
|
||||
|
||||
if last_message.date > conversation.date {
|
||||
conversation.date = last_message.date;
|
||||
}
|
||||
|
||||
if !last_message.text.is_empty() && !last_message.text.trim().is_empty() {
|
||||
conversation.last_message_preview = Some(last_message.text.clone());
|
||||
}
|
||||
|
||||
self.insert_conversation(conversation)?;
|
||||
let merged_conversation = conversation.merge(&conversation, Some(&last_message));
|
||||
self.insert_conversation(merged_conversation)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -271,6 +271,13 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
||||
Ok(token)
|
||||
}
|
||||
|
||||
async fn mark_conversation_as_read(&mut self, conversation_id: &ConversationID) -> Result<(), Self::Error> {
|
||||
// SERVER JANK: This should be POST, but it's GET for some reason.
|
||||
let endpoint = format!("markConversation?guid={}", conversation_id);
|
||||
self.response_with_body_retry(&endpoint, Method::GET, Body::empty, true).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_messages(
|
||||
&mut self,
|
||||
conversation_id: &ConversationID,
|
||||
|
||||
@@ -64,6 +64,9 @@ pub trait APIInterface {
|
||||
// (POST) /authenticate
|
||||
async fn authenticate(&mut self, credentials: Credentials) -> Result<JwtToken, Self::Error>;
|
||||
|
||||
// (GET) /markConversation
|
||||
async fn mark_conversation_as_read(&mut self, conversation_id: &ConversationID) -> Result<(), Self::Error>;
|
||||
|
||||
// (WS) /updates
|
||||
async fn open_event_socket(
|
||||
&mut self,
|
||||
|
||||
@@ -148,4 +148,8 @@ impl APIInterface for TestClient {
|
||||
{
|
||||
Ok(String::from("test"))
|
||||
}
|
||||
|
||||
async fn mark_conversation_as_read(&mut self, conversation_id: &ConversationID) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,12 @@
|
||||
value="Initiates a background sync of a single conversation with the server."/>
|
||||
</method>
|
||||
|
||||
<method name="MarkConversationAsRead">
|
||||
<arg type="s" name="conversation_id" direction="in"/>
|
||||
<annotation name="org.freedesktop.DBus.DocString"
|
||||
value="Marks a conversation as read."/>
|
||||
</method>
|
||||
|
||||
<signal name="ConversationsUpdated">
|
||||
<annotation name="org.freedesktop.DBus.DocString"
|
||||
value="Emitted when the list of conversations is updated."/>
|
||||
|
||||
@@ -26,6 +26,12 @@ pub enum Event {
|
||||
/// Asynchronous event for syncing a single conversation with the server.
|
||||
SyncConversation(String, Reply<()>),
|
||||
|
||||
/// Asynchronous event for marking a conversation as read.
|
||||
MarkConversationAsRead(String, Reply<()>),
|
||||
|
||||
/// Asynchronous event for updating the metadata for a conversation.
|
||||
UpdateConversationMetadata(Conversation, Reply<()>),
|
||||
|
||||
/// Sent when the update stream is reconnected after a timeout or configuration change.
|
||||
UpdateStreamReconnected,
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ pub mod target {
|
||||
pub static SETTINGS: &str = "settings";
|
||||
pub static UPDATES: &str = "updates";
|
||||
pub static ATTACHMENTS: &str = "attachments";
|
||||
pub static DAEMON: &str = "daemon";
|
||||
}
|
||||
|
||||
pub struct Daemon {
|
||||
@@ -221,6 +222,31 @@ impl Daemon {
|
||||
reply.send(()).unwrap();
|
||||
}
|
||||
|
||||
Event::MarkConversationAsRead(conversation_id, reply) => {
|
||||
let mut db_clone = self.database.clone();
|
||||
self.runtime.spawn(async move {
|
||||
let result = Self::mark_conversation_as_read_impl(&mut db_clone, conversation_id).await;
|
||||
if let Err(e) = result {
|
||||
log::error!(target: target::DAEMON, "Error handling mark conversation as read event: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
reply.send(()).unwrap();
|
||||
}
|
||||
|
||||
Event::UpdateConversationMetadata(conversation, reply) => {
|
||||
let mut db_clone = self.database.clone();
|
||||
let signal_sender = self.signal_sender.clone();
|
||||
self.runtime.spawn(async move {
|
||||
let result = Self::update_conversation_metadata_impl(&mut db_clone, conversation, &signal_sender).await;
|
||||
if let Err(e) = result {
|
||||
log::error!(target: target::DAEMON, "Error handling update conversation metadata event: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
reply.send(()).unwrap();
|
||||
}
|
||||
|
||||
Event::UpdateStreamReconnected => {
|
||||
log::info!(target: target::UPDATES, "Update stream reconnected");
|
||||
|
||||
@@ -590,6 +616,33 @@ impl Daemon {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_conversation_as_read_impl(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
conversation_id: String,
|
||||
) -> Result<()> {
|
||||
log::debug!(target: target::DAEMON, "Marking conversation as read: {}", conversation_id);
|
||||
|
||||
let mut client = Self::get_client_impl(database).await?;
|
||||
client.mark_conversation_as_read(&conversation_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_conversation_metadata_impl(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
conversation: Conversation,
|
||||
signal_sender: &Sender<Signal>,
|
||||
) -> Result<()> {
|
||||
log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
|
||||
let updated = database.with_repository(|r| r.merge_conversation_metadata(conversation)).await?;
|
||||
if updated {
|
||||
signal_sender
|
||||
.send(Signal::ConversationsUpdated)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_settings(&mut self) -> Result<Settings> {
|
||||
let settings = self.database.with_settings(Settings::from_db).await?;
|
||||
Ok(settings)
|
||||
|
||||
@@ -65,11 +65,19 @@ impl UpdateMonitor {
|
||||
UpdateEventData::ConversationChanged(conversation) => {
|
||||
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
|
||||
|
||||
// Explicitly update the unread count, we assume this is fresh from the notification.
|
||||
let db_conversation: kordophone_db::models::Conversation = conversation.clone().into();
|
||||
self.send_event(|r| Event::UpdateConversationMetadata(db_conversation, r))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Failed to send daemon event: {}", e);
|
||||
});
|
||||
|
||||
// Check if we've synced this conversation recently (within 5 seconds)
|
||||
// This is currently a hack/workaround to prevent an infinite loop of sync events, because for some reason
|
||||
// imagent will post a conversation changed notification when we call getMessages.
|
||||
if let Some(last_sync) = self.last_sync_times.get(&conversation.guid) {
|
||||
if last_sync.elapsed() < Duration::from_secs(5) {
|
||||
if last_sync.elapsed() < Duration::from_secs(1) {
|
||||
log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. Last sync was {} seconds ago.",
|
||||
conversation.guid, last_sync.elapsed().as_secs_f64());
|
||||
return;
|
||||
@@ -85,7 +93,7 @@ impl UpdateMonitor {
|
||||
match (&last_message, &conversation.last_message) {
|
||||
(Some(message), Some(conversation_message)) => {
|
||||
if message.id == conversation_message.guid {
|
||||
log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", conversation.guid);
|
||||
log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", &conversation.guid);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,6 +233,10 @@ impl DbusRepository for DBusAgent {
|
||||
self.send_event_sync(|r| Event::SyncConversation(conversation_id, r))
|
||||
}
|
||||
|
||||
fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<(), MethodErr> {
|
||||
self.send_event_sync(|r| Event::MarkConversationAsRead(conversation_id, r))
|
||||
}
|
||||
|
||||
fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result<Vec<arg::PropMap>, MethodErr> {
|
||||
let last_message_id_opt = if last_message_id.is_empty() {
|
||||
None
|
||||
|
||||
@@ -52,6 +52,9 @@ pub enum Commands {
|
||||
conversation_id: String,
|
||||
message: String,
|
||||
},
|
||||
|
||||
/// Marks a conversation as read.
|
||||
Mark { conversation_id: String },
|
||||
}
|
||||
|
||||
impl Commands {
|
||||
@@ -67,6 +70,9 @@ impl Commands {
|
||||
conversation_id,
|
||||
message,
|
||||
} => client.send_message(conversation_id, message).await,
|
||||
Commands::Mark { conversation_id } => {
|
||||
client.mark_conversation_as_read(conversation_id).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -163,4 +169,10 @@ impl ClientCli {
|
||||
println!("Message sent: {}", message.guid);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
|
||||
self.api.mark_conversation_as_read(&conversation_id).await?;
|
||||
println!("Conversation marked as read: {}", conversation_id);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,6 +58,9 @@ pub enum Commands {
|
||||
|
||||
/// Uploads an attachment to the server, returns upload guid.
|
||||
UploadAttachment { path: String },
|
||||
|
||||
/// Marks a conversation as read.
|
||||
MarkConversationAsRead { conversation_id: String },
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
@@ -99,6 +102,9 @@ impl Commands {
|
||||
Commands::DownloadAttachment { attachment_id } => {
|
||||
client.download_attachment(attachment_id).await
|
||||
}
|
||||
Commands::MarkConversationAsRead { conversation_id } => {
|
||||
client.mark_conversation_as_read(conversation_id).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -289,4 +295,9 @@ impl DaemonCli {
|
||||
println!("Upload GUID: {}", upload_guid);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> {
|
||||
KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {}", e))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user