use anyhow::Result; use diesel::prelude::*; use diesel::query_dsl::BelongingToDsl; use crate::{ models::{ Conversation, Message, Participant, db::conversation::Record as ConversationRecord, db::participant::{ ConversationParticipant, Record as ParticipantRecord, InsertableRecord as InsertableParticipantRecord }, db::message::Record as MessageRecord, }, schema, }; pub struct Repository<'a> { connection: &'a mut SqliteConnection, } impl<'a> Repository<'a> { pub fn new(connection: &'a mut SqliteConnection) -> Self { Self { connection } } pub fn insert_conversation(&mut self, conversation: Conversation) -> Result<()> { use crate::schema::conversations::dsl::*; use crate::schema::participants::dsl::*; use crate::schema::conversation_participants::dsl::*; let (db_conversation, db_participants) = conversation.into(); diesel::replace_into(conversations) .values(&db_conversation) .execute(self.connection)?; diesel::replace_into(participants) .values(&db_participants) .execute(self.connection)?; // Sqlite backend doesn't support batch insert, so we have to do this manually for participant in db_participants { let pid = participants .select(schema::participants::id) .filter(schema::participants::display_name.eq(&participant.display_name)) .first::(self.connection)?; diesel::replace_into(conversation_participants) .values(( conversation_id.eq(&db_conversation.id), participant_id.eq(pid), )) .execute(self.connection)?; } Ok(()) } pub fn get_conversation_by_guid(&mut self, match_guid: &str) -> Result> { use crate::schema::conversations::dsl::*; use crate::schema::participants::dsl::*; let result = conversations .find(match_guid) .first::(self.connection) .optional()?; if let Some(conversation) = result { let db_participants = ConversationParticipant::belonging_to(&conversation) .inner_join(participants) .select(ParticipantRecord::as_select()) .load::(self.connection)?; let mut model_conversation: Conversation = conversation.into(); model_conversation.participants = db_participants.into_iter().map(|p| p.into()).collect(); return Ok(Some(model_conversation)); } Ok(None) } pub fn all_conversations(&mut self) -> Result> { use crate::schema::conversations::dsl::*; use crate::schema::participants::dsl::*; let db_conversations = conversations .load::(self.connection)?; let mut result = Vec::new(); for db_conversation in db_conversations { let db_participants = ConversationParticipant::belonging_to(&db_conversation) .inner_join(participants) .select(ParticipantRecord::as_select()) .load::(self.connection)?; let mut model_conversation: Conversation = db_conversation.into(); model_conversation.participants = db_participants.into_iter().map(|p| p.into()).collect(); result.push(model_conversation); } Ok(result) } pub fn insert_message(&mut self, conversation_guid: &str, message: Message) -> Result<()> { use crate::schema::messages::dsl::*; use crate::schema::conversation_messages::dsl::*; // Handle participant if message has a remote sender let sender = message.sender.clone(); let mut db_message: MessageRecord = message.into(); db_message.sender_participant_id = self.get_or_create_participant(&sender); diesel::replace_into(messages) .values(&db_message) .execute(self.connection)?; diesel::replace_into(conversation_messages) .values(( conversation_id.eq(conversation_guid), message_id.eq(&db_message.id), )) .execute(self.connection)?; Ok(()) } pub fn insert_messages(&mut self, conversation_guid: &str, in_messages: Vec) -> Result<()> { use crate::schema::messages::dsl::*; use crate::schema::conversation_messages::dsl::*; // Local insertable struct for the join table #[derive(Insertable)] #[diesel(table_name = crate::schema::conversation_messages)] struct InsertableConversationMessage { pub conversation_id: String, pub message_id: String, } if in_messages.is_empty() { return Ok(()); } // Build the collections of insertable records let mut db_messages: Vec = Vec::with_capacity(in_messages.len()); let mut conv_msg_records: Vec = Vec::with_capacity(in_messages.len()); for message in in_messages { // Handle participant if message has a remote sender let sender = message.sender.clone(); let mut db_message: MessageRecord = message.into(); db_message.sender_participant_id = self.get_or_create_participant(&sender); conv_msg_records.push(InsertableConversationMessage { conversation_id: conversation_guid.to_string(), message_id: db_message.id.clone(), }); db_messages.push(db_message); } // Batch insert or replace messages diesel::replace_into(messages) .values(&db_messages) .execute(self.connection)?; // Batch insert the conversation-message links diesel::replace_into(conversation_messages) .values(&conv_msg_records) .execute(self.connection)?; Ok(()) } pub fn get_messages_for_conversation(&mut self, conversation_guid: &str) -> Result> { use crate::schema::messages::dsl::*; use crate::schema::conversation_messages::dsl::*; use crate::schema::participants::dsl::*; let message_records = conversation_messages .filter(conversation_id.eq(conversation_guid)) .inner_join(messages) .select(MessageRecord::as_select()) .order_by(schema::messages::date.asc()) .load::(self.connection)?; let mut result = Vec::new(); for message_record in message_records { let mut message: Message = message_record.clone().into(); // If there's a sender_participant_id, load the participant info if let Some(pid) = message_record.sender_participant_id { let participant = participants .find(pid) .first::(self.connection)?; message.sender = participant.into(); } result.push(message); } Ok(result) } // Helper function to get the last inserted row ID // This is a workaround since the Sqlite backend doesn't support `RETURNING` // Huge caveat with this is that it depends on whatever the last insert was, prevents concurrent inserts. fn last_insert_id(&mut self) -> Result { Ok(diesel::select(diesel::dsl::sql::("last_insert_rowid()")) .get_result(self.connection)?) } fn get_or_create_participant(&mut self, participant: &Participant) -> Option { match participant { Participant::Me => None, Participant::Remote { display_name: p_name, .. } => { use crate::schema::participants::dsl::*; let existing_participant = participants .filter(display_name.eq(p_name)) .first::(self.connection) .optional() .unwrap(); if let Some(participant) = existing_participant { return Some(participant.id); } let participant_record = InsertableParticipantRecord { display_name: Some(participant.display_name()), is_me: false, }; diesel::insert_into(participants) .values(&participant_record) .execute(self.connection) .unwrap(); self.last_insert_id().ok() } } } }