Perf optimizations, recommended by o3
This commit is contained in:
@@ -31,6 +31,14 @@ pub struct Database {
|
|||||||
impl Database {
|
impl Database {
|
||||||
pub fn new(path: &str) -> Result<Self> {
|
pub fn new(path: &str) -> Result<Self> {
|
||||||
let mut connection = SqliteConnection::establish(path)?;
|
let mut connection = SqliteConnection::establish(path)?;
|
||||||
|
|
||||||
|
// Performance optimisations for SQLite. These are safe defaults that speed
|
||||||
|
// up concurrent writes and cut the fsync cost dramatically while still
|
||||||
|
// keeping durability guarantees that are good enough for an end-user
|
||||||
|
// application.
|
||||||
|
diesel::sql_query("PRAGMA journal_mode = WAL;").execute(&mut connection)?;
|
||||||
|
diesel::sql_query("PRAGMA synchronous = NORMAL;").execute(&mut connection)?;
|
||||||
|
|
||||||
connection
|
connection
|
||||||
.run_pending_migrations(MIGRATIONS)
|
.run_pending_migrations(MIGRATIONS)
|
||||||
.map_err(|e| anyhow::anyhow!("Error running migrations: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Error running migrations: {}", e))?;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use diesel::query_dsl::BelongingToDsl;
|
use diesel::query_dsl::BelongingToDsl;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
models::{
|
models::{
|
||||||
@@ -142,8 +143,8 @@ impl<'a> Repository<'a> {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
use crate::schema::conversation_messages::dsl::*;
|
use crate::schema::conversation_messages::dsl::*;
|
||||||
use crate::schema::messages::dsl::*;
|
use crate::schema::messages::dsl::*;
|
||||||
|
use crate::schema::participants::dsl as participants_dsl;
|
||||||
|
|
||||||
// Local insertable struct for the join table
|
|
||||||
#[derive(Insertable)]
|
#[derive(Insertable)]
|
||||||
#[diesel(table_name = crate::schema::conversation_messages)]
|
#[diesel(table_name = crate::schema::conversation_messages)]
|
||||||
struct InsertableConversationMessage {
|
struct InsertableConversationMessage {
|
||||||
@@ -155,37 +156,99 @@ impl<'a> Repository<'a> {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the collections of insertable records
|
// Use a single transaction for everything – this removes the implicit
|
||||||
let mut db_messages: Vec<MessageRecord> = Vec::with_capacity(in_messages.len());
|
// autocommit after every statement which costs a lot when we have many
|
||||||
let mut conv_msg_records: Vec<InsertableConversationMessage> =
|
// individual queries.
|
||||||
Vec::with_capacity(in_messages.len());
|
self.connection
|
||||||
|
.transaction::<_, diesel::result::Error, _>(|conn| {
|
||||||
|
// Cache participant ids we have already looked up / created – in a
|
||||||
|
// typical conversation we only have a handful of participants, but we
|
||||||
|
// might be processing hundreds of messages. Avoiding an extra SELECT per
|
||||||
|
// message saves a lot of round-trips to SQLite.
|
||||||
|
let mut participant_cache: HashMap<String, i32> = HashMap::new();
|
||||||
|
|
||||||
for message in in_messages {
|
// Prepare collections for the batch inserts.
|
||||||
// Handle participant if message has a remote sender
|
let mut db_messages: Vec<MessageRecord> =
|
||||||
let sender = message.sender.clone();
|
Vec::with_capacity(in_messages.len());
|
||||||
let mut db_message: MessageRecord = message.into();
|
let mut conv_msg_records: Vec<InsertableConversationMessage> =
|
||||||
db_message.sender_participant_id = self.get_or_create_participant(&sender);
|
Vec::with_capacity(in_messages.len());
|
||||||
|
|
||||||
conv_msg_records.push(InsertableConversationMessage {
|
for message in in_messages {
|
||||||
conversation_id: conversation_guid.to_string(),
|
// Resolve/insert the sender participant only once per display name.
|
||||||
message_id: db_message.id.clone(),
|
let sender_id = match &message.sender {
|
||||||
});
|
Participant::Me => None,
|
||||||
|
Participant::Remote { display_name, .. } => {
|
||||||
|
if let Some(cached_participant_id) = participant_cache.get(display_name) {
|
||||||
|
Some(*cached_participant_id)
|
||||||
|
} else {
|
||||||
|
// Try to load from DB first
|
||||||
|
let existing: Option<i32> = participants_dsl::participants
|
||||||
|
.filter(participants_dsl::display_name.eq(display_name))
|
||||||
|
.select(participants_dsl::id)
|
||||||
|
.first::<i32>(conn)
|
||||||
|
.optional()?;
|
||||||
|
|
||||||
db_messages.push(db_message);
|
let participant_id = if let Some(pid) = existing {
|
||||||
}
|
pid
|
||||||
|
} else {
|
||||||
|
let new_participant = InsertableParticipantRecord {
|
||||||
|
display_name: Some(display_name.clone()),
|
||||||
|
is_me: false,
|
||||||
|
};
|
||||||
|
|
||||||
// Batch insert or replace messages
|
diesel::insert_into(participants_dsl::participants)
|
||||||
diesel::replace_into(messages)
|
.values(&new_participant)
|
||||||
.values(&db_messages)
|
.execute(conn)?;
|
||||||
.execute(self.connection)?;
|
|
||||||
|
|
||||||
// Batch insert the conversation-message links
|
// last_insert_rowid() is connection-wide, but we are the only
|
||||||
diesel::replace_into(conversation_messages)
|
// writer inside this transaction.
|
||||||
.values(&conv_msg_records)
|
diesel::select(diesel::dsl::sql::<diesel::sql_types::Integer>(
|
||||||
.execute(self.connection)?;
|
"last_insert_rowid()",
|
||||||
|
))
|
||||||
|
.get_result::<i32>(conn)?
|
||||||
|
};
|
||||||
|
|
||||||
// Update conversation date
|
participant_cache.insert(display_name.clone(), participant_id);
|
||||||
self.update_conversation_metadata(conversation_guid)?;
|
Some(participant_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Convert the message into its DB form.
|
||||||
|
let mut db_message: MessageRecord = message.into();
|
||||||
|
db_message.sender_participant_id = sender_id;
|
||||||
|
|
||||||
|
conv_msg_records.push(InsertableConversationMessage {
|
||||||
|
conversation_id: conversation_guid.to_string(),
|
||||||
|
message_id: db_message.id.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
db_messages.push(db_message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the actual batch inserts.
|
||||||
|
diesel::replace_into(messages)
|
||||||
|
.values(&db_messages)
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
diesel::replace_into(conversation_messages)
|
||||||
|
.values(&conv_msg_records)
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
// Update conversation metadata quickly using the last message we just
|
||||||
|
// processed instead of re-querying the DB.
|
||||||
|
if let Some(last_msg) = db_messages.last() {
|
||||||
|
use crate::schema::conversations::dsl as conv_dsl;
|
||||||
|
diesel::update(conv_dsl::conversations.filter(conv_dsl::id.eq(conversation_guid)))
|
||||||
|
.set((
|
||||||
|
conv_dsl::date.eq(last_msg.date),
|
||||||
|
conv_dsl::last_message_preview.eq::<Option<String>>(Some(last_msg.text.clone())),
|
||||||
|
))
|
||||||
|
.execute(conn)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ fn initialize_logging() {
|
|||||||
.unwrap_or(LevelFilter::Info);
|
.unwrap_or(LevelFilter::Info);
|
||||||
|
|
||||||
env_logger::Builder::from_default_env()
|
env_logger::Builder::from_default_env()
|
||||||
.format_timestamp_secs()
|
.format_timestamp_millis()
|
||||||
.filter_level(log_level)
|
.filter_level(log_level)
|
||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user