Add 'core/' from commit 'b0dfc4146ca0da535a87f8509aec68817fb2ab14'
git-subtree-dir: core
git-subtree-mainline: a07f3dcd23
git-subtree-split: b0dfc4146c
This commit is contained in:
280
core/kordophoned/src/daemon/attachment_store.rs
Normal file
280
core/kordophoned/src/daemon/attachment_store.rs
Normal file
@@ -0,0 +1,280 @@
|
||||
use std::{
|
||||
io::{BufWriter, Write},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use futures_util::StreamExt;
|
||||
use kordophone::APIInterface;
|
||||
use thiserror::Error;
|
||||
|
||||
use kordophone_db::database::Database;
|
||||
|
||||
use crate::daemon::events::Event as DaemonEvent;
|
||||
use crate::daemon::events::Reply;
|
||||
use crate::daemon::models::Attachment;
|
||||
use crate::daemon::Daemon;
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use uuid::Uuid;
|
||||
|
||||
mod target {
    /// Log target used by every attachment-store log statement.
    pub static ATTACHMENTS: &str = "attachments";
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AttachmentStoreEvent {
|
||||
// Get the attachment info for a given attachment guid.
|
||||
// Args: attachment guid, reply channel.
|
||||
GetAttachmentInfo(String, Reply<Attachment>),
|
||||
|
||||
// Queue a download for a given attachment guid.
|
||||
// Args:
|
||||
// - attachment guid
|
||||
// - preview: whether to download the preview (true) or full attachment (false)
|
||||
QueueDownloadAttachment(String, bool),
|
||||
|
||||
// Queue an upload for a given attachment file.
|
||||
// Args:
|
||||
// - path: the path to the attachment file
|
||||
// - reply: a reply channel to send the pending upload guid to
|
||||
QueueUploadAttachment(PathBuf, Reply<String>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
enum AttachmentStoreError {
|
||||
#[error("attachment has already been downloaded")]
|
||||
AttachmentAlreadyDownloaded,
|
||||
|
||||
#[error("temporary file already exists, assuming download is in progress")]
|
||||
DownloadAlreadyInProgress,
|
||||
|
||||
#[error("Client error: {0}")]
|
||||
APIClientError(String),
|
||||
}
|
||||
|
||||
pub struct AttachmentStore {
|
||||
store_path: PathBuf,
|
||||
database: Arc<Mutex<Database>>,
|
||||
daemon_event_sink: Sender<DaemonEvent>,
|
||||
|
||||
event_source: Receiver<AttachmentStoreEvent>,
|
||||
event_sink: Option<Sender<AttachmentStoreEvent>>,
|
||||
}
|
||||
|
||||
impl AttachmentStore {
|
||||
pub fn get_default_store_path() -> PathBuf {
|
||||
let data_dir = Daemon::get_data_dir().expect("Unable to get data path");
|
||||
data_dir.join("attachments")
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
database: Arc<Mutex<Database>>,
|
||||
daemon_event_sink: Sender<DaemonEvent>,
|
||||
) -> AttachmentStore {
|
||||
let store_path = Self::get_default_store_path();
|
||||
log::info!(target: target::ATTACHMENTS, "Attachment store path: {}", store_path.display());
|
||||
|
||||
// Create the attachment store if it doesn't exist
|
||||
std::fs::create_dir_all(&store_path)
|
||||
.expect("Wasn't able to create the attachment store path");
|
||||
|
||||
let (event_sink, event_source) = tokio::sync::mpsc::channel(100);
|
||||
|
||||
AttachmentStore {
|
||||
store_path: store_path,
|
||||
database: database,
|
||||
daemon_event_sink: daemon_event_sink,
|
||||
event_source: event_source,
|
||||
event_sink: Some(event_sink),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_event_sink(&mut self) -> Sender<AttachmentStoreEvent> {
|
||||
self.event_sink.take().unwrap()
|
||||
}
|
||||
|
||||
fn get_attachment(&self, guid: &String) -> Attachment {
|
||||
Self::get_attachment_impl(&self.store_path, guid)
|
||||
}
|
||||
|
||||
pub fn get_attachment_impl(store_path: &PathBuf, guid: &String) -> Attachment {
|
||||
let base_path = store_path.join(guid);
|
||||
Attachment {
|
||||
guid: guid.to_owned(),
|
||||
base_path: base_path,
|
||||
metadata: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn download_attachment_impl(
|
||||
store_path: &PathBuf,
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
daemon_event_sink: &Sender<DaemonEvent>,
|
||||
guid: &String,
|
||||
preview: bool,
|
||||
) -> Result<()> {
|
||||
let attachment = Self::get_attachment_impl(store_path, guid);
|
||||
|
||||
if attachment.is_downloaded(preview) {
|
||||
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", attachment.guid);
|
||||
return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into());
|
||||
}
|
||||
|
||||
let temporary_path = attachment.get_path_for_preview_scratch(preview, true);
|
||||
if std::fs::exists(&temporary_path).unwrap_or(false) {
|
||||
log::warn!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", temporary_path.display());
|
||||
return Err(AttachmentStoreError::DownloadAlreadyInProgress.into());
|
||||
}
|
||||
|
||||
log::debug!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid);
|
||||
|
||||
let file = std::fs::File::create(&temporary_path)?;
|
||||
let mut writer = BufWriter::new(&file);
|
||||
let mut client = Daemon::get_client_impl(database).await?;
|
||||
let mut stream = client
|
||||
.fetch_attachment_data(&attachment.guid, preview)
|
||||
.await
|
||||
.map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?;
|
||||
|
||||
log::trace!(target: target::ATTACHMENTS, "Writing attachment {:?} data to temporary file {:?}", &attachment.guid, &temporary_path);
|
||||
while let Some(Ok(data)) = stream.next().await {
|
||||
writer.write(data.as_ref())?;
|
||||
}
|
||||
|
||||
// Flush and sync the temporary file before moving
|
||||
writer.flush()?;
|
||||
file.sync_all()?;
|
||||
|
||||
// Atomically move the temporary file to the final location
|
||||
std::fs::rename(
|
||||
&temporary_path,
|
||||
&attachment.get_path_for_preview_scratch(preview, false),
|
||||
)?;
|
||||
|
||||
log::debug!(target: target::ATTACHMENTS, "Completed download for attachment: {}", attachment.guid);
|
||||
|
||||
// Send a signal to the daemon that the attachment has been downloaded.
|
||||
let event = DaemonEvent::AttachmentDownloaded(attachment.guid.clone());
|
||||
daemon_event_sink.send(event).await.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn upload_attachment_impl(
|
||||
store_path: &PathBuf,
|
||||
incoming_path: &PathBuf,
|
||||
upload_guid: &String,
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
daemon_event_sink: &Sender<DaemonEvent>,
|
||||
) -> Result<String> {
|
||||
use tokio::fs::File;
|
||||
use tokio::io::BufReader;
|
||||
|
||||
// Create uploads directory if it doesn't exist.
|
||||
let uploads_path = store_path.join("uploads");
|
||||
std::fs::create_dir_all(&uploads_path).unwrap();
|
||||
|
||||
// First, copy the file to the store path, under /uploads/.
|
||||
log::trace!(target: target::ATTACHMENTS, "Copying attachment to uploads directory: {}", uploads_path.display());
|
||||
let temporary_path = uploads_path.join(incoming_path.file_name().unwrap());
|
||||
std::fs::copy(incoming_path, &temporary_path).unwrap();
|
||||
|
||||
// Open file handle to the temporary file,
|
||||
log::trace!(target: target::ATTACHMENTS, "Opening stream to temporary file: {}", temporary_path.display());
|
||||
let file = File::open(&temporary_path).await?;
|
||||
let reader: BufReader<File> = BufReader::new(file);
|
||||
|
||||
// Upload the file to the server.
|
||||
let filename = incoming_path.file_name().unwrap().to_str().unwrap();
|
||||
log::trace!(target: target::ATTACHMENTS, "Uploading attachment to server: {}", &filename);
|
||||
let mut client = Daemon::get_client_impl(database).await?;
|
||||
|
||||
let metadata = std::fs::metadata(&temporary_path)?;
|
||||
let size = metadata.len();
|
||||
let guid = client.upload_attachment(reader, filename, size).await?;
|
||||
|
||||
// Delete the temporary file.
|
||||
log::debug!(target: target::ATTACHMENTS, "Upload completed with guid {}, deleting temporary file: {}", guid, temporary_path.display());
|
||||
std::fs::remove_file(&temporary_path).unwrap();
|
||||
|
||||
// Send a signal to the daemon that the attachment has been uploaded.
|
||||
let event = DaemonEvent::AttachmentUploaded(upload_guid.clone(), guid.clone());
|
||||
daemon_event_sink.send(event).await.unwrap();
|
||||
|
||||
Ok(guid)
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(event) = self.event_source.recv() => {
|
||||
log::debug!(target: target::ATTACHMENTS, "Received attachment store event: {:?}", event);
|
||||
|
||||
match event {
|
||||
AttachmentStoreEvent::QueueDownloadAttachment(guid, preview) => {
|
||||
let attachment = self.get_attachment(&guid);
|
||||
if !attachment.is_downloaded(preview) {
|
||||
let store_path = self.store_path.clone();
|
||||
let mut database = self.database.clone();
|
||||
let daemon_event_sink = self.daemon_event_sink.clone();
|
||||
let _guid = guid.clone();
|
||||
|
||||
// Spawn a new task here so we don't block incoming queue events.
|
||||
tokio::spawn(async move {
|
||||
let result = Self::download_attachment_impl(
|
||||
&store_path,
|
||||
&mut database,
|
||||
&daemon_event_sink,
|
||||
&_guid,
|
||||
preview,
|
||||
).await;
|
||||
|
||||
if let Err(e) = result {
|
||||
log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, e);
|
||||
}
|
||||
});
|
||||
|
||||
log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid);
|
||||
} else {
|
||||
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid);
|
||||
}
|
||||
}
|
||||
|
||||
AttachmentStoreEvent::GetAttachmentInfo(guid, reply) => {
|
||||
let attachment = self.get_attachment(&guid);
|
||||
reply.send(attachment).unwrap();
|
||||
}
|
||||
|
||||
AttachmentStoreEvent::QueueUploadAttachment(path, reply) => {
|
||||
let upload_guid = Uuid::new_v4().to_string();
|
||||
let store_path = self.store_path.clone();
|
||||
let mut database = self.database.clone();
|
||||
let daemon_event_sink = self.daemon_event_sink.clone();
|
||||
|
||||
let _upload_guid = upload_guid.clone();
|
||||
tokio::spawn(async move {
|
||||
let result = Self::upload_attachment_impl(
|
||||
&store_path,
|
||||
&path,
|
||||
&_upload_guid,
|
||||
&mut database,
|
||||
&daemon_event_sink,
|
||||
).await;
|
||||
|
||||
if let Err(e) = result {
|
||||
log::error!(target: target::ATTACHMENTS, "Error uploading attachment {}: {}", &_upload_guid, e);
|
||||
}
|
||||
});
|
||||
|
||||
reply.send(upload_guid).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
87
core/kordophoned/src/daemon/auth_store.rs
Normal file
87
core/kordophoned/src/daemon/auth_store.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
use crate::daemon::SettingsKey;
|
||||
|
||||
use keyring::{Entry, Result};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use kordophone::api::{http_client::Credentials, AuthenticationStore};
|
||||
use kordophone_db::database::{Database, DatabaseAccess};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
pub struct DatabaseAuthenticationStore {
|
||||
database: Arc<Mutex<Database>>,
|
||||
}
|
||||
|
||||
impl DatabaseAuthenticationStore {
|
||||
pub fn new(database: Arc<Mutex<Database>>) -> Self {
|
||||
Self { database }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AuthenticationStore for DatabaseAuthenticationStore {
|
||||
async fn get_credentials(&mut self) -> Option<Credentials> {
|
||||
self.database
|
||||
.lock()
|
||||
.await
|
||||
.with_settings(|settings| {
|
||||
let username: Option<String> = settings
|
||||
.get::<String>(SettingsKey::USERNAME)
|
||||
.unwrap_or_else(|e| {
|
||||
log::warn!("error getting username from database: {}", e);
|
||||
None
|
||||
});
|
||||
|
||||
match username {
|
||||
Some(username) => {
|
||||
let credential_res = Entry::new("net.buzzert.kordophonecd", &username);
|
||||
let password: Result<String> = match credential_res {
|
||||
Ok(credential) => credential.get_password(),
|
||||
Err(e) => {
|
||||
log::error!("error creating keyring credential: {}", e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
match password {
|
||||
Ok(password) => Some(Credentials { username, password }),
|
||||
Err(e) => {
|
||||
log::error!("error getting password from keyring: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_token(&mut self) -> Option<String> {
|
||||
self.database
|
||||
.lock()
|
||||
.await
|
||||
.with_settings(
|
||||
|settings| match settings.get::<String>(SettingsKey::TOKEN) {
|
||||
Ok(token) => token,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to get token from settings: {}", e);
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn set_token(&mut self, token: String) {
|
||||
self.database
|
||||
.lock()
|
||||
.await
|
||||
.with_settings(|settings| settings.put(SettingsKey::TOKEN, &token))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Failed to set token: {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
294
core/kordophoned/src/daemon/contact_resolver/eds.rs
Normal file
294
core/kordophoned/src/daemon/contact_resolver/eds.rs
Normal file
@@ -0,0 +1,294 @@
|
||||
use super::ContactResolverBackend;
|
||||
use dbus::arg::{RefArg, Variant};
|
||||
use dbus::blocking::Connection;
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EDSContactResolverBackend;
|
||||
|
||||
// Cache the UID of the default local address book so we do not have to scan
|
||||
// all sources over and over again. Discovering the address book requires a
|
||||
// D-Bus round-trip that we would rather avoid on every lookup.
|
||||
static ADDRESS_BOOK_SOURCE_UID: OnceCell<String> = OnceCell::new();
|
||||
|
||||
/// Holds a D-Bus connection and the identifiers needed to create an address-book proxy.
|
||||
struct AddressBookHandle {
|
||||
connection: Connection,
|
||||
object_path: String,
|
||||
bus_name: String,
|
||||
}
|
||||
|
||||
impl AddressBookHandle {
|
||||
fn new() -> anyhow::Result<Self> {
|
||||
let connection = new_session_connection()?;
|
||||
let source_uid = ensure_address_book_uid(&connection)?;
|
||||
let (object_path, bus_name) = open_address_book(&connection, &source_uid)?;
|
||||
|
||||
Ok(Self {
|
||||
connection,
|
||||
object_path,
|
||||
bus_name,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Obtain the global address-book handle, initialising it on the first call.
|
||||
static ADDRESS_BOOK_HANDLE: OnceCell<Mutex<AddressBookHandle>> = OnceCell::new();
|
||||
|
||||
/// Check whether a given well-known name currently has an owner on the bus.
|
||||
fn name_has_owner(conn: &Connection, name: &str) -> bool {
|
||||
let proxy = conn.with_proxy(
|
||||
"org.freedesktop.DBus",
|
||||
"/org/freedesktop/DBus",
|
||||
Duration::from_secs(2),
|
||||
);
|
||||
let result: Result<(bool,), _> =
|
||||
proxy.method_call("org.freedesktop.DBus", "NameHasOwner", (name.to_string(),));
|
||||
result.map(|(b,)| b).unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Returns a fresh handle, ensuring the cached one is still valid. If the backend owning the
|
||||
/// address-book disappeared, the cache is cleared and we try to create a new handle.
|
||||
fn obtain_handle() -> Option<std::sync::MutexGuard<'static, AddressBookHandle>> {
|
||||
// Initialize cell if necessary.
|
||||
let cell = ADDRESS_BOOK_HANDLE
|
||||
.get_or_try_init(|| AddressBookHandle::new().map(Mutex::new))
|
||||
.ok()?;
|
||||
|
||||
// Validate existing handle.
|
||||
{
|
||||
let mut guard = cell.lock().ok()?;
|
||||
if !name_has_owner(&guard.connection, &guard.bus_name) {
|
||||
// Try to refresh the handle in-place.
|
||||
match AddressBookHandle::new() {
|
||||
Ok(new_h) => {
|
||||
*guard = new_h;
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("EDS resolver: failed to refresh address book handle: {}", e);
|
||||
// keep the stale handle but report failure
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Return guard after ensuring validity.
|
||||
return Some(guard);
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper that returns a blocking D-Bus session connection. Creating the
|
||||
/// connection is cheap (<1 ms) but we still keep it around because the
|
||||
/// underlying socket is re-used by the dbus crate.
|
||||
fn new_session_connection() -> Result<Connection, dbus::Error> {
|
||||
Connection::new_session()
|
||||
}
|
||||
|
||||
/// Scan Evolution-Data-Server sources to find a suitable address-book source
|
||||
/// UID. The implementation mirrors what `gdbus introspect` reveals for the
|
||||
/// EDS interfaces. We search all `org.gnome.evolution.dataserver.Source`
|
||||
/// objects and pick the first one that advertises the `[Address Book]` section
|
||||
/// with a `BackendName=` entry in its INI-style `Data` property.
|
||||
fn ensure_address_book_uid(conn: &Connection) -> anyhow::Result<String> {
|
||||
if let Some(uid) = ADDRESS_BOOK_SOURCE_UID.get() {
|
||||
return Ok(uid.clone());
|
||||
}
|
||||
|
||||
let source_manager_proxy = conn.with_proxy(
|
||||
"org.gnome.evolution.dataserver.Sources5",
|
||||
"/org/gnome/evolution/dataserver/SourceManager",
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
|
||||
// The GetManagedObjects reply is the usual ObjectManager map.
|
||||
let (managed_objects,): (
|
||||
HashMap<dbus::Path<'static>, HashMap<String, HashMap<String, Variant<Box<dyn RefArg>>>>>,
|
||||
) = source_manager_proxy.method_call(
|
||||
"org.freedesktop.DBus.ObjectManager",
|
||||
"GetManagedObjects",
|
||||
(),
|
||||
)?;
|
||||
|
||||
let uid = managed_objects
|
||||
.values()
|
||||
.filter_map(|ifaces| ifaces.get("org.gnome.evolution.dataserver.Source"))
|
||||
.filter_map(|props| {
|
||||
let uid = props.get("UID")?.as_str()?;
|
||||
if uid == "system-address-book" {
|
||||
// Decoy.
|
||||
return None;
|
||||
}
|
||||
|
||||
let data = props.get("Data")?.as_str()?;
|
||||
if data_contains_address_book_backend(data) {
|
||||
Some(uid.to_owned())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("No address book source found"))?;
|
||||
|
||||
// Remember for future look-ups.
|
||||
log::debug!("EDS resolver: found address book source UID: {}", uid);
|
||||
let _ = ADDRESS_BOOK_SOURCE_UID.set(uid.clone());
|
||||
Ok(uid)
|
||||
}
|
||||
|
||||
/// Returns true when the INI-style `Data` blob contains an
/// `[Address Book]` section with a `BackendName=` entry.
fn data_contains_address_book_backend(data: &str) -> bool {
    let mut in_address_book_section = false;
    for raw_line in data.lines() {
        let line = raw_line.trim();
        if line.starts_with('[') && line.ends_with(']') {
            // Section header: track whether we just entered [Address Book].
            in_address_book_section = line == "[Address Book]";
        } else if in_address_book_section && line.starts_with("BackendName=") {
            return true;
        }
    }
    false
}
|
||||
|
||||
/// Open the Evolution address book referenced by `source_uid` and return the
|
||||
/// pair `(object_path, bus_name)` that identifies the newly created D-Bus
|
||||
/// proxy.
|
||||
fn open_address_book(conn: &Connection, source_uid: &str) -> anyhow::Result<(String, String)> {
|
||||
let factory_proxy = conn.with_proxy(
|
||||
"org.gnome.evolution.dataserver.AddressBook10",
|
||||
"/org/gnome/evolution/dataserver/AddressBookFactory",
|
||||
Duration::from_secs(60),
|
||||
);
|
||||
|
||||
let (object_path, bus_name): (String, String) = factory_proxy.method_call(
|
||||
"org.gnome.evolution.dataserver.AddressBookFactory",
|
||||
"OpenAddressBook",
|
||||
(source_uid.to_owned(),),
|
||||
)?;
|
||||
|
||||
Ok((object_path, bus_name))
|
||||
}
|
||||
|
||||
/// Ensure that the backend for the given address-book proxy is opened.
|
||||
/// Evolution-Data-Server returns "Backend is not opened yet" until someone
|
||||
/// calls the `Open` method once per process. We ignore any error here
|
||||
/// because the backend might already be open.
|
||||
fn ensure_address_book_open(proxy: &dbus::blocking::Proxy<&Connection>) {
|
||||
let _: Result<(), _> =
|
||||
proxy.method_call("org.gnome.evolution.dataserver.AddressBook", "Open", ());
|
||||
}
|
||||
|
||||
impl ContactResolverBackend for EDSContactResolverBackend {
|
||||
type ContactID = String;
|
||||
|
||||
fn resolve_contact_id(&self, address: &str) -> Option<Self::ContactID> {
|
||||
let handle = match obtain_handle() {
|
||||
Some(h) => h,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let address_book_proxy = handle.connection.with_proxy(
|
||||
&handle.bus_name,
|
||||
&handle.object_path,
|
||||
Duration::from_secs(60),
|
||||
);
|
||||
|
||||
ensure_address_book_open(&address_book_proxy);
|
||||
|
||||
let filter = if address.contains('@') {
|
||||
format!("(is \"email\" \"{}\")", address)
|
||||
} else {
|
||||
let mut filters: Vec<String> = Vec::new();
|
||||
filters.push(format!("(is \"phone\" \"{}\")", address));
|
||||
|
||||
let normalized_address = address
|
||||
.chars()
|
||||
.filter(|c| c.is_numeric())
|
||||
.collect::<String>();
|
||||
|
||||
filters.push(format!("(is \"phone\" \"{}\")", normalized_address));
|
||||
|
||||
let local_address = address
|
||||
.replace('+', "")
|
||||
.chars()
|
||||
.skip_while(|c| c.is_numeric() || *c == '(' || *c == ')')
|
||||
.collect::<String>()
|
||||
.chars()
|
||||
.filter(|c| c.is_numeric())
|
||||
.collect::<String>();
|
||||
|
||||
if !local_address.is_empty() {
|
||||
filters.push(format!("(is \"phone\" \"{}\")", local_address));
|
||||
}
|
||||
|
||||
format!("(or {})", filters.join(" "))
|
||||
};
|
||||
|
||||
log::trace!(
|
||||
"EDS resolver: GetContactListUids filter: {}, address: {}",
|
||||
filter,
|
||||
address
|
||||
);
|
||||
|
||||
let uids_result: Result<(Vec<String>,), _> = address_book_proxy.method_call(
|
||||
"org.gnome.evolution.dataserver.AddressBook",
|
||||
"GetContactListUids",
|
||||
(filter,),
|
||||
);
|
||||
|
||||
let (uids,) = match uids_result {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
log::debug!("EDS resolver: GetContactListUids failed: {}", e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
uids.into_iter().next()
|
||||
}
|
||||
|
||||
fn get_contact_display_name(&self, contact_id: &Self::ContactID) -> Option<String> {
|
||||
let handle = match obtain_handle() {
|
||||
Some(h) => h,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let address_book_proxy = handle.connection.with_proxy(
|
||||
&handle.bus_name,
|
||||
&handle.object_path,
|
||||
Duration::from_secs(60),
|
||||
);
|
||||
|
||||
ensure_address_book_open(&address_book_proxy);
|
||||
|
||||
let vcard_result: Result<(String,), _> = address_book_proxy.method_call(
|
||||
"org.gnome.evolution.dataserver.AddressBook",
|
||||
"GetContact",
|
||||
(contact_id.clone(),),
|
||||
);
|
||||
|
||||
let (vcard,) = match vcard_result {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
log::debug!("EDS resolver: GetContact failed: {}", e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
for line in vcard.lines() {
|
||||
if let Some(rest) = line.strip_prefix("FN:") {
|
||||
return Some(rest.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EDSContactResolverBackend {
|
||||
fn default() -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
16
core/kordophoned/src/daemon/contact_resolver/generic.rs
Normal file
16
core/kordophoned/src/daemon/contact_resolver/generic.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
use super::ContactResolverBackend;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct GenericContactResolverBackend;
|
||||
|
||||
impl ContactResolverBackend for GenericContactResolverBackend {
|
||||
type ContactID = String;
|
||||
|
||||
fn resolve_contact_id(&self, address: &str) -> Option<Self::ContactID> {
|
||||
None
|
||||
}
|
||||
|
||||
fn get_contact_display_name(&self, contact_id: &Self::ContactID) -> Option<String> {
|
||||
None
|
||||
}
|
||||
}
|
||||
107
core/kordophoned/src/daemon/contact_resolver/mod.rs
Normal file
107
core/kordophoned/src/daemon/contact_resolver/mod.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod eds;
|
||||
|
||||
pub mod generic;
|
||||
|
||||
// Convenient alias for the platform's default backend
|
||||
#[cfg(target_os = "linux")]
|
||||
pub type DefaultContactResolverBackend = eds::EDSContactResolverBackend;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
pub type DefaultContactResolverBackend = generic::GenericContactResolverBackend;
|
||||
|
||||
/// Stub so non-Linux builds still have the `EDSContactResolverBackend`
/// name available; it never resolves anything.
/// `Default` is derived rather than hand-written (clippy: derivable_impls).
#[cfg(not(target_os = "linux"))]
#[derive(Clone, Default)]
pub struct EDSContactResolverBackend;

#[cfg(not(target_os = "linux"))]
impl ContactResolverBackend for EDSContactResolverBackend {
    type ContactID = String;

    /// Always `None` on non-Linux platforms.
    fn resolve_contact_id(&self, _address: &str) -> Option<Self::ContactID> {
        None
    }

    /// Always `None` on non-Linux platforms.
    fn get_contact_display_name(&self, _contact_id: &Self::ContactID) -> Option<String> {
        None
    }
}
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Platform abstraction for resolving addresses to contacts and fetching
/// their display names.
pub trait ContactResolverBackend {
    /// Backend-specific contact identifier.
    type ContactID;

    /// Map an email address or phone number to a contact id, if known.
    fn resolve_contact_id(&self, address: &str) -> Option<Self::ContactID>;
    /// Look up the human-readable name for a previously resolved id.
    fn get_contact_display_name(&self, contact_id: &Self::ContactID) -> Option<String>;
}
|
||||
|
||||
/// Backend-agnostic contact id; backends convert to and from this via
/// their `From`/`Into` bounds on `ContactResolver`.
pub type AnyContactID = String;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ContactResolver<T: ContactResolverBackend> {
|
||||
backend: T,
|
||||
display_name_cache: HashMap<AnyContactID, String>,
|
||||
contact_id_cache: HashMap<String, AnyContactID>,
|
||||
}
|
||||
|
||||
impl<T: ContactResolverBackend> ContactResolver<T>
|
||||
where
|
||||
T::ContactID: From<AnyContactID>,
|
||||
T::ContactID: Into<AnyContactID>,
|
||||
T: Default,
|
||||
{
|
||||
pub fn new(backend: T) -> Self {
|
||||
Self {
|
||||
backend,
|
||||
display_name_cache: HashMap::new(),
|
||||
contact_id_cache: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resolve_contact_id(&mut self, address: &str) -> Option<AnyContactID> {
|
||||
if let Some(id) = self.contact_id_cache.get(address) {
|
||||
return Some(id.clone());
|
||||
}
|
||||
|
||||
let id = self.backend.resolve_contact_id(address).map(|id| id.into());
|
||||
if let Some(ref id) = id {
|
||||
self.contact_id_cache
|
||||
.insert(address.to_string(), id.clone());
|
||||
}
|
||||
|
||||
id
|
||||
}
|
||||
|
||||
pub fn get_contact_display_name(&mut self, contact_id: &AnyContactID) -> Option<String> {
|
||||
if let Some(display_name) = self.display_name_cache.get(contact_id) {
|
||||
return Some(display_name.clone());
|
||||
}
|
||||
|
||||
let backend_contact_id: T::ContactID = T::ContactID::from((*contact_id).clone());
|
||||
let display_name = self.backend.get_contact_display_name(&backend_contact_id);
|
||||
if let Some(ref display_name) = display_name {
|
||||
self.display_name_cache
|
||||
.insert(contact_id.to_string(), display_name.clone());
|
||||
}
|
||||
|
||||
display_name
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ContactResolverBackend> Default for ContactResolver<T>
|
||||
where
|
||||
T::ContactID: From<AnyContactID>,
|
||||
T::ContactID: Into<AnyContactID>,
|
||||
T: Default,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self::new(T::default())
|
||||
}
|
||||
}
|
||||
103
core/kordophoned/src/daemon/events.rs
Normal file
103
core/kordophoned/src/daemon/events.rs
Normal file
@@ -0,0 +1,103 @@
|
||||
use tokio::sync::oneshot;
|
||||
use uuid::Uuid;
|
||||
|
||||
use kordophone::model::ConversationID;
|
||||
use kordophone::model::OutgoingMessage;
|
||||
use kordophone_db::models::Conversation;
|
||||
|
||||
use crate::daemon::settings::Settings;
|
||||
use crate::daemon::{Attachment, Message};
|
||||
|
||||
/// One-shot channel used to send the result of a request event back to its caller.
pub type Reply<T> = oneshot::Sender<T>;
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Event {
|
||||
/// Get the version of the daemon.
|
||||
GetVersion(Reply<String>),
|
||||
|
||||
/// Asynchronous event for syncing the conversation list with the server.
|
||||
SyncConversationList(Reply<()>),
|
||||
|
||||
/// Asynchronous event for syncing all conversations with the server.
|
||||
SyncAllConversations(Reply<()>),
|
||||
|
||||
/// Asynchronous event for syncing a single conversation with the server.
|
||||
SyncConversation(String, Reply<()>),
|
||||
|
||||
/// Asynchronous event for marking a conversation as read.
|
||||
MarkConversationAsRead(String, Reply<()>),
|
||||
|
||||
/// Asynchronous event for updating the metadata for a conversation.
|
||||
UpdateConversationMetadata(Conversation, Reply<()>),
|
||||
|
||||
/// Sent when the update stream is reconnected after a timeout or configuration change.
|
||||
UpdateStreamReconnected,
|
||||
|
||||
/// Returns all known conversations from the database.
|
||||
/// Parameters:
|
||||
/// - limit: The maximum number of conversations to return. (-1 for no limit)
|
||||
/// - offset: The offset into the conversation list to start returning conversations from.
|
||||
GetAllConversations(i32, i32, Reply<Vec<Conversation>>),
|
||||
|
||||
/// Returns all known settings from the database.
|
||||
GetAllSettings(Reply<Settings>),
|
||||
|
||||
/// Update settings in the database.
|
||||
UpdateSettings(Settings, Reply<()>),
|
||||
|
||||
/// Returns all messages for a conversation from the database.
|
||||
/// Parameters:
|
||||
/// - conversation_id: The ID of the conversation to get messages for.
|
||||
/// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned.
|
||||
GetMessages(String, Option<String>, Reply<Vec<Message>>),
|
||||
|
||||
/// Enqueues a message to be sent to the server.
|
||||
/// Parameters:
|
||||
/// - conversation_id: The ID of the conversation to send the message to.
|
||||
/// - text: The text of the message to send.
|
||||
/// - attachment_guids: The GUIDs of the attachments to send.
|
||||
/// - reply: The outgoing message ID (not the server-assigned message ID).
|
||||
SendMessage(String, String, Vec<String>, Reply<Uuid>),
|
||||
|
||||
/// Notifies the daemon that a message has been sent.
|
||||
/// Parameters:
|
||||
/// - message: The message that was sent.
|
||||
/// - outgoing_message: The outgoing message that was sent.
|
||||
/// - conversation_id: The ID of the conversation that the message was sent to.
|
||||
MessageSent(Message, OutgoingMessage, ConversationID),
|
||||
|
||||
/// Gets an attachment object from the attachment store.
|
||||
/// Parameters:
|
||||
/// - guid: The attachment guid
|
||||
/// - reply: Reply of the attachment object, if known.
|
||||
GetAttachment(String, Reply<Attachment>),
|
||||
|
||||
/// Downloads an attachment from the server.
|
||||
/// Parameters:
|
||||
/// - attachment_id: The attachment ID to download
|
||||
/// - preview: Whether to download the preview (true) or full attachment (false)
|
||||
/// - reply: Reply indicating success or failure
|
||||
DownloadAttachment(String, bool, Reply<()>),
|
||||
|
||||
/// Delete all conversations from the database.
|
||||
DeleteAllConversations(Reply<()>),
|
||||
|
||||
/// Notifies the daemon that an attachment has been downloaded.
|
||||
/// Parameters:
|
||||
/// - attachment_id: The attachment ID that was downloaded.
|
||||
AttachmentDownloaded(String),
|
||||
|
||||
/// Upload an attachment to the server.
|
||||
/// Parameters:
|
||||
/// - path: The path to the attachment file
|
||||
/// - reply: Reply indicating the upload GUID
|
||||
UploadAttachment(PathBuf, Reply<String>),
|
||||
|
||||
/// Notifies the daemon that an attachment has been uploaded.
|
||||
/// Parameters:
|
||||
/// - upload_id: The upload ID that was uploaded.
|
||||
/// - attachment_id: The attachment ID that was uploaded.
|
||||
AttachmentUploaded(String, String),
|
||||
}
|
||||
745
core/kordophoned/src/daemon/mod.rs
Normal file
745
core/kordophoned/src/daemon/mod.rs
Normal file
@@ -0,0 +1,745 @@
|
||||
pub mod settings;
|
||||
use settings::keys as SettingsKey;
|
||||
use settings::Settings;
|
||||
|
||||
pub mod events;
|
||||
use events::*;
|
||||
|
||||
pub mod signals;
|
||||
use signals::*;
|
||||
|
||||
use anyhow::Result;
|
||||
use directories::ProjectDirs;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use uuid::Uuid;
|
||||
|
||||
use kordophone_db::{
|
||||
database::{Database, DatabaseAccess},
|
||||
models::Conversation,
|
||||
};
|
||||
|
||||
use kordophone::api::http_client::HTTPAPIClient;
|
||||
use kordophone::api::APIInterface;
|
||||
use kordophone::model::outgoing_message::OutgoingMessage;
|
||||
use kordophone::model::{ConversationID, MessageID};
|
||||
|
||||
mod update_monitor;
|
||||
use update_monitor::{UpdateMonitor, UpdateMonitorCommand};
|
||||
|
||||
mod auth_store;
|
||||
use auth_store::DatabaseAuthenticationStore;
|
||||
|
||||
mod post_office;
|
||||
use post_office::Event as PostOfficeEvent;
|
||||
use post_office::PostOffice;
|
||||
|
||||
mod models;
|
||||
pub use models::Attachment;
|
||||
pub use models::Message;
|
||||
|
||||
mod attachment_store;
|
||||
pub use attachment_store::AttachmentStore;
|
||||
pub use attachment_store::AttachmentStoreEvent;
|
||||
|
||||
pub mod contact_resolver;
|
||||
use contact_resolver::ContactResolver;
|
||||
use contact_resolver::DefaultContactResolverBackend;
|
||||
|
||||
use kordophone_db::models::participant::Participant as DbParticipant;
|
||||
|
||||
/// Errors produced by the daemon itself (as opposed to errors bubbled up
/// from the database or the network client).
#[derive(Debug, Error)]
pub enum DaemonError {
    /// Raised when an operation needs an API client but no server URL has
    /// been configured in settings (or the configured URL failed to parse).
    #[error("Client Not Configured")]
    ClientNotConfigured,
}
||||
|
||||
pub type DaemonResult<T> = Result<T, Box<dyn Error + Send + Sync>>;
|
||||
|
||||
/// Log-target names used to tag `log::…!(target: …)` calls so log output can
/// be filtered by subsystem.
pub mod target {
    pub static SYNC: &str = "sync"; // conversation/message synchronization
    pub static EVENT: &str = "event"; // event-loop dispatch
    pub static SETTINGS: &str = "settings"; // settings read/write
    pub static UPDATES: &str = "updates"; // server update stream / monitor
    pub static ATTACHMENTS: &str = "attachments"; // attachment up/downloads
    pub static DAEMON: &str = "daemon"; // general daemon operations
}
|
||||
|
||||
/// Central daemon state: owns the event loop, the database handle, and the
/// channels connecting the background subsystems (update monitor, post
/// office, attachment store) to the main event loop.
pub struct Daemon {
    /// Cloneable handle for submitting events to the daemon's event loop.
    pub event_sender: Sender<Event>,
    // Receiving side of the event loop; drained in `run()`.
    event_receiver: Receiver<Event>,

    // Signal channel toward the UI client. The receiver is `Option` so a
    // single client can take it exactly once via `obtain_signal_receiver`.
    signal_receiver: Option<Receiver<Signal>>,
    signal_sender: Sender<Signal>,

    // Channel pair feeding the post office (outgoing-message delivery).
    // The source is `Option` because it is moved into the spawned task in `run()`.
    post_office_sink: Sender<PostOfficeEvent>,
    post_office_source: Option<Receiver<PostOfficeEvent>>,

    // In-flight outgoing messages, keyed by conversation, kept so clients see
    // a consistent message list before the server confirms delivery.
    outgoing_messages: HashMap<ConversationID, Vec<OutgoingMessage>>,

    // Populated in `run()` once the respective background task is spawned.
    attachment_store_sink: Option<Sender<AttachmentStoreEvent>>,
    update_monitor_command_tx: Option<Sender<UpdateMonitorCommand>>,

    // Daemon version string, taken from the crate version at compile time.
    version: String,
    // Shared database handle; `tokio::sync::Mutex` because it is held across awaits.
    database: Arc<Mutex<Database>>,
    // Dedicated runtime for background sync work spawned off the event loop.
    runtime: tokio::runtime::Runtime,
}
|
||||
|
||||
impl Daemon {
|
||||
    /// Constructs the daemon: resolves the database path (creating its parent
    /// directory if needed), opens the database, creates all internal
    /// channels, and builds the background-task runtime.
    ///
    /// # Errors
    /// Returns an error if the database directory cannot be created or the
    /// database fails to open.
    pub fn new() -> Result<Self> {
        let database_path = Self::get_database_path();
        log::info!("Database path: {}", database_path.display());

        // Create the database directory if it doesn't exist.
        // NOTE(review): `parent()` is unwrapped — `get_database_path` always
        // returns a path with a file component, so a parent exists.
        let database_dir = database_path.parent().unwrap();
        std::fs::create_dir_all(database_dir)?;

        // Create event channels (bounded at 100 to apply backpressure).
        let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
        let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100);
        let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100);

        // Create background task runtime (separate from the caller's runtime)
        // used for sync jobs spawned from the event loop.
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();

        let database_impl = Database::new(&database_path.to_string_lossy())?;
        let database = Arc::new(Mutex::new(database_impl));

        Ok(Self {
            version: env!("CARGO_PKG_VERSION").to_string(),
            database,
            event_receiver,
            event_sender,
            // Receiver halves are stored as `Some(...)` until taken by their
            // single consumer (client / post-office task respectively).
            signal_receiver: Some(signal_receiver),
            signal_sender,
            post_office_sink,
            post_office_source: Some(post_office_source),
            outgoing_messages: HashMap::new(),
            // Filled in by `run()` once the background tasks are spawned.
            attachment_store_sink: None,
            update_monitor_command_tx: None,
            runtime,
        })
    }
||||
|
||||
    /// Main entry point: spawns the three background subsystems (update
    /// monitor, post office, attachment store) and then drives the event loop
    /// until the event channel closes.
    pub async fn run(&mut self) {
        log::info!("Starting daemon version {}", self.version);
        log::debug!("Debug logging enabled.");

        // Update monitor: watches the server update stream and feeds events
        // back into the daemon's event channel.
        let mut update_monitor =
            UpdateMonitor::new(self.database.clone(), self.event_sender.clone());
        self.update_monitor_command_tx = Some(update_monitor.take_command_channel());
        tokio::spawn(async move {
            update_monitor.run().await; // should run indefinitely
        });

        // Post office: delivers queued outgoing messages, building an API
        // client lazily via the provided async factory closure.
        {
            let mut database = self.database.clone();
            let event_sender = self.event_sender.clone();
            let post_office_source = self.post_office_source.take().unwrap();
            tokio::spawn(async move {
                let mut post_office =
                    PostOffice::new(post_office_source, event_sender, async move || {
                        Self::get_client_impl(&mut database).await
                    });
                post_office.run().await;
            });
        }

        // Attachment store: handles attachment download/upload queues.
        let mut attachment_store =
            AttachmentStore::new(self.database.clone(), self.event_sender.clone());
        self.attachment_store_sink = Some(attachment_store.get_event_sink());
        tokio::spawn(async move {
            attachment_store.run().await;
        });

        // Event loop: runs until every `event_sender` clone is dropped.
        while let Some(event) = self.event_receiver.recv().await {
            log::debug!(target: target::EVENT, "Received event: {:?}", event);
            self.handle_event(event).await;
        }
    }
|
||||
|
||||
fn spawn_conversation_list_sync(&mut self) {
|
||||
let mut db_clone = self.database.clone();
|
||||
let signal_sender = self.signal_sender.clone();
|
||||
self.runtime.spawn(async move {
|
||||
let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await;
|
||||
if let Err(e) = result {
|
||||
log::error!(target: target::SYNC, "Error handling sync event: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
    /// Dispatches a single event from the event loop.
    ///
    /// Long-running work (syncs) is spawned onto the background runtime and
    /// the reply is sent immediately; quick work is done inline. Reply sends
    /// `unwrap()` on the assumption that the requester is still listening —
    /// a dropped reply channel is treated as a bug.
    async fn handle_event(&mut self, event: Event) {
        match event {
            Event::GetVersion(reply) => {
                reply.send(self.version.clone()).unwrap();
            }

            Event::SyncConversationList(reply) => {
                self.spawn_conversation_list_sync();

                // This is a background operation, so return right away.
                reply.send(()).unwrap();
            }

            Event::SyncAllConversations(reply) => {
                let mut db_clone = self.database.clone();
                let signal_sender = self.signal_sender.clone();
                self.runtime.spawn(async move {
                    let result =
                        Self::sync_all_conversations_impl(&mut db_clone, &signal_sender).await;
                    if let Err(e) = result {
                        log::error!(target: target::SYNC, "Error handling sync event: {}", e);
                    }
                });

                // This is a background operation, so return right away.
                reply.send(()).unwrap();
            }

            Event::SyncConversation(conversation_id, reply) => {
                let mut db_clone = self.database.clone();
                let signal_sender = self.signal_sender.clone();
                self.runtime.spawn(async move {
                    let result = Self::sync_conversation_impl(
                        &mut db_clone,
                        &signal_sender,
                        conversation_id,
                    )
                    .await;
                    if let Err(e) = result {
                        log::error!(target: target::SYNC, "Error handling sync event: {}", e);
                    }
                });

                reply.send(()).unwrap();
            }

            Event::MarkConversationAsRead(conversation_id, reply) => {
                let mut db_clone = self.database.clone();
                self.runtime.spawn(async move {
                    let result = Self::mark_conversation_as_read_impl(&mut db_clone, conversation_id).await;
                    if let Err(e) = result {
                        log::error!(target: target::DAEMON, "Error handling mark conversation as read event: {}", e);
                    }
                });

                reply.send(()).unwrap();
            }

            Event::UpdateConversationMetadata(conversation, reply) => {
                let mut db_clone = self.database.clone();
                let signal_sender = self.signal_sender.clone();
                self.runtime.spawn(async move {
                    let result = Self::update_conversation_metadata_impl(&mut db_clone, conversation, &signal_sender).await;
                    if let Err(e) = result {
                        log::error!(target: target::DAEMON, "Error handling update conversation metadata event: {}", e);
                    }
                });

                reply.send(()).unwrap();
            }

            Event::UpdateStreamReconnected => {
                log::info!(target: target::UPDATES, "Update stream reconnected");

                // The ui client will respond differently, but we'll almost certainly want to do a sync-list in response to this.
                self.spawn_conversation_list_sync();

                // Send signal to the client that the update stream has been reconnected.
                self.signal_sender
                    .send(Signal::UpdateStreamReconnected)
                    .await
                    .unwrap();
            }

            Event::GetAllConversations(limit, offset, reply) => {
                let conversations = self.get_conversations_limit_offset(limit, offset).await;
                reply.send(conversations).unwrap();
            }

            Event::GetAllSettings(reply) => {
                // Fall back to defaults rather than failing the request.
                let settings = self.get_settings().await.unwrap_or_else(|e| {
                    log::error!(target: target::SETTINGS, "Failed to get settings: {:#?}", e);
                    Settings::default()
                });

                reply.send(settings).unwrap();
            }

            Event::UpdateSettings(settings, reply) => {
                // Capture the old server URL first so we can detect a change.
                let previous_settings = self.get_settings().await.unwrap_or_default();
                let previous_server_url = previous_settings.server_url;

                self.update_settings(&settings).await.unwrap_or_else(|e| {
                    log::error!(target: target::SETTINGS, "Failed to update settings: {}", e);
                });

                // Only trigger re-sync if both URLs are Some and different, or if one is Some and other is None
                if previous_server_url.as_deref() != settings.server_url.as_deref() {
                    // If the server url has changed, we'll need to do a full re-sync.
                    self.delete_all_conversations().await.unwrap_or_else(|e| {
                        log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e);
                    });

                    // Do a sync-list to get the new conversations.
                    self.spawn_conversation_list_sync();

                    // Also restart the update monitor.
                    // `try_send` is deliberate: don't block the event loop if
                    // the monitor's command queue is full.
                    if let Err(e) = self
                        .update_monitor_command_tx
                        .as_ref()
                        .unwrap()
                        .try_send(UpdateMonitorCommand::Restart)
                    {
                        log::warn!(target: target::UPDATES, "Failed to send restart command to update monitor: {}", e);
                    }
                }

                reply.send(()).unwrap();
            }

            Event::GetMessages(conversation_id, last_message_id, reply) => {
                let messages = self.get_messages(conversation_id, last_message_id).await;
                reply.send(messages).unwrap();
            }

            Event::DeleteAllConversations(reply) => {
                self.delete_all_conversations().await.unwrap_or_else(|e| {
                    log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e);
                });

                reply.send(()).unwrap();
            }

            Event::SendMessage(conversation_id, text, attachment_guids, reply) => {
                let conversation_id = conversation_id.clone();
                let uuid = self
                    .enqueue_outgoing_message(text, conversation_id.clone(), attachment_guids)
                    .await;
                reply.send(uuid).unwrap();

                // Send message updated signal, we have a placeholder message we will return.
                self.signal_sender
                    .send(Signal::MessagesUpdated(conversation_id.clone()))
                    .await
                    .unwrap();
            }

            Event::MessageSent(message, outgoing_message, conversation_id) => {
                log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id);

                // Insert the message into the database.
                log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id);
                self.database
                    .lock()
                    .await
                    .with_repository(|r| r.insert_message(&conversation_id, message.into()))
                    .await
                    .unwrap();

                // Remove from outgoing messages.
                // The placeholder is no longer needed now that the real
                // message is persisted.
                log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid);
                self.outgoing_messages
                    .get_mut(&conversation_id)
                    .map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));

                // Send message updated signal.
                self.signal_sender
                    .send(Signal::MessagesUpdated(conversation_id))
                    .await
                    .unwrap();
            }

            Event::GetAttachment(guid, reply) => {
                // Forward to the attachment store; it replies directly.
                self.attachment_store_sink
                    .as_ref()
                    .unwrap()
                    .send(AttachmentStoreEvent::GetAttachmentInfo(guid, reply))
                    .await
                    .unwrap();
            }

            Event::DownloadAttachment(attachment_id, preview, reply) => {
                log::debug!(target: target::ATTACHMENTS, "Download requested for attachment: {}, preview: {}", &attachment_id, preview);

                self.attachment_store_sink
                    .as_ref()
                    .unwrap()
                    .send(AttachmentStoreEvent::QueueDownloadAttachment(
                        attachment_id,
                        preview,
                    ))
                    .await
                    .unwrap();

                // Reply immediately; completion is signalled via
                // `AttachmentDownloaded` later.
                reply.send(()).unwrap();
            }

            Event::AttachmentDownloaded(attachment_id) => {
                log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id);

                // Send signal to the client that the attachment has been downloaded.
                self.signal_sender
                    .send(Signal::AttachmentDownloaded(attachment_id))
                    .await
                    .unwrap();
            }

            Event::UploadAttachment(path, reply) => {
                // Forward to the attachment store; it replies with the upload guid.
                self.attachment_store_sink
                    .as_ref()
                    .unwrap()
                    .send(AttachmentStoreEvent::QueueUploadAttachment(path, reply))
                    .await
                    .unwrap();
            }

            Event::AttachmentUploaded(upload_guid, attachment_guid) => {
                log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid);

                self.signal_sender
                    .send(Signal::AttachmentUploaded(upload_guid, attachment_guid))
                    .await
                    .unwrap();
            }
        }
    }
|
||||
|
||||
    /// Takes ownership of the daemon→client signal channel.
    ///
    /// May be called exactly once (there is a single receiver half).
    ///
    /// # Panics
    /// Panics if the signal receiver has already been taken.
    pub fn obtain_signal_receiver(&mut self) -> Receiver<Signal> {
        self.signal_receiver.take().unwrap()
    }
|
||||
|
||||
    /// Returns a page of conversations from the database.
    ///
    /// Parameters:
    /// - limit: maximum number of conversations to return.
    /// - offset: number of conversations to skip (for pagination).
    async fn get_conversations_limit_offset(
        &mut self,
        limit: i32,
        offset: i32,
    ) -> Vec<Conversation> {
        self.database
            .lock()
            .await
            // NOTE(review): repository errors are unwrapped here; a DB failure
            // will panic the event loop.
            .with_repository(|r| r.all_conversations(limit, offset).unwrap())
            .await
    }
|
||||
|
||||
    /// Returns all persisted messages for a conversation, with any in-flight
    /// outgoing (not-yet-confirmed) messages appended so clients see a
    /// consistent view.
    ///
    /// `_last_message_id` is currently unused — the full conversation is
    /// always returned.
    async fn get_messages(
        &mut self,
        conversation_id: String,
        _last_message_id: Option<MessageID>,
    ) -> Vec<Message> {
        // Get outgoing messages for this conversation.
        let empty_vec: Vec<OutgoingMessage> = vec![];
        let outgoing_messages: &Vec<OutgoingMessage> = self
            .outgoing_messages
            .get(&conversation_id)
            .unwrap_or(&empty_vec);

        self.database
            .lock()
            .await
            .with_repository(|r| {
                r.get_messages_for_conversation(&conversation_id)
                    .unwrap()
                    .into_iter()
                    .map(|m| m.into()) // Convert db::Message to daemon::Message
                    // Append placeholders for messages still being sent.
                    .chain(outgoing_messages.into_iter().map(|m| m.into()))
                    .collect()
            })
            .await
    }
|
||||
|
||||
async fn enqueue_outgoing_message(
|
||||
&mut self,
|
||||
text: String,
|
||||
conversation_id: String,
|
||||
attachment_guids: Vec<String>,
|
||||
) -> Uuid {
|
||||
let conversation_id = conversation_id.clone();
|
||||
let outgoing_message = OutgoingMessage::builder()
|
||||
.text(text)
|
||||
.conversation_id(conversation_id.clone())
|
||||
.file_transfer_guids(attachment_guids)
|
||||
.build();
|
||||
|
||||
// Keep a record of this so we can provide a consistent model to the client.
|
||||
self.outgoing_messages
|
||||
.entry(conversation_id)
|
||||
.or_insert(vec![])
|
||||
.push(outgoing_message.clone());
|
||||
|
||||
let guid = outgoing_message.guid.clone();
|
||||
self.post_office_sink
|
||||
.send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
guid
|
||||
}
|
||||
|
||||
    /// Fetches the conversation list from the server, upserts each
    /// conversation (and its participants) into the database, resolves
    /// contact IDs for previously-unresolved remote participants, and emits
    /// `ConversationsUpdated`.
    ///
    /// # Errors
    /// Fails if the client is unconfigured, the fetch fails, or a database
    /// write fails.
    async fn sync_conversation_list(
        database: &mut Arc<Mutex<Database>>,
        signal_sender: &Sender<Signal>,
    ) -> Result<()> {
        log::info!(target: target::SYNC, "Starting list conversation sync");

        let mut client = Self::get_client_impl(database).await?;

        // Fetch conversations from server
        let fetched_conversations = client.get_conversations().await?;
        let db_conversations: Vec<kordophone_db::models::Conversation> = fetched_conversations
            .into_iter()
            .map(kordophone_db::models::Conversation::from)
            .collect();

        // Insert each conversation
        let num_conversations = db_conversations.len();
        let mut contact_resolver = ContactResolver::new(DefaultContactResolverBackend::default());
        for conversation in db_conversations {
            // Insert or update conversation and its participants
            database
                .with_repository(|r| r.insert_conversation(conversation.clone()))
                .await?;

            // Resolve any new participants via the contact resolver and store their contact_id
            log::trace!(target: target::SYNC, "Resolving participants for conversation: {}", conversation.guid);
            let guid = conversation.guid.clone();
            // Re-read the saved conversation so we operate on the merged
            // participant set, not just the fetched one.
            if let Some(saved) = database
                .with_repository(|r| r.get_conversation_by_guid(&guid))
                .await?
            {
                for p in &saved.participants {
                    // Only resolve remote participants that have no contact id yet.
                    if let DbParticipant::Remote {
                        handle,
                        contact_id: None,
                    } = p
                    {
                        log::trace!(target: target::SYNC, "Resolving contact id for participant: {}", handle);
                        if let Some(contact) = contact_resolver.resolve_contact_id(handle) {
                            log::trace!(target: target::SYNC, "Resolved contact id for participant: {}", contact);
                            // Best effort: resolution failure should not abort the sync.
                            let _ = database
                                .with_repository(|r| {
                                    r.update_participant_contact(&handle, &contact)
                                })
                                .await;
                        } else {
                            log::trace!(target: target::SYNC, "No contact id found for participant: {}", handle);
                        }
                    }
                }
            }
        }

        // Send conversations updated signal
        signal_sender.send(Signal::ConversationsUpdated).await?;

        log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations);
        Ok(())
    }
|
||||
|
||||
    /// Full sync: fetches the conversation list, inserts every conversation,
    /// then syncs each conversation's messages individually. Emits a single
    /// `ConversationsUpdated` when the whole pass completes.
    ///
    /// # Errors
    /// Aborts (and returns the error) on the first conversation that fails.
    async fn sync_all_conversations_impl(
        database: &mut Arc<Mutex<Database>>,
        signal_sender: &Sender<Signal>,
    ) -> Result<()> {
        log::info!(target: target::SYNC, "Starting full conversation sync");

        let mut client = Self::get_client_impl(database).await?;

        // Fetch conversations from server
        let fetched_conversations = client.get_conversations().await?;
        let db_conversations: Vec<kordophone_db::models::Conversation> = fetched_conversations
            .into_iter()
            .map(kordophone_db::models::Conversation::from)
            .collect();

        // Process each conversation
        let num_conversations = db_conversations.len();
        for conversation in db_conversations {
            let conversation_id = conversation.guid.clone();

            // Insert the conversation
            database
                .with_repository(|r| r.insert_conversation(conversation))
                .await?;

            // Sync individual conversation.
            Self::sync_conversation_impl(database, signal_sender, conversation_id).await?;
        }

        // Send conversations updated signal.
        signal_sender.send(Signal::ConversationsUpdated).await?;

        log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations);
        Ok(())
    }
|
||||
|
||||
    /// Syncs a single conversation's messages: fetches everything newer than
    /// the last locally-stored message, filters out empty-body server
    /// artifacts, inserts the rest, and signals `MessagesUpdated` if anything
    /// was inserted.
    ///
    /// If the conversation is unknown locally, a list sync is performed first
    /// so the conversation row exists.
    async fn sync_conversation_impl(
        database: &mut Arc<Mutex<Database>>,
        signal_sender: &Sender<Signal>,
        conversation_id: String,
    ) -> Result<()> {
        log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id);

        let mut client = Self::get_client_impl(database).await?;

        // Check if conversation exists in database.
        let conversation = database
            .with_repository(|r| r.get_conversation_by_guid(&conversation_id))
            .await?;
        if conversation.is_none() {
            // If the conversation doesn't exist, first do a conversation list sync.
            log::warn!(target: target::SYNC, "Conversation {} not found, performing list sync", conversation_id);
            Self::sync_conversation_list(database, signal_sender).await?;
        }

        // Fetch and sync messages for this conversation.
        // The last stored message id acts as the incremental-sync cursor.
        let last_message_id = database
            .with_repository(|r| -> Option<String> {
                r.get_last_message_for_conversation(&conversation_id)
                    .unwrap_or(None)
                    .map(|m| m.id)
            })
            .await;

        log::debug!(target: target::SYNC, "Fetching messages for conversation {}", &conversation_id);
        log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id);

        let messages = client
            .get_messages(&conversation_id, None, None, last_message_id)
            .await?;

        // Filter messages that have an empty body, or a body that is just whitespace.
        // This is a workaround for a bug in the server where it returns messages with an empty body, which is usually
        // the typing indicator or stuff like that. In the future, we need to move to ChatItems instead of Messages.
        let insertable_messages: Vec<kordophone::model::Message> = messages
            .into_iter()
            .filter(|m| {
                // Keep messages with real text, or any message carrying attachments.
                (!m.text.is_empty() && !m.text.trim().is_empty())
                    || !m.file_transfer_guids.is_empty()
            })
            .collect();

        let db_messages: Vec<kordophone_db::models::Message> = insertable_messages
            .into_iter()
            .map(kordophone_db::models::Message::from)
            .collect();

        // Insert each message
        let num_messages = db_messages.len();
        log::debug!(target: target::SYNC, "Inserting {} messages for conversation {}", num_messages, &conversation_id);
        database
            .with_repository(|r| r.insert_messages(&conversation_id, db_messages))
            .await?;

        // Send messages updated signal, if we actually inserted any messages.
        if num_messages > 0 {
            signal_sender
                .send(Signal::MessagesUpdated(conversation_id.clone()))
                .await?;
        }

        log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id);
        Ok(())
    }
|
||||
|
||||
    /// Tells the server to mark a conversation as read.
    ///
    /// Server-side only: no local database state is modified here.
    ///
    /// # Errors
    /// Fails if the client is unconfigured or the request fails.
    async fn mark_conversation_as_read_impl(
        database: &mut Arc<Mutex<Database>>,
        conversation_id: String,
    ) -> Result<()> {
        log::debug!(target: target::DAEMON, "Marking conversation as read: {}", conversation_id);

        let mut client = Self::get_client_impl(database).await?;
        client.mark_conversation_as_read(&conversation_id).await?;
        Ok(())
    }
|
||||
|
||||
    /// Merges updated conversation metadata into the database and emits
    /// `ConversationsUpdated` only when the merge actually changed something.
    async fn update_conversation_metadata_impl(
        database: &mut Arc<Mutex<Database>>,
        conversation: Conversation,
        signal_sender: &Sender<Signal>,
    ) -> Result<()> {
        log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
        let updated = database
            .with_repository(|r| r.merge_conversation_metadata(conversation))
            .await?;
        // Avoid spurious client refreshes when nothing changed.
        if updated {
            signal_sender.send(Signal::ConversationsUpdated).await?;
        }

        Ok(())
    }
|
||||
|
||||
    /// Loads the current settings snapshot from the database.
    async fn get_settings(&mut self) -> Result<Settings> {
        let settings = self.database.with_settings(Settings::from_db).await?;
        Ok(settings)
    }
|
||||
|
||||
    /// Persists the given settings to the database.
    async fn update_settings(&mut self, settings: &Settings) -> Result<()> {
        self.database.with_settings(|s| settings.save(s)).await
    }
|
||||
|
||||
async fn get_client_impl(
|
||||
database: &mut Arc<Mutex<Database>>,
|
||||
) -> Result<HTTPAPIClient<DatabaseAuthenticationStore>> {
|
||||
let settings = database.with_settings(Settings::from_db).await?;
|
||||
|
||||
let server_url = settings
|
||||
.server_url
|
||||
.ok_or(DaemonError::ClientNotConfigured)?;
|
||||
|
||||
let client = HTTPAPIClient::new(
|
||||
match server_url.parse() {
|
||||
Ok(url) => url,
|
||||
Err(_) => {
|
||||
log::error!(target: target::DAEMON, "Invalid server URL: {}", server_url);
|
||||
return Err(DaemonError::ClientNotConfigured.into());
|
||||
}
|
||||
},
|
||||
DatabaseAuthenticationStore::new(database.clone()),
|
||||
);
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
    /// Wipes all conversations and messages from the local database and
    /// notifies clients. Used on server-URL change (full re-sync) and on
    /// explicit request.
    async fn delete_all_conversations(&mut self) -> Result<()> {
        self.database
            .with_repository(|r| -> Result<()> {
                // Delete messages too: they are owned by conversations.
                r.delete_all_conversations()?;
                r.delete_all_messages()?;
                Ok(())
            })
            .await?;

        self.signal_sender
            .send(Signal::ConversationsUpdated)
            .await?;

        Ok(())
    }
|
||||
|
||||
    /// Returns the platform-specific data directory for the daemon
    /// (e.g. `~/.local/share/kordophonecd` on Linux), or `None` if the
    /// system directories cannot be determined.
    fn get_data_dir() -> Option<PathBuf> {
        ProjectDirs::from("net", "buzzert", "kordophonecd").map(|p| PathBuf::from(p.data_dir()))
    }
|
||||
|
||||
fn get_database_path() -> PathBuf {
|
||||
if let Some(data_dir) = Self::get_data_dir() {
|
||||
data_dir.join("database.db")
|
||||
} else {
|
||||
// Fallback to a local path if we can't get the system directories
|
||||
PathBuf::from("database.db")
|
||||
}
|
||||
}
|
||||
}
|
||||
83
core/kordophoned/src/daemon/models/attachment.rs
Normal file
83
core/kordophoned/src/daemon/models/attachment.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Optional metadata attached to an attachment (currently only attribution).
#[derive(Debug, Clone)]
pub struct AttachmentMetadata {
    pub attribution_info: Option<AttributionInfo>,
}

/// Pixel dimensions of an attachment, when known.
/// NOTE(review): units presumed to be pixels — confirm against the server API.
#[derive(Debug, Clone)]
pub struct AttributionInfo {
    pub width: Option<u32>,
    pub height: Option<u32>,
}

/// An attachment known to the attachment store.
///
/// `base_path` is the extension-less on-disk path; actual files live at
/// `base_path` + `.preview` / `.full` (plus `.download` while in flight).
#[derive(Debug, Clone)]
pub struct Attachment {
    pub guid: String,
    pub base_path: PathBuf,
    pub metadata: Option<AttachmentMetadata>,
}
|
||||
|
||||
impl Attachment {
|
||||
pub fn get_path(&self) -> PathBuf {
|
||||
self.get_path_for_preview_scratch(false, false)
|
||||
}
|
||||
|
||||
pub fn get_path_for_preview(&self, preview: bool) -> PathBuf {
|
||||
self.get_path_for_preview_scratch(preview, false)
|
||||
}
|
||||
|
||||
pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf {
|
||||
let extension = if preview { "preview" } else { "full" };
|
||||
if scratch {
|
||||
self.base_path
|
||||
.with_extension(format!("{}.download", extension))
|
||||
} else {
|
||||
self.base_path.with_extension(extension)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_downloaded(&self, preview: bool) -> bool {
|
||||
std::fs::exists(&self.get_path_for_preview(preview)).expect(
|
||||
format!(
|
||||
"Wasn't able to check for the existence of an attachment file path at {}",
|
||||
&self.get_path_for_preview(preview).display()
|
||||
)
|
||||
.as_str(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Conversions between the wire-protocol (kordophone crate) attachment
// metadata types and the daemon's local models. These are straight
// field-by-field mappings in both directions.

impl From<kordophone::model::message::AttachmentMetadata> for AttachmentMetadata {
    fn from(metadata: kordophone::model::message::AttachmentMetadata) -> Self {
        Self {
            attribution_info: metadata.attribution_info.map(|info| info.into()),
        }
    }
}

impl From<kordophone::model::message::AttributionInfo> for AttributionInfo {
    fn from(info: kordophone::model::message::AttributionInfo) -> Self {
        Self {
            width: info.width,
            height: info.height,
        }
    }
}

impl From<AttachmentMetadata> for kordophone::model::message::AttachmentMetadata {
    fn from(metadata: AttachmentMetadata) -> Self {
        Self {
            attribution_info: metadata.attribution_info.map(|info| info.into()),
        }
    }
}

impl From<AttributionInfo> for kordophone::model::message::AttributionInfo {
    fn from(info: AttributionInfo) -> Self {
        Self {
            width: info.width,
            height: info.height,
        }
    }
}
|
||||
186
core/kordophoned/src/daemon/models/message.rs
Normal file
186
core/kordophoned/src/daemon/models/message.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
use chrono::DateTime;
|
||||
use chrono::NaiveDateTime;
|
||||
|
||||
use crate::daemon::attachment_store::AttachmentStore;
|
||||
use crate::daemon::models::Attachment;
|
||||
use kordophone::model::message::AttachmentMetadata;
|
||||
use kordophone::model::outgoing_message::OutgoingMessage;
|
||||
use kordophone_db::models::participant::Participant as DbParticipant;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// A message sender: either the local user or a remote handle, optionally
/// resolved to a contact id by the contact resolver.
#[derive(Clone, Debug)]
pub enum Participant {
    /// The local user.
    Me,
    /// A remote participant, identified by handle (phone number / address).
    Remote {
        handle: String,
        // Populated once the contact resolver has matched the handle.
        contact_id: Option<String>,
    },
}
|
||||
|
||||
// Convenience conversions: a bare string is treated as an unresolved remote
// participant (contact_id starts as None).

impl From<String> for Participant {
    fn from(display_name: String) -> Self {
        Participant::Remote {
            handle: display_name,
            contact_id: None,
        }
    }
}

impl From<&str> for Participant {
    fn from(display_name: &str) -> Self {
        Participant::Remote {
            handle: display_name.to_string(),
            contact_id: None,
        }
    }
}
|
||||
|
||||
// Database participant → daemon participant: a direct variant-for-variant
// mapping that preserves any already-resolved contact id.
impl From<kordophone_db::models::Participant> for Participant {
    fn from(participant: kordophone_db::models::Participant) -> Self {
        match participant {
            kordophone_db::models::Participant::Me => Participant::Me,
            kordophone_db::models::Participant::Remote { handle, contact_id } => {
                Participant::Remote { handle, contact_id }
            }
        }
    }
}
|
||||
|
||||
impl Participant {
    /// Human-readable name for UI display: the literal "(Me)" for the local
    /// user, otherwise the raw handle (not the resolved contact name).
    pub fn display_name(&self) -> String {
        match self {
            Participant::Me => "(Me)".to_string(),
            Participant::Remote { handle, .. } => handle.clone(),
        }
    }
}
|
||||
|
||||
/// Daemon-level message model, converted from the database / wire models and
/// handed to clients. Attachments are resolved eagerly at conversion time.
#[derive(Clone, Debug)]
pub struct Message {
    pub id: String,
    pub sender: Participant,
    pub text: String,
    // Naive (timezone-less) timestamp; see the wire-model conversion below.
    pub date: NaiveDateTime,
    pub attachments: Vec<Attachment>,
}
|
||||
|
||||
fn attachments_from(
|
||||
file_transfer_guids: &Vec<String>,
|
||||
attachment_metadata: &Option<HashMap<String, AttachmentMetadata>>,
|
||||
) -> Vec<Attachment> {
|
||||
file_transfer_guids
|
||||
.iter()
|
||||
.map(|guid| {
|
||||
let mut attachment = AttachmentStore::get_attachment_impl(
|
||||
&AttachmentStore::get_default_store_path(),
|
||||
guid,
|
||||
);
|
||||
attachment.metadata = match attachment_metadata {
|
||||
Some(attachment_metadata) => attachment_metadata
|
||||
.get(guid)
|
||||
.cloned()
|
||||
.map(|metadata| metadata.into()),
|
||||
None => None,
|
||||
};
|
||||
|
||||
attachment
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Database message → daemon message: field mapping plus eager attachment
// resolution through the attachment store.
impl From<kordophone_db::models::Message> for Message {
    fn from(message: kordophone_db::models::Message) -> Self {
        let attachments =
            attachments_from(&message.file_transfer_guids, &message.attachment_metadata);
        Self {
            id: message.id,
            sender: message.sender.into(),
            text: message.text,
            date: message.date,
            attachments,
        }
    }
}
|
||||
|
||||
impl From<Message> for kordophone_db::models::Message {
|
||||
fn from(message: Message) -> Self {
|
||||
Self {
|
||||
id: message.id,
|
||||
sender: match message.sender {
|
||||
Participant::Me => kordophone_db::models::Participant::Me,
|
||||
Participant::Remote { handle, contact_id } => {
|
||||
kordophone_db::models::Participant::Remote { handle, contact_id }
|
||||
}
|
||||
},
|
||||
text: message.text,
|
||||
date: message.date,
|
||||
file_transfer_guids: message.attachments.iter().map(|a| a.guid.clone()).collect(),
|
||||
attachment_metadata: {
|
||||
let metadata_map: HashMap<String, kordophone::model::message::AttachmentMetadata> =
|
||||
message
|
||||
.attachments
|
||||
.iter()
|
||||
.filter_map(|a| {
|
||||
a.metadata
|
||||
.as_ref()
|
||||
.map(|m| (a.guid.clone(), m.clone().into()))
|
||||
})
|
||||
.collect();
|
||||
if metadata_map.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(metadata_map)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<kordophone::model::Message> for Message {
|
||||
fn from(message: kordophone::model::Message) -> Self {
|
||||
let attachments =
|
||||
attachments_from(&message.file_transfer_guids, &message.attachment_metadata);
|
||||
Self {
|
||||
id: message.guid,
|
||||
sender: match message.sender {
|
||||
Some(sender) => Participant::Remote {
|
||||
handle: sender,
|
||||
contact_id: None,
|
||||
},
|
||||
None => Participant::Me,
|
||||
},
|
||||
text: message.text,
|
||||
date: DateTime::from_timestamp(
|
||||
message.date.unix_timestamp(),
|
||||
message.date.unix_timestamp_nanos().try_into().unwrap_or(0),
|
||||
)
|
||||
.unwrap()
|
||||
.naive_local(),
|
||||
attachments,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&OutgoingMessage> for Message {
|
||||
fn from(value: &OutgoingMessage) -> Self {
|
||||
Self {
|
||||
id: value.guid.to_string(),
|
||||
sender: Participant::Me,
|
||||
text: value.text.clone(),
|
||||
date: value.date,
|
||||
attachments: Vec::new(), // Outgoing messages don't have attachments initially
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Participant> for DbParticipant {
|
||||
fn from(participant: Participant) -> Self {
|
||||
match participant {
|
||||
Participant::Me => DbParticipant::Me,
|
||||
Participant::Remote { handle, contact_id } => DbParticipant::Remote {
|
||||
handle,
|
||||
contact_id: contact_id.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
5
core/kordophoned/src/daemon/models/mod.rs
Normal file
5
core/kordophoned/src/daemon/models/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
//! Daemon-side data models. The `Attachment` and `Message` types are
//! re-exported at the module root for convenient use elsewhere in the daemon.

pub mod attachment;
pub mod message;

pub use attachment::Attachment;
pub use message::Message;
|
||||
130
core/kordophoned/src/daemon/post_office.rs
Normal file
130
core/kordophoned/src/daemon/post_office.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_condvar::Condvar;
|
||||
|
||||
use crate::daemon::events::Event as DaemonEvent;
|
||||
use kordophone::api::APIInterface;
|
||||
use kordophone::model::outgoing_message::OutgoingMessage;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
/// Log target names used by this module.
mod target {
    pub static POST_OFFICE: &str = "post_office";
}
|
||||
|
||||
/// Commands accepted by the post office over its event channel.
#[derive(Debug)]
pub enum Event {
    /// Add an outgoing message to the send queue.
    EnqueueOutgoingMessage(OutgoingMessage),
}
|
||||
|
||||
/// Background worker that delivers outgoing messages to the server.
///
/// Messages are queued — via [`Event::EnqueueOutgoingMessage`] on the event
/// channel or directly through `queue_message` — and sent one at a time by
/// `run`; failed sends are re-queued after a delay.
pub struct PostOffice<C: APIInterface, F: AsyncFnMut() -> Result<C>> {
    // Incoming commands for the post office.
    event_source: Receiver<Event>,
    // Channel used to report send results back to the daemon.
    event_sink: Sender<DaemonEvent>,
    // Factory that produces a fresh API client per send attempt.
    make_client: F,
    // FIFO queue of messages awaiting delivery.
    message_queue: Mutex<VecDeque<OutgoingMessage>>,
    // Signalled whenever a message is pushed onto the queue.
    message_available: Condvar,
}
|
||||
|
||||
impl<C: APIInterface, F: AsyncFnMut() -> Result<C>> PostOffice<C, F> {
    /// Creates a post office with an empty message queue, wired to the given
    /// command source, daemon event sink, and client factory.
    pub fn new(
        event_source: Receiver<Event>,
        event_sink: Sender<DaemonEvent>,
        make_client: F,
    ) -> Self {
        Self {
            event_source,
            event_sink,
            make_client,
            message_queue: Mutex::new(VecDeque::new()),
            message_available: Condvar::new(),
        }
    }

    /// Appends a copy of `message` to the send queue and wakes the run loop.
    pub async fn queue_message(&mut self, message: &OutgoingMessage) {
        self.message_queue.lock().await.push_back(message.clone());
        self.message_available.notify_one();
    }

    /// Main loop: waits for either an enqueue command or a queue wake-up,
    /// sends at most one message per iteration, and re-queues messages whose
    /// send attempt failed.
    pub async fn run(&mut self) {
        log::info!(target: target::POST_OFFICE, "Starting post office");

        loop {
            let mut retry_messages = Vec::new();
            tokio::select! {
                // Incoming events from the daemon.
                Some(event) = self.event_source.recv() => {
                    match event {
                        Event::EnqueueOutgoingMessage(message) => {
                            log::debug!(target: target::POST_OFFICE, "Received enqueue outgoing message event");
                            self.message_queue.lock().await.push_back(message);
                            self.message_available.notify_one();
                        }
                    }
                }

                // Queue wake-up. If the other select arm wins, this wait
                // future is dropped and its guard released; the queue is
                // re-checked (`pop_front` may yield None) on every wake, so a
                // lost notification cannot strand a message forever as long as
                // notifications keep arriving.
                mut lock = self.message_available.wait(self.message_queue.lock().await) => {
                    log::debug!(target: target::POST_OFFICE, "Message available in queue");

                    // Get the next message to send, if any.
                    let message = lock.pop_front();
                    drop(lock); // Release the lock before sending; we don't want to remain locked while sending.

                    if let Some(message) = message {
                        retry_messages = Self::try_send_message(&mut self.make_client, &self.event_sink, message).await;
                    }
                }
            }

            if !retry_messages.is_empty() {
                log::debug!(target: target::POST_OFFICE, "Queueing {} messages for retry", retry_messages.len());
                for message in retry_messages {
                    self.queue_message(&message).await;
                }
            }
        }
    }

    /// Attempts to deliver a single message.
    ///
    /// On success a `MessageSent` daemon event is emitted; on any failure
    /// (client construction or the send itself) the message is returned for
    /// retry after an in-line 5-second delay. Note that the delay blocks the
    /// whole run loop, not just this message.
    async fn try_send_message(
        make_client: &mut F,
        event_sink: &Sender<DaemonEvent>,
        message: OutgoingMessage,
    ) -> Vec<OutgoingMessage> {
        let mut retry_messages = Vec::new();

        match (make_client)().await {
            Ok(mut client) => {
                log::debug!(target: target::POST_OFFICE, "Obtained client, sending message.");
                match client.send_message(&message).await {
                    Ok(sent_message) => {
                        log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid);

                        let conversation_id = message.conversation_id.clone();
                        let event =
                            DaemonEvent::MessageSent(sent_message.into(), message, conversation_id);
                        // NOTE(review): panics if the daemon side of the
                        // channel has been dropped — acceptable only if the
                        // daemon always outlives the post office; confirm.
                        event_sink.send(event).await.unwrap();
                    }

                    Err(e) => {
                        log::error!(target: target::POST_OFFICE, "Error sending message: {:?}", e);
                        log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
                        tokio::time::sleep(Duration::from_secs(5)).await;
                        retry_messages.push(message);
                    }
                }
            }

            Err(e) => {
                log::error!(target: target::POST_OFFICE, "Error creating client: {:?}", e);
                log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds");
                tokio::time::sleep(Duration::from_secs(5)).await;
                retry_messages.push(message);
            }
        }

        retry_messages
    }
}
|
||||
48
core/kordophoned/src/daemon/settings.rs
Normal file
48
core/kordophoned/src/daemon/settings.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use anyhow::Result;
|
||||
use kordophone_db::settings::Settings as DbSettings;
|
||||
|
||||
/// Keys under which each setting is stored in the settings database.
pub mod keys {
    pub static SERVER_URL: &str = "ServerURL";
    pub static USERNAME: &str = "Username";
    pub static TOKEN: &str = "Token";
}
|
||||
|
||||
/// In-memory view of the daemon's persisted connection settings.
///
/// Each field is `None` when the corresponding key has never been stored.
#[derive(Debug, Default)]
pub struct Settings {
    // Base URL of the kordophone server.
    pub server_url: Option<String>,
    // Account username for authentication.
    pub username: Option<String>,
    // Auth token for the server session.
    pub token: Option<String>,
}
|
||||
|
||||
impl Settings {
|
||||
pub fn from_db(db_settings: &mut DbSettings) -> Result<Self> {
|
||||
let server_url = db_settings.get(keys::SERVER_URL)?;
|
||||
let username = db_settings.get(keys::USERNAME)?;
|
||||
let token = db_settings.get(keys::TOKEN)?;
|
||||
|
||||
// Create the settings struct with the results
|
||||
let settings = Self {
|
||||
server_url,
|
||||
username,
|
||||
token,
|
||||
};
|
||||
|
||||
// Load bearing
|
||||
log::debug!("Loaded settings: {:?}", settings);
|
||||
|
||||
Ok(settings)
|
||||
}
|
||||
|
||||
pub fn save(&self, db_settings: &mut DbSettings) -> Result<()> {
|
||||
if let Some(server_url) = &self.server_url {
|
||||
db_settings.put(keys::SERVER_URL, &server_url)?;
|
||||
}
|
||||
if let Some(username) = &self.username {
|
||||
db_settings.put(keys::USERNAME, &username)?;
|
||||
}
|
||||
if let Some(token) = &self.token {
|
||||
db_settings.put(keys::TOKEN, &token)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
24
core/kordophoned/src/daemon/signals.rs
Normal file
24
core/kordophoned/src/daemon/signals.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
/// Signals broadcast by the daemon to subscribers (e.g. the IPC layer) when
/// observable state changes.
#[derive(Debug, Clone)]
pub enum Signal {
    /// Emitted when the list of conversations is updated.
    ConversationsUpdated,

    /// Emitted when the list of messages for a conversation is updated.
    /// Parameters:
    /// - conversation_id: The ID of the conversation that was updated.
    MessagesUpdated(String),

    /// Emitted when an attachment has been downloaded.
    /// Parameters:
    /// - attachment_id: The ID of the attachment that was downloaded.
    AttachmentDownloaded(String),

    /// Emitted when an attachment has been uploaded.
    /// Parameters:
    /// - upload_guid: The GUID of the upload.
    /// - attachment_guid: The GUID of the attachment on the server.
    AttachmentUploaded(String, String),

    /// Emitted when the update stream is reconnected after a timeout or
    /// configuration change.
    UpdateStreamReconnected,
}
|
||||
241
core/kordophoned/src/daemon/update_monitor.rs
Normal file
241
core/kordophoned/src/daemon/update_monitor.rs
Normal file
@@ -0,0 +1,241 @@
|
||||
use crate::daemon::{
|
||||
events::{Event, Reply},
|
||||
target, Daemon, DaemonResult,
|
||||
};
|
||||
|
||||
use futures_util::SinkExt;
|
||||
use kordophone::api::event_socket::{EventSocket, SinkMessage};
|
||||
use kordophone::model::event::Event as UpdateEvent;
|
||||
use kordophone::model::event::EventData as UpdateEventData;
|
||||
use kordophone::APIInterface;
|
||||
|
||||
use kordophone_db::database::Database;
|
||||
use kordophone_db::database::DatabaseAccess;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// Commands that can be sent to a running [`UpdateMonitor`].
pub enum UpdateMonitorCommand {
    /// Tear down the current event-socket connection and reconnect.
    Restart,
}
|
||||
|
||||
/// Maintains the long-lived event-socket connection to the server, turning
/// incoming socket events into daemon [`Event`]s and reconnecting on failure.
pub struct UpdateMonitor {
    // Sender side of the command channel; handed out once via
    // `take_command_channel`, hence the Option.
    command_tx: Option<Sender<UpdateMonitorCommand>>,
    // Receiver side of the command channel, polled inside `run`.
    command_rx: Receiver<UpdateMonitorCommand>,
    // Shared daemon database, used to build API clients.
    database: Arc<Mutex<Database>>,
    // Channel for forwarding work (metadata updates, sync requests,
    // reconnect notifications) to the daemon.
    event_sender: Sender<Event>,
    // Per-conversation timestamp of the last triggered sync, used to debounce
    // repeated change notifications.
    last_sync_times: HashMap<String, Instant>,
    // Sequence number to resume the event stream from, if known.
    update_seq: Option<u64>,
    // NOTE(review): despite the name, this flag appears to mean "the next
    // received message follows a (re)connection" and gates the
    // UpdateStreamReconnected event — confirm and consider renaming.
    first_connection: bool,
}
|
||||
|
||||
impl UpdateMonitor {
|
||||
pub fn new(database: Arc<Mutex<Database>>, event_sender: Sender<Event>) -> Self {
|
||||
let (command_tx, command_rx) = tokio::sync::mpsc::channel(100);
|
||||
Self {
|
||||
database,
|
||||
event_sender,
|
||||
last_sync_times: HashMap::new(),
|
||||
update_seq: None,
|
||||
first_connection: false, // optimistic assumption that we're not reconnecting the first time.
|
||||
command_tx: Some(command_tx),
|
||||
command_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn take_command_channel(&mut self) -> Sender<UpdateMonitorCommand> {
|
||||
self.command_tx.take().unwrap()
|
||||
}
|
||||
|
||||
async fn send_event<T>(&self, make_event: impl FnOnce(Reply<T>) -> Event) -> DaemonResult<T> {
|
||||
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
|
||||
self.event_sender
|
||||
.send(make_event(reply_tx))
|
||||
.await
|
||||
.map_err(|_| "Failed to send event")?;
|
||||
|
||||
reply_rx.await.map_err(|_| "Failed to receive reply".into())
|
||||
}
|
||||
|
||||
async fn handle_update(&mut self, update: UpdateEvent) {
|
||||
match update.data {
|
||||
UpdateEventData::ConversationChanged(conversation) => {
|
||||
log::info!(target: target::UPDATES, "Conversation changed: {}", conversation.guid);
|
||||
|
||||
// Explicitly update the unread count, we assume this is fresh from the notification.
|
||||
let db_conversation: kordophone_db::models::Conversation =
|
||||
conversation.clone().into();
|
||||
self.send_event(|r| Event::UpdateConversationMetadata(db_conversation, r))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Failed to send daemon event: {}", e);
|
||||
});
|
||||
|
||||
// Check if we've synced this conversation recently (within 5 seconds)
|
||||
// This is currently a hack/workaround to prevent an infinite loop of sync events, because for some reason
|
||||
// imagent will post a conversation changed notification when we call getMessages.
|
||||
if let Some(last_sync) = self.last_sync_times.get(&conversation.guid) {
|
||||
if last_sync.elapsed() < Duration::from_secs(1) {
|
||||
log::warn!(target: target::UPDATES, "Skipping sync for conversation id: {}. Last sync was {} seconds ago.",
|
||||
conversation.guid, last_sync.elapsed().as_secs_f64());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// This is the non-hacky path once we can reason about chat items with associatedMessageGUIDs (e.g., reactions).
|
||||
let last_message = self
|
||||
.database
|
||||
.with_repository(|r| r.get_last_message_for_conversation(&conversation.guid))
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
match (&last_message, &conversation.last_message) {
|
||||
(Some(message), Some(conversation_message)) => {
|
||||
if message.id == conversation_message.guid {
|
||||
log::warn!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", &conversation.guid);
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
|
||||
// Update the last sync time and proceed with sync
|
||||
self.last_sync_times
|
||||
.insert(conversation.guid.clone(), Instant::now());
|
||||
|
||||
log::info!(target: target::UPDATES, "Syncing new messages for conversation id: {}", conversation.guid);
|
||||
self.send_event(|r| Event::SyncConversation(conversation.guid, r))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Failed to send daemon event: {}", e);
|
||||
});
|
||||
}
|
||||
|
||||
UpdateEventData::MessageReceived(conversation, message) => {
|
||||
log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid);
|
||||
log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid);
|
||||
self.send_event(|r| Event::SyncConversation(conversation.guid, r))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Failed to send daemon event: {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
use futures_util::stream::StreamExt;
|
||||
|
||||
log::info!(target: target::UPDATES, "Starting update monitor");
|
||||
|
||||
loop {
|
||||
log::debug!(target: target::UPDATES, "Creating client");
|
||||
let mut client = match Daemon::get_client_impl(&mut self.database).await {
|
||||
Ok(client) => client,
|
||||
Err(e) => {
|
||||
log::error!("Failed to get client: {}", e);
|
||||
log::warn!("Retrying in 5 seconds...");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
log::debug!(target: target::UPDATES, "Opening event socket");
|
||||
let socket = match client.open_event_socket(self.update_seq).await {
|
||||
Ok(events) => events,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to open event socket: {}", e);
|
||||
log::warn!("Retrying in 5 seconds...");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
log::debug!(target: target::UPDATES, "Starting event stream");
|
||||
let (mut event_stream, mut sink) = socket.events().await;
|
||||
|
||||
// We won't know if the websocket is dead until we try to send a message, so time out waiting for
|
||||
// a message every 30 seconds.
|
||||
let mut timeout = tokio::time::interval(Duration::from_secs(10));
|
||||
timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
// First tick will happen immediately
|
||||
timeout.tick().await;
|
||||
|
||||
// Track when the last ping was sent so we know when to give up
|
||||
// waiting for the corresponding pong.
|
||||
let mut ping_sent_at: Option<Instant> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(result) = event_stream.next() => {
|
||||
match result {
|
||||
Ok(socket_event) => {
|
||||
match socket_event {
|
||||
kordophone::api::event_socket::SocketEvent::Update(event) => {
|
||||
self.handle_update(event).await;
|
||||
}
|
||||
|
||||
kordophone::api::event_socket::SocketEvent::Pong => {
|
||||
log::debug!(target: target::UPDATES, "Received websocket pong");
|
||||
}
|
||||
}
|
||||
|
||||
if self.first_connection {
|
||||
self.event_sender.send(Event::UpdateStreamReconnected).await.unwrap();
|
||||
self.first_connection = false;
|
||||
}
|
||||
|
||||
// Any successfully handled message (update or pong) keeps the connection alive.
|
||||
ping_sent_at = None;
|
||||
timeout.reset();
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Error in event stream: {}", e);
|
||||
self.first_connection = true;
|
||||
break; // Break inner loop to reconnect
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = timeout.tick() => {
|
||||
// If we previously sent a ping and haven't heard back since the timeout, we'll assume the connection is dead.
|
||||
if let Some(_) = ping_sent_at {
|
||||
log::error!(target: target::UPDATES, "Ping timed out. Restarting stream.");
|
||||
self.first_connection = true;
|
||||
break;
|
||||
}
|
||||
|
||||
log::debug!("Sending websocket ping on timer");
|
||||
match sink.send(SinkMessage::Ping).await {
|
||||
Ok(_) => {
|
||||
ping_sent_at = Some(Instant::now());
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
log::error!(target: target::UPDATES, "Error writing ping to event socket: {}, restarting stream.", e);
|
||||
self.first_connection = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(command) = self.command_rx.recv() => {
|
||||
match command {
|
||||
UpdateMonitorCommand::Restart => {
|
||||
log::info!(target: target::UPDATES, "Restarting update monitor");
|
||||
self.first_connection = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add a small delay before reconnecting to avoid tight reconnection loops
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user