Private
Public Access
1
0

daemon: add support for getting messages from db

This commit is contained in:
2025-04-28 16:00:04 -07:00
parent 9c245a5b52
commit c189e5f9e3
11 changed files with 267 additions and 15 deletions

136
Cargo.lock generated
View File

@@ -297,6 +297,27 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
[[package]]
name = "csv"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d02f3b0da4c6504f86e9cd789d8dbafab48c2321be74e9987593de5a894d93d"
dependencies = [
"memchr",
]
[[package]]
name = "ctor"
version = "0.2.8"
@@ -459,6 +480,16 @@ dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-next"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
dependencies = [
"cfg-if",
"dirs-sys-next",
]
[[package]]
name = "dirs-sys"
version = "0.5.0"
@@ -467,10 +498,21 @@ checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab"
dependencies = [
"libc",
"option-ext",
"redox_users",
"redox_users 0.5.0",
"windows-sys 0.59.0",
]
[[package]]
name = "dirs-sys-next"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
dependencies = [
"libc",
"redox_users 0.4.6",
"winapi",
]
[[package]]
name = "dotenv"
version = "0.15.0"
@@ -497,6 +539,12 @@ version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
[[package]]
name = "encode_unicode"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
[[package]]
name = "env_filter"
version = "0.1.2"
@@ -678,6 +726,12 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "hermit-abi"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbd780fe5cc30f81464441920d82ac8740e2e46b29a6fad543ddd075229ce37e"
[[package]]
name = "http"
version = "0.2.12"
@@ -788,6 +842,17 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "is-terminal"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9"
dependencies = [
"hermit-abi 0.5.0",
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
@@ -888,7 +953,7 @@ dependencies = [
"kordophone",
"kordophone-db",
"log",
"thiserror",
"thiserror 2.0.12",
"tokio",
]
@@ -906,6 +971,7 @@ dependencies = [
"kordophone-db",
"log",
"pretty",
"prettytable",
"time",
"tokio",
]
@@ -1199,6 +1265,20 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "prettytable"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46480520d1b77c9a3482d39939fcf96831537a250ec62d4fd8fbdf8e0302e781"
dependencies = [
"csv",
"encode_unicode",
"is-terminal",
"lazy_static",
"term",
"unicode-width",
]
[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -1256,6 +1336,17 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_users"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43"
dependencies = [
"getrandom",
"libredox",
"thiserror 1.0.69",
]
[[package]]
name = "redox_users"
version = "0.5.0"
@@ -1264,7 +1355,7 @@ checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
dependencies = [
"getrandom",
"libredox",
"thiserror",
"thiserror 2.0.12",
]
[[package]]
@@ -1315,6 +1406,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustversion"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
[[package]]
name = "ryu"
version = "1.0.17"
@@ -1477,6 +1574,17 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "term"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f"
dependencies = [
"dirs-next",
"rustversion",
"winapi",
]
[[package]]
name = "termcolor"
version = "1.4.1"
@@ -1495,13 +1603,33 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
dependencies = [
"thiserror-impl",
"thiserror-impl 2.0.12",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]

View File

@@ -207,6 +207,21 @@ impl<'a> Repository<'a> {
Ok(result)
}
pub fn get_last_message_for_conversation(&mut self, conversation_guid: &str) -> Result<Option<Message>> {
use crate::schema::messages::dsl::*;
use crate::schema::conversation_messages::dsl::*;
let message_record = conversation_messages
.filter(conversation_id.eq(conversation_guid))
.inner_join(messages)
.select(MessageRecord::as_select())
.order_by(schema::messages::date.desc())
.first::<MessageRecord>(self.connection)
.optional()?;
Ok(message_record.map(|r| r.into()))
}
// Helper function to get the last inserted row ID
// This is a workaround since the Sqlite backend doesn't support `RETURNING`
// Huge caveat with this is that it depends on whatever the last insert was, prevents concurrent inserts.

View File

@@ -310,6 +310,10 @@ async fn test_insert_messages_batch() {
),
}
}
// Make sure the last message is the last one we inserted
let last_message = repository.get_last_message_for_conversation(&conversation_id).unwrap().unwrap();
assert_eq!(last_message.id, message4.id);
})
.await;
}

