Private
Public Access
1
0

daemon: adds conversation list limit, fixes auth saving in db auth store

This commit is contained in:
2025-05-03 18:19:48 -07:00
parent 26d54f91d5
commit 0d61b6f2d7
13 changed files with 69 additions and 37 deletions

View File

@@ -84,11 +84,14 @@ impl<'a> Repository<'a> {
Ok(None) Ok(None)
} }
pub fn all_conversations(&mut self) -> Result<Vec<Conversation>> { pub fn all_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<Conversation>> {
use crate::schema::conversations::dsl::*; use crate::schema::conversations::dsl::*;
use crate::schema::participants::dsl::*; use crate::schema::participants::dsl::*;
let db_conversations = conversations let db_conversations = conversations
.order(schema::conversations::date.desc())
.offset(offset as i64)
.limit(limit as i64)
.load::<ConversationRecord>(self.connection)?; .load::<ConversationRecord>(self.connection)?;
let mut result = Vec::new(); let mut result = Vec::new();

View File

@@ -5,8 +5,8 @@ use async_trait::async_trait;
#[async_trait] #[async_trait]
pub trait AuthenticationStore { pub trait AuthenticationStore {
async fn get_credentials(&mut self) -> Option<Credentials>; async fn get_credentials(&mut self) -> Option<Credentials>;
async fn get_token(&mut self) -> Option<JwtToken>; async fn get_token(&mut self) -> Option<String>;
async fn set_token(&mut self, token: JwtToken); async fn set_token(&mut self, token: String);
} }
pub struct InMemoryAuthenticationStore { pub struct InMemoryAuthenticationStore {
@@ -35,11 +35,11 @@ impl AuthenticationStore for InMemoryAuthenticationStore {
self.credentials.clone() self.credentials.clone()
} }
async fn get_token(&mut self) -> Option<JwtToken> { async fn get_token(&mut self) -> Option<String> {
self.token.clone() self.token.clone().map(|token| token.to_string())
} }
async fn set_token(&mut self, token: JwtToken) { async fn set_token(&mut self, token: String) {
self.token = Some(token); self.token = Some(JwtToken::new(&token).unwrap());
} }
} }

View File

@@ -92,6 +92,7 @@ impl From <tungstenite::Error> for Error {
trait AuthBuilder { trait AuthBuilder {
fn with_auth(self, token: &Option<JwtToken>) -> Self; fn with_auth(self, token: &Option<JwtToken>) -> Self;
fn with_auth_string(self, token: &Option<String>) -> Self;
} }
impl AuthBuilder for hyper::http::request::Builder { impl AuthBuilder for hyper::http::request::Builder {
@@ -100,6 +101,12 @@ impl AuthBuilder for hyper::http::request::Builder {
self.header("Authorization", token.to_header_value()) self.header("Authorization", token.to_header_value())
} else { self } } else { self }
} }
fn with_auth_string(self, token: &Option<String>) -> Self {
if let Some(token) = &token {
self.header("Authorization", format!("Bearer: {}", token))
} else { self }
}
} }
#[cfg(test)] #[cfg(test)]
@@ -196,7 +203,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
let token = JwtToken::new(&token.jwt).map_err(|e| Error::DecodeError(e.to_string()))?; let token = JwtToken::new(&token.jwt).map_err(|e| Error::DecodeError(e.to_string()))?;
log::debug!("Saving token: {:?}", token); log::debug!("Saving token: {:?}", token);
self.auth_store.set_token(token.clone()).await; self.auth_store.set_token(token.to_string()).await;
Ok(token) Ok(token)
} }
@@ -261,8 +268,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
match &auth { match &auth {
Some(token) => { Some(token) => {
let header_value = token.to_header_value().to_str().unwrap().parse().unwrap(); // ugh request.headers_mut().insert("Authorization", format!("Bearer: {}", token).parse().unwrap());
request.headers_mut().insert("Authorization", header_value);
} }
None => { None => {
log::warn!(target: "websocket", "Proceeding without auth token."); log::warn!(target: "websocket", "Proceeding without auth token.");
@@ -276,14 +282,14 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
log::debug!("Websocket connected: {:?}", response.status()); log::debug!("Websocket connected: {:?}", response.status());
Ok(WebsocketEventSocket::new(socket)) Ok(WebsocketEventSocket::new(socket))
} }
Err(e) => match e { Err(e) => match &e {
Error::ClientError(ce) => match ce.as_str() { Error::ClientError(ce) => match ce.as_str() {
"HTTP error: 401 Unauthorized" | "Unauthorized" => { "HTTP error: 401 Unauthorized" | "Unauthorized" => {
// Try to authenticate // Try to authenticate
if let Some(credentials) = &self.auth_store.get_credentials().await { if let Some(credentials) = &self.auth_store.get_credentials().await {
log::warn!("Websocket connection failed, attempting to authenticate"); log::warn!("Websocket connection failed, attempting to authenticate");
let new_token = self.authenticate(credentials.clone()).await?; let new_token = self.authenticate(credentials.clone()).await?;
self.auth_store.set_token(new_token).await; self.auth_store.set_token(new_token.to_string()).await;
// try again on the next attempt. // try again on the next attempt.
return Err(Error::Unauthorized); return Err(Error::Unauthorized);
@@ -292,7 +298,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
return Err(Error::ClientError("Unauthorized, no credentials provided".into())); return Err(Error::ClientError("Unauthorized, no credentials provided".into()));
} }
} }
_ => Err(Error::Unauthorized) _ => Err(e)
} }
_ => Err(e) _ => Err(e)
@@ -355,12 +361,12 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
let uri = self.uri_for_endpoint(endpoint, None); let uri = self.uri_for_endpoint(endpoint, None);
log::debug!("Requesting {:?} {:?}", method, uri); log::debug!("Requesting {:?} {:?}", method, uri);
let build_request = move |auth: &Option<JwtToken>| { let build_request = move |auth: &Option<String>| {
let body = body_fn(); let body = body_fn();
Request::builder() Request::builder()
.method(&method) .method(&method)
.uri(&uri) .uri(&uri)
.with_auth(auth) .with_auth_string(auth)
.body(body) .body(body)
.expect("Unable to build request") .expect("Unable to build request")
}; };
@@ -384,7 +390,7 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
log::debug!("Renewing token using credentials: u: {:?}", credentials.username); log::debug!("Renewing token using credentials: u: {:?}", credentials.username);
let new_token = self.authenticate(credentials.clone()).await?; let new_token = self.authenticate(credentials.clone()).await?;
let request = build_request(&Some(new_token)); let request = build_request(&Some(new_token.to_string()));
response = self.client.request(request).await?; response = self.client.request(request).await?;
} else { } else {
return Err(Error::ClientError("Unauthorized, no credentials provided".into())); return Err(Error::ClientError("Unauthorized, no credentials provided".into()));

View File

@@ -137,4 +137,8 @@ impl JwtToken {
pub fn to_header_value(&self) -> HeaderValue { pub fn to_header_value(&self) -> HeaderValue {
format!("Bearer {}", self.token).parse().unwrap() format!("Bearer {}", self.token).parse().unwrap()
} }
pub fn to_string(&self) -> String {
self.token.clone()
}
} }

View File

@@ -11,6 +11,9 @@
<!-- Conversations --> <!-- Conversations -->
<method name="GetConversations"> <method name="GetConversations">
<arg type="i" name="limit" direction="in"/>
<arg type="i" name="offset" direction="in"/>
<arg type="aa{sv}" direction="out" name="conversations"> <arg type="aa{sv}" direction="out" name="conversations">
<annotation name="org.freedesktop.DBus.DocString" <annotation name="org.freedesktop.DBus.DocString"
value="Array of dictionaries. Each dictionary has keys: value="Array of dictionaries. Each dictionary has keys:

View File

@@ -54,10 +54,10 @@ impl AuthenticationStore for DatabaseAuthenticationStore {
}).await }).await
} }
async fn get_token(&mut self) -> Option<JwtToken> { async fn get_token(&mut self) -> Option<String> {
self.database.lock().await self.database.lock().await
.with_settings(|settings| { .with_settings(|settings| {
match settings.get::<JwtToken>(SettingsKey::TOKEN) { match settings.get::<String>(SettingsKey::TOKEN) {
Ok(token) => token, Ok(token) => token,
Err(e) => { Err(e) => {
log::warn!("Failed to get token from settings: {}", e); log::warn!("Failed to get token from settings: {}", e);
@@ -67,7 +67,7 @@ impl AuthenticationStore for DatabaseAuthenticationStore {
}).await }).await
} }
async fn set_token(&mut self, token: JwtToken) { async fn set_token(&mut self, token: String) {
self.database.lock().await self.database.lock().await
.with_settings(|settings| settings.put(SettingsKey::TOKEN, &token)).await.unwrap_or_else(|e| { .with_settings(|settings| settings.put(SettingsKey::TOKEN, &token)).await.unwrap_or_else(|e| {
log::error!("Failed to set token: {}", e); log::error!("Failed to set token: {}", e);

View File

@@ -21,7 +21,10 @@ pub enum Event {
SyncConversation(String, Reply<()>), SyncConversation(String, Reply<()>),
/// Returns all known conversations from the database. /// Returns all known conversations from the database.
GetAllConversations(Reply<Vec<Conversation>>), /// Parameters:
/// - limit: The maximum number of conversations to return. (-1 for no limit)
/// - offset: The offset into the conversation list to start returning conversations from.
GetAllConversations(i32, i32, Reply<Vec<Conversation>>),
/// Returns all known settings from the database. /// Returns all known settings from the database.
GetAllSettings(Reply<Settings>), GetAllSettings(Reply<Settings>),

View File

@@ -175,8 +175,8 @@ impl Daemon {
reply.send(()).unwrap(); reply.send(()).unwrap();
}, },
Event::GetAllConversations(reply) => { Event::GetAllConversations(limit, offset, reply) => {
let conversations = self.get_conversations().await; let conversations = self.get_conversations_limit_offset(limit, offset).await;
reply.send(conversations).unwrap(); reply.send(conversations).unwrap();
}, },
@@ -226,7 +226,11 @@ impl Daemon {
} }
async fn get_conversations(&mut self) -> Vec<Conversation> { async fn get_conversations(&mut self) -> Vec<Conversation> {
self.database.lock().await.with_repository(|r| r.all_conversations().unwrap()).await self.database.lock().await.with_repository(|r| r.all_conversations(i32::MAX, 0).unwrap()).await
}
async fn get_conversations_limit_offset(&mut self, limit: i32, offset: i32) -> Vec<Conversation> {
self.database.lock().await.with_repository(|r| r.all_conversations(limit, offset).unwrap()).await
} }
async fn get_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Vec<Message> { async fn get_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Vec<Message> {

View File

@@ -17,14 +17,21 @@ pub struct Settings {
impl Settings { impl Settings {
pub fn from_db(db_settings: &mut DbSettings) -> Result<Self> { pub fn from_db(db_settings: &mut DbSettings) -> Result<Self> {
let server_url: Option<String> = db_settings.get(keys::SERVER_URL)?; let server_url = db_settings.get(keys::SERVER_URL)?;
let username: Option<String> = db_settings.get(keys::USERNAME)?; let username = db_settings.get(keys::USERNAME)?;
let token: Option<String> = db_settings.get(keys::TOKEN)?; let token = db_settings.get(keys::TOKEN)?;
Ok(Self {
// Create the settings struct with the results
let settings = Self {
server_url, server_url,
username, username,
token, token,
}) };
// Load bearing
log::debug!("Loaded settings: {:?}", settings);
Ok(settings)
} }
pub fn save(&self, db_settings: &mut DbSettings) -> Result<()> { pub fn save(&self, db_settings: &mut DbSettings) -> Result<()> {

View File

@@ -42,11 +42,13 @@ impl UpdateMonitor {
match update { match update {
UpdateEvent::ConversationChanged(conversation) => { UpdateEvent::ConversationChanged(conversation) => {
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation); log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
log::info!(target: target::UPDATES, "Syncing new messages for conversation id: {}", conversation.guid); if conversation.unread_count > 0 {
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await log::info!(target: target::UPDATES, "Syncing new messages for conversation id: {}", conversation.guid);
.unwrap_or_else(|e| { self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
log::error!("Failed to send daemon event: {}", e); .unwrap_or_else(|e| {
}); log::error!("Failed to send daemon event: {}", e);
});
}
} }
UpdateEvent::MessageReceived(conversation, message) => { UpdateEvent::MessageReceived(conversation, message) => {

View File

@@ -51,8 +51,8 @@ impl DbusRepository for ServerImpl {
self.send_event_sync(Event::GetVersion) self.send_event_sync(Event::GetVersion)
} }
fn get_conversations(&mut self) -> Result<Vec<arg::PropMap>, dbus::MethodErr> { fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
self.send_event_sync(Event::GetAllConversations) self.send_event_sync(|r| Event::GetAllConversations(limit, offset, r))
.map(|conversations| { .map(|conversations| {
conversations.into_iter().map(|conv| { conversations.into_iter().map(|conv| {
let mut map = arg::PropMap::new(); let mut map = arg::PropMap::new();

View File

@@ -111,7 +111,7 @@ impl DaemonCli {
} }
pub async fn print_conversations(&mut self) -> Result<()> { pub async fn print_conversations(&mut self) -> Result<()> {
let conversations = KordophoneRepository::get_conversations(&self.proxy())?; let conversations = KordophoneRepository::get_conversations(&self.proxy(), 100, 0)?;
println!("Number of conversations: {}", conversations.len()); println!("Number of conversations: {}", conversations.len());
for conversation in conversations { for conversation in conversations {

View File

@@ -111,7 +111,7 @@ impl DbClient {
pub async fn print_conversations(&mut self) -> Result<()> { pub async fn print_conversations(&mut self) -> Result<()> {
let all_conversations = self.database.with_repository(|repository| { let all_conversations = self.database.with_repository(|repository| {
repository.all_conversations() repository.all_conversations(i32::MAX, 0)
}).await?; }).await?;
println!("{} Conversations: ", all_conversations.len()); println!("{} Conversations: ", all_conversations.len());