Merge branch 'wip/local_ids'
* wip/local_ids: first attempt at trying to keep track of locally send id
This commit is contained in:
@@ -0,0 +1,3 @@
|
|||||||
|
-- Drop the alias mapping table
|
||||||
|
DROP TABLE IF EXISTS `message_aliases`;
|
||||||
|
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
-- Add table to map local (client) IDs to server message GUIDs
|
||||||
|
CREATE TABLE IF NOT EXISTS `message_aliases` (
|
||||||
|
`local_id` TEXT NOT NULL PRIMARY KEY,
|
||||||
|
`server_id` TEXT NOT NULL UNIQUE,
|
||||||
|
`conversation_id` TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
@@ -307,8 +307,11 @@ impl<'a> Repository<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn delete_all_messages(&mut self) -> Result<()> {
|
pub fn delete_all_messages(&mut self) -> Result<()> {
|
||||||
use crate::schema::messages::dsl::*;
|
use crate::schema::messages::dsl as messages_dsl;
|
||||||
diesel::delete(messages).execute(self.connection)?;
|
use crate::schema::message_aliases::dsl as aliases_dsl;
|
||||||
|
|
||||||
|
diesel::delete(messages_dsl::messages).execute(self.connection)?;
|
||||||
|
diesel::delete(aliases_dsl::message_aliases).execute(self.connection)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -359,6 +362,57 @@ impl<'a> Repository<'a> {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create or update an alias mapping between a local (client) message id and a server message id.
|
||||||
|
pub fn set_message_alias(
|
||||||
|
&mut self,
|
||||||
|
local_id_in: &str,
|
||||||
|
server_id_in: &str,
|
||||||
|
conversation_id_in: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
use crate::schema::message_aliases::dsl::*;
|
||||||
|
diesel::replace_into(message_aliases)
|
||||||
|
.values((
|
||||||
|
local_id.eq(local_id_in),
|
||||||
|
server_id.eq(server_id_in),
|
||||||
|
conversation_id.eq(conversation_id_in),
|
||||||
|
))
|
||||||
|
.execute(self.connection)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the local id for a given server id, if any.
|
||||||
|
pub fn get_local_id_for(&mut self, server_id_in: &str) -> Result<Option<String>> {
|
||||||
|
use crate::schema::message_aliases::dsl::*;
|
||||||
|
let result = message_aliases
|
||||||
|
.filter(server_id.eq(server_id_in))
|
||||||
|
.select(local_id)
|
||||||
|
.first::<String>(self.connection)
|
||||||
|
.optional()?;
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Batch lookup: returns a map server_id -> local_id for the provided server ids.
|
||||||
|
pub fn get_local_ids_for(
|
||||||
|
&mut self,
|
||||||
|
server_ids_in: Vec<String>,
|
||||||
|
) -> Result<HashMap<String, String>> {
|
||||||
|
use crate::schema::message_aliases::dsl::*;
|
||||||
|
if server_ids_in.is_empty() {
|
||||||
|
return Ok(HashMap::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
let rows: Vec<(String, String)> = message_aliases
|
||||||
|
.filter(server_id.eq_any(&server_ids_in))
|
||||||
|
.select((server_id, local_id))
|
||||||
|
.load::<(String, String)>(self.connection)?;
|
||||||
|
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
for (sid, lid) in rows {
|
||||||
|
map.insert(sid, lid);
|
||||||
|
}
|
||||||
|
Ok(map)
|
||||||
|
}
|
||||||
|
|
||||||
/// Update the contact_id for an existing participant record.
|
/// Update the contact_id for an existing participant record.
|
||||||
pub fn update_participant_contact(
|
pub fn update_participant_contact(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
|||||||
@@ -44,6 +44,14 @@ diesel::table! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
diesel::table! {
|
||||||
|
message_aliases (local_id) {
|
||||||
|
local_id -> Text,
|
||||||
|
server_id -> Text,
|
||||||
|
conversation_id -> Text,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
diesel::table! {
|
diesel::table! {
|
||||||
settings (key) {
|
settings (key) {
|
||||||
key -> Text,
|
key -> Text,
|
||||||
@@ -62,5 +70,6 @@ diesel::allow_tables_to_appear_in_same_query!(
|
|||||||
conversation_participants,
|
conversation_participants,
|
||||||
messages,
|
messages,
|
||||||
conversation_messages,
|
conversation_messages,
|
||||||
|
message_aliases,
|
||||||
settings,
|
settings,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -347,7 +347,16 @@ impl Daemon {
|
|||||||
self.database
|
self.database
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.with_repository(|r| r.insert_message(&conversation_id, message.into()))
|
.with_repository(|r| {
|
||||||
|
// 1) Insert the server message
|
||||||
|
r.insert_message(&conversation_id, message.clone().into())?;
|
||||||
|
// 2) Persist alias local -> server for stable UI ids
|
||||||
|
r.set_message_alias(
|
||||||
|
&outgoing_message.guid.to_string(),
|
||||||
|
&message.id,
|
||||||
|
&conversation_id,
|
||||||
|
)
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@@ -448,18 +457,38 @@ impl Daemon {
|
|||||||
.get(&conversation_id)
|
.get(&conversation_id)
|
||||||
.unwrap_or(&empty_vec);
|
.unwrap_or(&empty_vec);
|
||||||
|
|
||||||
self.database
|
// Fetch DB messages and an alias map (server_id -> local_id) in one DB access.
|
||||||
|
let (db_messages, alias_map) = self
|
||||||
|
.database
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.with_repository(|r| {
|
.with_repository(|r| {
|
||||||
r.get_messages_for_conversation(&conversation_id)
|
let msgs = r.get_messages_for_conversation(&conversation_id).unwrap();
|
||||||
.unwrap()
|
let ids: Vec<String> = msgs.iter().map(|m| m.id.clone()).collect();
|
||||||
.into_iter()
|
let map = r.get_local_ids_for(ids).unwrap_or_default();
|
||||||
.map(|m| m.into()) // Convert db::Message to daemon::Message
|
(msgs, map)
|
||||||
.chain(outgoing_messages.into_iter().map(|m| m.into()))
|
|
||||||
.collect()
|
|
||||||
})
|
})
|
||||||
.await
|
.await;
|
||||||
|
|
||||||
|
// Convert DB messages to daemon model, substituting local_id when an alias exists.
|
||||||
|
let mut result: Vec<Message> = Vec::with_capacity(
|
||||||
|
db_messages.len() + outgoing_messages.len(),
|
||||||
|
);
|
||||||
|
for m in db_messages.into_iter() {
|
||||||
|
let server_id = m.id.clone();
|
||||||
|
let mut dm: Message = m.into();
|
||||||
|
if let Some(local_id) = alias_map.get(&server_id) {
|
||||||
|
dm.id = local_id.clone();
|
||||||
|
}
|
||||||
|
result.push(dm);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append pending outgoing messages (these already use local_id)
|
||||||
|
for om in outgoing_messages.iter() {
|
||||||
|
result.push(om.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn enqueue_outgoing_message(
|
async fn enqueue_outgoing_message(
|
||||||
|
|||||||
Reference in New Issue
Block a user