View File

@@ -8,6 +8,8 @@
value="Returns the version of the client daemon."/>
</method>
<!-- Conversations -->
<method name="GetConversations">
<arg type="aa{sv}" direction="out" name="conversations">
<annotation name="org.freedesktop.DBus.DocString"
@@ -31,6 +33,20 @@
<annotation name="org.freedesktop.DBus.DocString"
value="Emitted when the list of conversations is updated."/>
</signal>
<!-- Messages -->
<method name="GetMessages">
<arg type="s" name="conversation_id" direction="in"/>
<arg type="s" name="last_message_id" direction="in"/>
<arg type="aa{sv}" direction="out" name="messages"/>
</method>
<signal name="MessagesUpdated">
<annotation name="org.freedesktop.DBus.DocString"
value="Emitted when the list of messages is updated."/>
</signal>
</interface>
<interface name="net.buzzert.kordophone.Settings">

View File

@@ -1,5 +1,5 @@
use tokio::sync::oneshot;
use kordophone_db::models::Conversation;
use kordophone_db::models::{Conversation, Message};
use crate::daemon::settings::Settings;
pub type Reply<T> = oneshot::Sender<T>;
@@ -20,6 +20,12 @@ pub enum Event {
/// Update settings in the database.
UpdateSettings(Settings, Reply<()>),
/// Returns all messages for a conversation from the database.
/// Parameters:
/// - conversation_id: The ID of the conversation to get messages for.
/// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned.
GetMessages(String, Option<String>, Reply<Vec<Message>>),
}

View File

