Adds the ability to sync just one conversation
This commit is contained in:
@@ -15,8 +15,8 @@
|
|||||||
<annotation name="org.freedesktop.DBus.DocString"
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
value="Array of dictionaries. Each dictionary has keys:
|
value="Array of dictionaries. Each dictionary has keys:
|
||||||
'id' (string): Unique identifier
|
'id' (string): Unique identifier
|
||||||
'title' (string): Display name
|
'display_name' (string): Display name
|
||||||
'last_message' (string): Preview text
|
'last_message_preview' (string): Preview text
|
||||||
'is_unread' (boolean): Unread status
|
'is_unread' (boolean): Unread status
|
||||||
'date' (int64): Date of last message
|
'date' (int64): Date of last message
|
||||||
'participants' (array of strings): List of participants
|
'participants' (array of strings): List of participants
|
||||||
@@ -29,6 +29,12 @@
|
|||||||
value="Initiates a background sync of all conversations with the server."/>
|
value="Initiates a background sync of all conversations with the server."/>
|
||||||
</method>
|
</method>
|
||||||
|
|
||||||
|
<method name="SyncConversation">
|
||||||
|
<arg type="s" name="conversation_id" direction="in"/>
|
||||||
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
|
value="Initiates a background sync of a single conversation with the server."/>
|
||||||
|
</method>
|
||||||
|
|
||||||
<signal name="ConversationsUpdated">
|
<signal name="ConversationsUpdated">
|
||||||
<annotation name="org.freedesktop.DBus.DocString"
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
value="Emitted when the list of conversations is updated."/>
|
value="Emitted when the list of conversations is updated."/>
|
||||||
@@ -43,6 +49,7 @@
|
|||||||
</method>
|
</method>
|
||||||
|
|
||||||
<signal name="MessagesUpdated">
|
<signal name="MessagesUpdated">
|
||||||
|
<arg type="s" name="conversation_id" direction="in"/>
|
||||||
<annotation name="org.freedesktop.DBus.DocString"
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
value="Emitted when the list of messages is updated."/>
|
value="Emitted when the list of messages is updated."/>
|
||||||
</signal>
|
</signal>
|
||||||
|
|||||||
@@ -12,6 +12,9 @@ pub enum Event {
|
|||||||
/// Asynchronous event for syncing all conversations with the server.
|
/// Asynchronous event for syncing all conversations with the server.
|
||||||
SyncAllConversations(Reply<()>),
|
SyncAllConversations(Reply<()>),
|
||||||
|
|
||||||
|
/// Asynchronous event for syncing a single conversation with the server.
|
||||||
|
SyncConversation(String, Reply<()>),
|
||||||
|
|
||||||
/// Returns all known conversations from the database.
|
/// Returns all known conversations from the database.
|
||||||
GetAllConversations(Reply<Vec<Conversation>>),
|
GetAllConversations(Reply<Vec<Conversation>>),
|
||||||
|
|
||||||
|
|||||||
@@ -130,6 +130,19 @@ impl Daemon {
|
|||||||
reply.send(()).unwrap();
|
reply.send(()).unwrap();
|
||||||
},
|
},
|
||||||
|
|
||||||
|
Event::SyncConversation(conversation_id, reply) => {
|
||||||
|
let mut db_clone = self.database.clone();
|
||||||
|
let signal_sender = self.signal_sender.clone();
|
||||||
|
self.runtime.spawn(async move {
|
||||||
|
let result = Self::sync_conversation_impl(&mut db_clone, &signal_sender, conversation_id).await;
|
||||||
|
if let Err(e) = result {
|
||||||
|
log::error!("Error handling sync event: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reply.send(()).unwrap();
|
||||||
|
},
|
||||||
|
|
||||||
Event::GetAllConversations(reply) => {
|
Event::GetAllConversations(reply) => {
|
||||||
let conversations = self.get_conversations().await;
|
let conversations = self.get_conversations().await;
|
||||||
reply.send(conversations).unwrap();
|
reply.send(conversations).unwrap();
|
||||||
@@ -193,24 +206,8 @@ impl Daemon {
|
|||||||
// Insert the conversation
|
// Insert the conversation
|
||||||
database.with_repository(|r| r.insert_conversation(conversation)).await?;
|
database.with_repository(|r| r.insert_conversation(conversation)).await?;
|
||||||
|
|
||||||
// Fetch and sync messages for this conversation
|
// Sync individual conversation.
|
||||||
let last_message_id = database.with_repository(|r| -> Option<String> {
|
Self::sync_conversation_impl(database, signal_sender, conversation_id).await?;
|
||||||
r.get_last_message_for_conversation(&conversation_id)
|
|
||||||
.unwrap_or(None)
|
|
||||||
.map(|m| m.id)
|
|
||||||
}).await;
|
|
||||||
|
|
||||||
log::debug!(target: target::SYNC, "Fetching messages for conversation {}", conversation_id);
|
|
||||||
log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id);
|
|
||||||
|
|
||||||
let messages = client.get_messages(&conversation_id, None, None, last_message_id).await?;
|
|
||||||
let db_messages: Vec<kordophone_db::models::Message> = messages.into_iter()
|
|
||||||
.map(kordophone_db::models::Message::from)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// Insert each message
|
|
||||||
log::debug!(target: target::SYNC, "Inserting {} messages for conversation {}", db_messages.len(), conversation_id);
|
|
||||||
database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send conversations updated signal.
|
// Send conversations updated signal.
|
||||||
@@ -220,6 +217,40 @@ impl Daemon {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn sync_conversation_impl(database: &mut Arc<Mutex<Database>>, signal_sender: &Sender<Signal>, conversation_id: String) -> Result<()> {
|
||||||
|
log::info!(target: target::SYNC, "Starting conversation sync for {}", conversation_id);
|
||||||
|
|
||||||
|
let mut client = Self::get_client_impl(database).await?;
|
||||||
|
|
||||||
|
// Fetch and sync messages for this conversation
|
||||||
|
let last_message_id = database.with_repository(|r| -> Option<String> {
|
||||||
|
r.get_last_message_for_conversation(&conversation_id)
|
||||||
|
.unwrap_or(None)
|
||||||
|
.map(|m| m.id)
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
log::debug!(target: target::SYNC, "Fetching messages for conversation {}", &conversation_id);
|
||||||
|
log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id);
|
||||||
|
|
||||||
|
let messages = client.get_messages(&conversation_id, None, None, last_message_id).await?;
|
||||||
|
let db_messages: Vec<kordophone_db::models::Message> = messages.into_iter()
|
||||||
|
.map(kordophone_db::models::Message::from)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Insert each message
|
||||||
|
let num_messages = db_messages.len();
|
||||||
|
log::debug!(target: target::SYNC, "Inserting {} messages for conversation {}", num_messages, &conversation_id);
|
||||||
|
database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?;
|
||||||
|
|
||||||
|
// Send messages updated signal, if we actually inserted any messages.
|
||||||
|
if num_messages > 0 {
|
||||||
|
signal_sender.send(Signal::MessagesUpdated(conversation_id.clone())).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_settings(&mut self) -> Result<Settings> {
|
async fn get_settings(&mut self) -> Result<Settings> {
|
||||||
let settings = self.database.with_settings(Settings::from_db
|
let settings = self.database.with_settings(Settings::from_db
|
||||||
).await?;
|
).await?;
|
||||||
|
|||||||
@@ -1,4 +1,10 @@
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum Signal {
|
pub enum Signal {
|
||||||
|
/// Emitted when the list of conversations is updated.
|
||||||
ConversationsUpdated,
|
ConversationsUpdated,
|
||||||
|
|
||||||
|
/// Emitted when the list of messages for a conversation is updated.
|
||||||
|
/// Parameters:
|
||||||
|
/// - conversation_id: The ID of the conversation that was updated.
|
||||||
|
MessagesUpdated(String),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,5 +11,6 @@ pub mod interface {
|
|||||||
|
|
||||||
pub mod signals {
|
pub mod signals {
|
||||||
pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated;
|
pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated;
|
||||||
|
pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -74,6 +74,10 @@ impl DbusRepository for ServerImpl {
|
|||||||
self.send_event_sync(Event::SyncAllConversations)
|
self.send_event_sync(Event::SyncAllConversations)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn sync_conversation(&mut self, conversation_id: String) -> Result<(), dbus::MethodErr> {
|
||||||
|
self.send_event_sync(|r| Event::SyncConversation(conversation_id, r))
|
||||||
|
}
|
||||||
|
|
||||||
fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
|
fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
|
||||||
let last_message_id_opt = if last_message_id.is_empty() {
|
let last_message_id_opt = if last_message_id.is_empty() {
|
||||||
None
|
None
|
||||||
|
|||||||
@@ -65,6 +65,15 @@ async fn main() {
|
|||||||
0
|
0
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Signal::MessagesUpdated(conversation_id) => {
|
||||||
|
log::info!("Sending signal: MessagesUpdated for conversation {}", conversation_id);
|
||||||
|
endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::MessagesUpdated{ conversation_id })
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
log::error!("Failed to send signal");
|
||||||
|
0
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -21,7 +21,9 @@ pub enum Commands {
|
|||||||
Conversations,
|
Conversations,
|
||||||
|
|
||||||
/// Runs a sync operation.
|
/// Runs a sync operation.
|
||||||
Sync,
|
Sync {
|
||||||
|
conversation_id: Option<String>,
|
||||||
|
},
|
||||||
|
|
||||||
/// Prints the server Kordophone version.
|
/// Prints the server Kordophone version.
|
||||||
Version,
|
Version,
|
||||||
@@ -69,7 +71,7 @@ impl Commands {
|
|||||||
match cmd {
|
match cmd {
|
||||||
Commands::Version => client.print_version().await,
|
Commands::Version => client.print_version().await,
|
||||||
Commands::Conversations => client.print_conversations().await,
|
Commands::Conversations => client.print_conversations().await,
|
||||||
Commands::Sync => client.sync_conversations().await,
|
Commands::Sync { conversation_id } => client.sync_conversations(conversation_id).await,
|
||||||
Commands::Config { command } => client.config(command).await,
|
Commands::Config { command } => client.config(command).await,
|
||||||
Commands::Signals => client.wait_for_signals().await,
|
Commands::Signals => client.wait_for_signals().await,
|
||||||
Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await,
|
Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await,
|
||||||
@@ -109,9 +111,14 @@ impl DaemonCli {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn sync_conversations(&mut self) -> Result<()> {
|
pub async fn sync_conversations(&mut self, conversation_id: Option<String>) -> Result<()> {
|
||||||
KordophoneRepository::sync_all_conversations(&self.proxy())
|
if let Some(conversation_id) = conversation_id {
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e))
|
KordophoneRepository::sync_conversation(&self.proxy(), &conversation_id)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to sync conversation: {}", e))
|
||||||
|
} else {
|
||||||
|
KordophoneRepository::sync_all_conversations(&self.proxy())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn print_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Result<()> {
|
pub async fn print_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Result<()> {
|
||||||
|
|||||||
Reference in New Issue
Block a user