bugfixes, better handling of server url changes
This commit is contained in:
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -1021,6 +1021,7 @@ dependencies = [
|
|||||||
"tokio-tungstenite",
|
"tokio-tungstenite",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tungstenite",
|
"tungstenite",
|
||||||
|
"urlencoding",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -2079,6 +2080,12 @@ version = "0.1.14"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
|
checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "urlencoding"
|
||||||
|
version = "2.1.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "utf-8"
|
name = "utf-8"
|
||||||
version = "0.7.6"
|
version = "0.7.6"
|
||||||
|
|||||||
@@ -24,4 +24,5 @@ tokio = { version = "1.37.0", features = ["full"] }
|
|||||||
tokio-tungstenite = "0.26.2"
|
tokio-tungstenite = "0.26.2"
|
||||||
tokio-util = { version = "0.7.15", features = ["futures-util"] }
|
tokio-util = { version = "0.7.15", features = ["futures-util"] }
|
||||||
tungstenite = "0.26.2"
|
tungstenite = "0.26.2"
|
||||||
|
urlencoding = "2.1.3"
|
||||||
uuid = { version = "1.6.1", features = ["v4", "fast-rng", "macro-diagnostics"] }
|
uuid = { version = "1.6.1", features = ["v4", "fast-rng", "macro-diagnostics"] }
|
||||||
|
|||||||
@@ -346,7 +346,8 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| Error::ClientError(e.to_string()))?;
|
.map_err(|e| Error::ClientError(e.to_string()))?;
|
||||||
|
|
||||||
let endpoint = format!("uploadAttachment?filename={}", filename);
|
let encoded_filename = urlencoding::encode(filename);
|
||||||
|
let endpoint = format!("uploadAttachment?filename={}", encoded_filename);
|
||||||
let mut bytes_opt = Some(bytes);
|
let mut bytes_opt = Some(bytes);
|
||||||
|
|
||||||
let response: UploadAttachmentResponse = self
|
let response: UploadAttachmentResponse = self
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ use kordophone::model::outgoing_message::OutgoingMessage;
|
|||||||
use kordophone::model::ConversationID;
|
use kordophone::model::ConversationID;
|
||||||
|
|
||||||
mod update_monitor;
|
mod update_monitor;
|
||||||
use update_monitor::UpdateMonitor;
|
use update_monitor::{UpdateMonitor, UpdateMonitorCommand};
|
||||||
|
|
||||||
mod auth_store;
|
mod auth_store;
|
||||||
use auth_store::DatabaseAuthenticationStore;
|
use auth_store::DatabaseAuthenticationStore;
|
||||||
@@ -80,6 +80,7 @@ pub struct Daemon {
|
|||||||
outgoing_messages: HashMap<ConversationID, Vec<OutgoingMessage>>,
|
outgoing_messages: HashMap<ConversationID, Vec<OutgoingMessage>>,
|
||||||
|
|
||||||
attachment_store_sink: Option<Sender<AttachmentStoreEvent>>,
|
attachment_store_sink: Option<Sender<AttachmentStoreEvent>>,
|
||||||
|
update_monitor_command_tx: Option<Sender<UpdateMonitorCommand>>,
|
||||||
|
|
||||||
version: String,
|
version: String,
|
||||||
database: Arc<Mutex<Database>>,
|
database: Arc<Mutex<Database>>,
|
||||||
@@ -120,6 +121,7 @@ impl Daemon {
|
|||||||
post_office_source: Some(post_office_source),
|
post_office_source: Some(post_office_source),
|
||||||
outgoing_messages: HashMap::new(),
|
outgoing_messages: HashMap::new(),
|
||||||
attachment_store_sink: None,
|
attachment_store_sink: None,
|
||||||
|
update_monitor_command_tx: None,
|
||||||
runtime,
|
runtime,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -131,6 +133,7 @@ impl Daemon {
|
|||||||
// Update monitor
|
// Update monitor
|
||||||
let mut update_monitor =
|
let mut update_monitor =
|
||||||
UpdateMonitor::new(self.database.clone(), self.event_sender.clone());
|
UpdateMonitor::new(self.database.clone(), self.event_sender.clone());
|
||||||
|
self.update_monitor_command_tx = Some(update_monitor.take_command_channel());
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
update_monitor.run().await; // should run indefinitely
|
update_monitor.run().await; // should run indefinitely
|
||||||
});
|
});
|
||||||
@@ -248,10 +251,30 @@ impl Daemon {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Event::UpdateSettings(settings, reply) => {
|
Event::UpdateSettings(settings, reply) => {
|
||||||
|
let previous_server_url = self.get_settings().await.unwrap_or_default().server_url;
|
||||||
|
|
||||||
self.update_settings(&settings).await.unwrap_or_else(|e| {
|
self.update_settings(&settings).await.unwrap_or_else(|e| {
|
||||||
log::error!(target: target::SETTINGS, "Failed to update settings: {}", e);
|
log::error!(target: target::SETTINGS, "Failed to update settings: {}", e);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if previous_server_url != settings.server_url {
|
||||||
|
// If the server url has changed, we'll need to do a full re-sync.
|
||||||
|
self.delete_all_conversations().await.unwrap_or_else(|e| {
|
||||||
|
log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Do a sync-list to get the new conversations.
|
||||||
|
self.spawn_conversation_list_sync();
|
||||||
|
|
||||||
|
// Also restart the update monitor.
|
||||||
|
self.update_monitor_command_tx
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.send(UpdateMonitorCommand::Restart)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
reply.send(()).unwrap();
|
reply.send(()).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,10 +15,16 @@ use kordophone_db::database::DatabaseAccess;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
pub enum UpdateMonitorCommand {
|
||||||
|
Restart,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct UpdateMonitor {
|
pub struct UpdateMonitor {
|
||||||
|
command_tx: Option<Sender<UpdateMonitorCommand>>,
|
||||||
|
command_rx: Receiver<UpdateMonitorCommand>,
|
||||||
database: Arc<Mutex<Database>>,
|
database: Arc<Mutex<Database>>,
|
||||||
event_sender: Sender<Event>,
|
event_sender: Sender<Event>,
|
||||||
last_sync_times: HashMap<String, Instant>,
|
last_sync_times: HashMap<String, Instant>,
|
||||||
@@ -28,16 +34,23 @@ pub struct UpdateMonitor {
|
|||||||
|
|
||||||
impl UpdateMonitor {
|
impl UpdateMonitor {
|
||||||
pub fn new(database: Arc<Mutex<Database>>, event_sender: Sender<Event>) -> Self {
|
pub fn new(database: Arc<Mutex<Database>>, event_sender: Sender<Event>) -> Self {
|
||||||
|
let (command_tx, command_rx) = tokio::sync::mpsc::channel(1);
|
||||||
Self {
|
Self {
|
||||||
database,
|
database,
|
||||||
event_sender,
|
event_sender,
|
||||||
last_sync_times: HashMap::new(),
|
last_sync_times: HashMap::new(),
|
||||||
update_seq: None,
|
update_seq: None,
|
||||||
first_connection: false, // optimistic assumption that we're not reconnecting the first time.
|
first_connection: false, // optimistic assumption that we're not reconnecting the first time.
|
||||||
|
command_tx: Some(command_tx),
|
||||||
|
command_rx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_event<T>(
|
pub fn take_command_channel(&mut self) -> Sender<UpdateMonitorCommand> {
|
||||||
|
self.command_tx.take().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_event<T>(
|
||||||
&self,
|
&self,
|
||||||
make_event: impl FnOnce(Reply<T>) -> Event,
|
make_event: impl FnOnce(Reply<T>) -> Event,
|
||||||
) -> DaemonResult<T> {
|
) -> DaemonResult<T> {
|
||||||
@@ -201,6 +214,15 @@ impl UpdateMonitor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Some(command) = self.command_rx.recv() => {
|
||||||
|
match command {
|
||||||
|
UpdateMonitorCommand::Restart => {
|
||||||
|
log::info!(target: target::UPDATES, "Restarting update monitor");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user