@@ -19,7 +19,7 @@ use async_trait::async_trait;
use kordophone_db::{
database::{Database, DatabaseAccess},
models::Conversation,
models::{Conversation, Message},
};
use kordophone::model::JwtToken;
@@ -54,6 +54,7 @@ impl TokenStore for DatabaseTokenStore {
mod target {
pub static SYNC: &str = "sync";
pub static EVENT: &str = "event";
}
pub struct Daemon {
@@ -100,7 +101,11 @@ impl Daemon {
}
pub async fn run(&mut self) {
log::info!("Starting daemon version {}", self.version);
log::debug!("Debug logging enabled.");
while let Some(event) = self.event_receiver.recv().await {
log::debug!(target: target::EVENT, "Received event: {:?}", event);
self.handle_event(event).await;
}
}
@@ -148,6 +153,11 @@ impl Daemon {
reply.send(()).unwrap();
},
Event::GetMessages(conversation_id, last_message_id, reply) => {
let messages = self.get_messages(conversation_id, last_message_id).await;
reply.send(messages).unwrap();
},
}
}
@@ -160,6 +170,10 @@ impl Daemon {
self.database.lock().await.with_repository(|r| r.all_conversations().unwrap()).await
}
async fn get_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Vec<Message> {
self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await
}
async fn sync_all_conversations_impl(database: &mut Arc<Mutex<Database>>, signal_sender: &Sender<Signal>) -> Result<()> {
log::info!(target: target::SYNC, "Starting conversation sync");
@@ -180,8 +194,16 @@ impl Daemon {
database.with_repository(|r| r.insert_conversation(conversation)).await?;
// Fetch and sync messages for this conversation
let last_message_id = database.with_repository(|r| -> Option<String> {
r.get_last_message_for_conversation(&conversation_id)
.unwrap_or(None)
.map(|m| m.id)
}).await;
log::debug!(target: target::SYNC, "Fetching messages for conversation {}", conversation_id);
let messages = client.get_messages(&conversation_id, None, None, None).await?;
log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id);
let messages = client.get_messages(&conversation_id, None, None, last_message_id).await?;
let db_messages: Vec<kordophone_db::models::Message> = messages.into_iter()
.map(|m| kordophone_db::models::Message::from(m))
.collect();

View File

@@ -74,6 +74,28 @@ impl DbusRepository for ServerImpl {
fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> {
self.send_event_sync(Event::SyncAllConversations)
}
fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
let last_message_id_opt = if last_message_id.is_empty() {
None
} else {
Some(last_message_id)
};
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
.and_then(|messages| {
let result = messages.into_iter().map(|msg| {
let mut map = arg::PropMap::new();
map.insert("id".into(), arg::Variant(Box::new(msg.id)));
map.insert("text".into(), arg::Variant(Box::new(msg.text)));
map.insert("date".into(), arg::Variant(Box::new(msg.date.and_utc().timestamp())));
map.insert("sender".into(), arg::Variant(Box::new(msg.sender.display_name())));
map
}).collect();
Ok(result)
})
}
}
impl DbusSettings for ServerImpl {

View File

@@ -12,9 +12,14 @@ use dbus::interface;
use dbus::server_impl::ServerImpl;
fn initialize_logging() {
// Weird: is this the best way to do this?
let log_level = std::env::var("RUST_LOG")
.map(|s| s.parse::<LevelFilter>().unwrap_or(LevelFilter::Info))
.unwrap_or(LevelFilter::Info);
env_logger::Builder::from_default_env()
.filter_level(LevelFilter::Info)
.format_timestamp_secs()
.filter_level(log_level)
.init();
}

View File

@@ -15,6 +15,7 @@ kordophone = { path = "../kordophone" }
kordophone-db = { path = "../kordophone-db" }
log = "0.4.22"
pretty = { version = "0.12.3", features = ["termcolor"] }
prettytable = "0.10.0"
time = "0.3.37"
tokio = "1.41.1"

View File

@@ -1,8 +1,8 @@
use anyhow::Result;
use clap::Subcommand;
use dbus::blocking::{Connection, Proxy};
use prettytable::table;
use crate::printers::{ConversationPrinter, MessagePrinter};
use std::future;
const DBUS_NAME: &str = "net.buzzert.kordophonecd";
const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon";
@@ -34,6 +34,12 @@ pub enum Commands {
/// Waits for signals from the daemon.
Signals,
/// Prints the messages for a conversation.
Messages {
conversation_id: String,
last_message_id: Option<String>,
},
}
#[derive(Subcommand)]
@@ -66,6 +72,7 @@ impl Commands {
Commands::Sync => client.sync_conversations().await,
Commands::Config { command } => client.config(command).await,
Commands::Signals => client.wait_for_signals().await,
Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await,
}
}
}
@@ -107,6 +114,17 @@ impl DaemonCli {
.map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e))
}
pub async fn print_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Result<()> {
let messages = KordophoneRepository::get_messages(&self.proxy(), &conversation_id, &last_message_id.unwrap_or_default())?;
println!("Number of messages: {}", messages.len());
for message in messages {
println!("{}", MessagePrinter::new(&message.into()));
}
Ok(())
}
pub async fn wait_for_signals(&mut self) -> Result<()> {
use dbus::Message;
mod dbus_signals {
@@ -136,13 +154,17 @@ impl DaemonCli {
}
pub async fn print_settings(&mut self) -> Result<()> {
let server_url = KordophoneSettings::server_url(&self.proxy())?;
let username = KordophoneSettings::username(&self.proxy())?;
let credential_item = KordophoneSettings::credential_item(&self.proxy())?;
let server_url = KordophoneSettings::server_url(&self.proxy()).unwrap_or_default();
let username = KordophoneSettings::username(&self.proxy()).unwrap_or_default();
let credential_item = KordophoneSettings::credential_item(&self.proxy()).unwrap_or_default();
let table = table!(
[ b->"Server URL", &server_url ],
[ b->"Username", &username ],
[ b->"Credential Item", &credential_item ]
);
table.printstd();
println!("Server URL: {}", server_url);
println!("Username: {}", username);
println!("Credential Item: {}", credential_item);
Ok(())
}

View File

@@ -86,6 +86,17 @@ impl From<kordophone_db::models::Message> for PrintableMessage {
}
}
impl From<arg::PropMap> for PrintableMessage {
fn from(value: arg::PropMap) -> Self {
Self {
guid: value.get("id").unwrap().as_str().unwrap().to_string(),
date: OffsetDateTime::from_unix_timestamp(value.get("date").unwrap().as_i64().unwrap()).unwrap(),
sender: value.get("sender").unwrap().as_str().unwrap().to_string(),
text: value.get("text").unwrap().as_str().unwrap().to_string(),
}
}
}
pub struct ConversationPrinter<'a> {
doc: RcDoc<'a, PrintableConversation>
}