Private
Public Access
1
0

new message: initial commit

This commit is contained in:
2026-04-01 15:29:37 -07:00
parent 45285892de
commit c2a697f2c1
23 changed files with 674 additions and 262 deletions

View File

@@ -24,7 +24,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use crate::{
model::{
Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage,
UpdateItem,
OutgoingMessageTarget, ResolveHandleResponse, SendMessageResponse, UpdateItem,
},
APIInterface,
};
@@ -312,16 +312,26 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
async fn send_message(
&mut self,
outgoing_message: &OutgoingMessage,
) -> Result<Message, Self::Error> {
let message: Message = self
) -> Result<SendMessageResponse, Self::Error> {
let message: SendMessageResponse = self
.deserialized_response_with_body("sendMessage", Method::POST, || {
serde_json::to_string(&outgoing_message).unwrap().into()
Self::send_message_request_body(outgoing_message)
})
.await?;
Ok(message)
}
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error> {
let endpoint = format!("resolveHandle?id={}", urlencoding::encode(handle_id));
let response: ResolveHandleResponse =
self.deserialized_response(&endpoint, Method::GET).await?;
Ok(response)
}
async fn fetch_attachment_data(
&mut self,
guid: &str,
@@ -394,8 +404,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
None => "updates".to_string(),
};
let uri = self
.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
let uri = self.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
loop {
log::debug!("Connecting to websocket: {:?}", uri);
@@ -426,18 +435,20 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
log::debug!("Websocket request: {:?}", request);
let mut should_retry = true; // retry once after authenticating.
let should_retry = true; // retry once after authenticating.
match connect_async(request).await.map_err(Error::from) {
Ok((socket, response)) => {
log::debug!("Websocket connected: {:?}", response.status());
break Ok(WebsocketEventSocket::new(socket))
break Ok(WebsocketEventSocket::new(socket));
}
Err(e) => match &e {
Error::ClientError(ce) => match ce.as_str() {
"HTTP error: 401 Unauthorized" | "Unauthorized" => {
// Try to authenticate
if let Some(credentials) = &self.auth_store.get_credentials().await {
log::warn!("Websocket connection failed, attempting to authenticate");
log::warn!(
"Websocket connection failed, attempting to authenticate"
);
let new_token = self.authenticate(credentials.clone()).await?;
self.auth_store.set_token(new_token.to_string()).await;
@@ -473,16 +484,44 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
.build();
let client = Client::builder().build::<_, Body>(https);
HTTPAPIClient { base_url, auth_store, client }
HTTPAPIClient {
base_url,
auth_store,
client,
}
}
fn send_message_request_body(outgoing_message: &OutgoingMessage) -> Body {
#[derive(Serialize)]
struct SendMessageRequest<'a> {
#[serde(rename = "body")]
text: &'a str,
#[serde(rename = "guid", skip_serializing_if = "Option::is_none")]
conversation_id: Option<&'a ConversationID>,
#[serde(rename = "handleIDs", skip_serializing_if = "Option::is_none")]
handle_ids: Option<&'a [String]>,
#[serde(rename = "fileTransferGUIDs")]
file_transfer_guids: &'a Vec<String>,
}
let (conversation_id, handle_ids) = match &outgoing_message.target {
OutgoingMessageTarget::Conversation(conversation_id) => (Some(conversation_id), None),
OutgoingMessageTarget::Handles(handle_ids) => (None, Some(handle_ids.as_slice())),
};
serde_json::to_string(&SendMessageRequest {
text: &outgoing_message.text,
conversation_id,
handle_ids,
file_transfer_guids: &outgoing_message.file_transfer_guids,
})
.unwrap()
.into()
}
fn uri_for_endpoint(&self, endpoint: &str, scheme: Option<&str>) -> Result<Uri, Error> {
let mut parts = self.base_url.clone().into_parts();
let root_path: PathBuf = parts
.path_and_query
.ok_or(Error::URLError)?
.path()
.into();
let root_path: PathBuf = parts.path_and_query.ok_or(Error::URLError)?.path().into();
let path = root_path.join(endpoint);
let path_str = path.to_str().ok_or(Error::URLError)?;

View File

@@ -1,4 +1,7 @@
pub use crate::model::{Conversation, ConversationID, Message, MessageID, OutgoingMessage};
pub use crate::model::{
Conversation, ConversationID, Message, MessageID, OutgoingMessage, ResolveHandleResponse,
SendMessageResponse,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -42,7 +45,13 @@ pub trait APIInterface {
async fn send_message(
&mut self,
outgoing_message: &OutgoingMessage,
) -> Result<Message, Self::Error>;
) -> Result<SendMessageResponse, Self::Error>;
// (GET) /resolveHandle
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error>;
// (GET) /attachment
async fn fetch_attachment_data(

View File

@@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
use super::conversation::ConversationID;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolvedHandle {
pub id: String,
pub name: Option<String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum HandleResolutionStatus {
Valid,
Invalid,
Unknown,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResolveHandleResponse {
#[serde(rename = "resolvedHandle")]
pub resolved_handle: ResolvedHandle,
pub status: HandleResolutionStatus,
#[serde(rename = "existingChat")]
pub existing_chat: Option<ConversationID>,
}

View File

@@ -1,7 +1,9 @@
pub mod conversation;
pub mod event;
pub mod handle;
pub mod message;
pub mod outgoing_message;
pub mod send_message_response;
pub mod update;
pub use conversation::Conversation;
@@ -10,8 +12,15 @@ pub use conversation::ConversationID;
pub use message::Message;
pub use message::MessageID;
pub use handle::HandleResolutionStatus;
pub use handle::ResolveHandleResponse;
pub use handle::ResolvedHandle;
pub use outgoing_message::OutgoingMessage;
pub use outgoing_message::OutgoingMessageBuilder;
pub use outgoing_message::OutgoingMessageTarget;
pub use send_message_response::SendMessageResponse;
pub use update::UpdateItem;

View File

@@ -1,23 +1,23 @@
use super::conversation::ConversationID;
use chrono::NaiveDateTime;
use serde::Serialize;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutgoingMessageTarget {
Conversation(ConversationID),
Handles(Vec<String>),
}
#[derive(Debug, Clone)]
pub struct OutgoingMessage {
#[serde(skip)]
pub guid: Uuid,
#[serde(skip)]
pub date: NaiveDateTime,
#[serde(rename = "body")]
pub text: String,
#[serde(rename = "guid")]
pub conversation_id: ConversationID,
pub target: OutgoingMessageTarget,
#[serde(rename = "fileTransferGUIDs")]
pub file_transfer_guids: Vec<String>,
}
@@ -25,13 +25,27 @@ impl OutgoingMessage {
pub fn builder() -> OutgoingMessageBuilder {
OutgoingMessageBuilder::new()
}
pub fn conversation_id(&self) -> Option<&ConversationID> {
match &self.target {
OutgoingMessageTarget::Conversation(conversation_id) => Some(conversation_id),
OutgoingMessageTarget::Handles(_) => None,
}
}
pub fn handle_ids(&self) -> Option<&[String]> {
match &self.target {
OutgoingMessageTarget::Conversation(_) => None,
OutgoingMessageTarget::Handles(handle_ids) => Some(handle_ids.as_slice()),
}
}
}
#[derive(Default)]
pub struct OutgoingMessageBuilder {
guid: Option<Uuid>,
text: Option<String>,
conversation_id: Option<ConversationID>,
target: Option<OutgoingMessageTarget>,
file_transfer_guids: Option<Vec<String>>,
}
@@ -50,8 +64,18 @@ impl OutgoingMessageBuilder {
self
}
pub fn target(mut self, target: OutgoingMessageTarget) -> Self {
self.target = Some(target);
self
}
pub fn conversation_id(mut self, conversation_id: ConversationID) -> Self {
self.conversation_id = Some(conversation_id);
self.target = Some(OutgoingMessageTarget::Conversation(conversation_id));
self
}
pub fn handle_ids(mut self, handle_ids: Vec<String>) -> Self {
self.target = Some(OutgoingMessageTarget::Handles(handle_ids));
self
}
@@ -64,7 +88,7 @@ impl OutgoingMessageBuilder {
OutgoingMessage {
guid: self.guid.unwrap_or_else(Uuid::new_v4),
text: self.text.unwrap(),
conversation_id: self.conversation_id.unwrap(),
target: self.target.unwrap(),
file_transfer_guids: self.file_transfer_guids.unwrap_or_default(),
date: chrono::Utc::now().naive_utc(),
}

View File

@@ -0,0 +1,12 @@
use serde::Deserialize;
use super::{conversation::ConversationID, message::Message};
#[derive(Debug, Clone, Deserialize)]
pub struct SendMessageResponse {
#[serde(flatten)]
pub message: Message,
#[serde(rename = "conversationGUID")]
pub conversation_id: Option<ConversationID>,
}

View File

@@ -3,7 +3,7 @@ use self::test_client::TestClient;
use crate::APIInterface;
pub mod api_interface {
use crate::model::Conversation;
use crate::model::{Conversation, HandleResolutionStatus, OutgoingMessage};
use super::*;
@@ -28,4 +28,28 @@ pub mod api_interface {
assert_eq!(conversations.len(), 1);
assert_eq!(conversations[0].display_name, test_convo.display_name);
}
#[tokio::test]
async fn test_resolve_handle() {
let mut client = TestClient::new();
let resolved = client.resolve_handle("user@example.com").await.unwrap();
assert_eq!(resolved.resolved_handle.id, "user@example.com");
assert_eq!(resolved.status, HandleResolutionStatus::Valid);
assert_eq!(resolved.existing_chat, None);
}
#[tokio::test]
async fn test_send_message_with_handles() {
let mut client = TestClient::new();
let outgoing_message = OutgoingMessage::builder()
.text("hello".to_string())
.handle_ids(vec!["user@example.com".to_string()])
.build();
let sent = client.send_message(&outgoing_message).await.unwrap();
assert_eq!(sent.message.text, "hello");
assert_eq!(sent.conversation_id, None);
}
}

View File

@@ -9,14 +9,17 @@ use crate::{
api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpdate},
api::http_client::Credentials,
model::{
Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage,
UpdateItem,
Conversation, ConversationID, Event, HandleResolutionStatus, JwtToken, Message, MessageID,
OutgoingMessage, OutgoingMessageTarget, ResolveHandleResponse, ResolvedHandle,
SendMessageResponse,
},
};
use bytes::Bytes;
use futures_util::sink::drain;
use futures_util::stream::BoxStream;
use futures_util::Sink;
use futures_util::SinkExt;
use futures_util::StreamExt;
pub struct TestClient {
@@ -63,13 +66,18 @@ impl EventSocket for TestEventSocket {
impl Sink<SinkMessage, Error = Self::Error>,
) {
(
futures_util::stream::iter(self.events.into_iter().map(Ok)).boxed(),
futures_util::sink::sink(),
futures_util::stream::iter(
self.events
.into_iter()
.map(|event| Ok(SocketEvent::Update(event))),
)
.boxed(),
drain().sink_map_err(|err| match err {}),
)
}
async fn raw_updates(self) -> Self::UpdateStream {
let results: Vec<Result<Vec<UpdateItem>, TestError>> = vec![];
let results: Vec<Result<SocketUpdate, TestError>> = vec![];
futures_util::stream::iter(results.into_iter()).boxed()
}
}
@@ -94,9 +102,9 @@ impl APIInterface for TestClient {
async fn get_messages(
&mut self,
conversation_id: &ConversationID,
limit: Option<u32>,
before: Option<MessageID>,
after: Option<MessageID>,
_limit: Option<u32>,
_before: Option<MessageID>,
_after: Option<MessageID>,
) -> Result<Vec<Message>, Self::Error> {
if let Some(messages) = self.messages.get(conversation_id) {
return Ok(messages.clone());
@@ -108,18 +116,42 @@ impl APIInterface for TestClient {
async fn send_message(
&mut self,
outgoing_message: &OutgoingMessage,
) -> Result<Message, Self::Error> {
) -> Result<SendMessageResponse, Self::Error> {
let message = Message::builder()
.guid(Uuid::new_v4().to_string())
.text(outgoing_message.text.clone())
.date(OffsetDateTime::now_utc())
.build();
self.messages
.entry(outgoing_message.conversation_id.clone())
.or_insert(vec![])
.push(message.clone());
Ok(message)
let conversation_id = match &outgoing_message.target {
OutgoingMessageTarget::Conversation(conversation_id) => {
self.messages
.entry(conversation_id.clone())
.or_insert(vec![])
.push(message.clone());
None
}
OutgoingMessageTarget::Handles(_) => None,
};
Ok(SendMessageResponse {
message,
conversation_id,
})
}
async fn resolve_handle(
&mut self,
handle_id: &str,
) -> Result<ResolveHandleResponse, Self::Error> {
Ok(ResolveHandleResponse {
resolved_handle: ResolvedHandle {
id: handle_id.to_string(),
name: None,
},
status: HandleResolutionStatus::Valid,
existing_chat: None,
})
}
async fn open_event_socket(
@@ -131,17 +163,17 @@ impl APIInterface for TestClient {
async fn fetch_attachment_data(
&mut self,
guid: &str,
preview: bool,
_guid: &str,
_preview: bool,
) -> Result<Self::ResponseStream, Self::Error> {
Ok(futures_util::stream::iter(vec![Ok(Bytes::from_static(b"test"))]).boxed())
}
async fn upload_attachment<R>(
&mut self,
data: tokio::io::BufReader<R>,
filename: &str,
size: u64,
_data: tokio::io::BufReader<R>,
_filename: &str,
_size: u64,
) -> Result<String, Self::Error>
where
R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -151,7 +183,7 @@ impl APIInterface for TestClient {
async fn mark_conversation_as_read(
&mut self,
conversation_id: &ConversationID,
_conversation_id: &ConversationID,
) -> Result<(), Self::Error> {
Ok(())
}