cargo fmt
This commit is contained in:
@@ -26,7 +26,12 @@ pub trait EventSocket {
|
||||
type UpdateStream: Stream<Item = Result<SocketUpdate, Self::Error>>;
|
||||
|
||||
/// Modern event pipeline
|
||||
async fn events(self) -> (Self::EventStream, impl Sink<SinkMessage, Error = Self::Error>);
|
||||
async fn events(
|
||||
self,
|
||||
) -> (
|
||||
Self::EventStream,
|
||||
impl Sink<SinkMessage, Error = Self::Error>,
|
||||
);
|
||||
|
||||
/// Raw update items from the v1 API.
|
||||
async fn raw_updates(self) -> Self::UpdateStream;
|
||||
|
||||
@@ -115,19 +115,26 @@ impl<B> AuthSetting for hyper::http::Request<B> {
|
||||
}
|
||||
}
|
||||
|
||||
type WebsocketSink = futures_util::stream::SplitSink<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tungstenite::Message>;
|
||||
type WebsocketStream = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
|
||||
type WebsocketSink = futures_util::stream::SplitSink<
|
||||
WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
|
||||
tungstenite::Message,
|
||||
>;
|
||||
type WebsocketStream =
|
||||
futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
|
||||
|
||||
pub struct WebsocketEventSocket {
|
||||
sink: Option<WebsocketSink>,
|
||||
stream: WebsocketStream
|
||||
stream: WebsocketStream,
|
||||
}
|
||||
|
||||
impl WebsocketEventSocket {
|
||||
pub fn new(socket: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
|
||||
let (sink, stream) = socket.split();
|
||||
|
||||
Self { sink: Some(sink), stream }
|
||||
Self {
|
||||
sink: Some(sink),
|
||||
stream,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,12 +154,10 @@ impl WebsocketEventSocket {
|
||||
}
|
||||
}
|
||||
tungstenite::Message::Ping(_) => {
|
||||
// We don't expect the server to send us pings.
|
||||
// We don't expect the server to send us pings.
|
||||
Ok(None)
|
||||
}
|
||||
tungstenite::Message::Pong(_) => {
|
||||
Ok(Some(SocketUpdate::Pong))
|
||||
}
|
||||
tungstenite::Message::Pong(_) => Ok(Some(SocketUpdate::Pong)),
|
||||
tungstenite::Message::Close(_) => {
|
||||
// Connection was closed cleanly
|
||||
Err(Error::ClientError("WebSocket connection closed".into()))
|
||||
@@ -169,33 +174,40 @@ impl EventSocket for WebsocketEventSocket {
|
||||
type EventStream = BoxStream<'static, Result<SocketEvent, Error>>;
|
||||
type UpdateStream = BoxStream<'static, Result<SocketUpdate, Error>>;
|
||||
|
||||
async fn events(mut self) -> (Self::EventStream, impl Sink<SinkMessage, Error = Self::Error>) {
|
||||
async fn events(
|
||||
mut self,
|
||||
) -> (
|
||||
Self::EventStream,
|
||||
impl Sink<SinkMessage, Error = Self::Error>,
|
||||
) {
|
||||
use futures_util::stream::iter;
|
||||
|
||||
let sink = self.sink.take().unwrap().with(|f| {
|
||||
match f {
|
||||
SinkMessage::Ping => futures_util::future::ready(Ok::<tungstenite::Message, Error>(tungstenite::Message::Ping(Bytes::new())))
|
||||
}
|
||||
let sink = self.sink.take().unwrap().with(|f| match f {
|
||||
SinkMessage::Ping => futures_util::future::ready(Ok::<tungstenite::Message, Error>(
|
||||
tungstenite::Message::Ping(Bytes::new()),
|
||||
)),
|
||||
});
|
||||
|
||||
let stream = self.raw_update_stream()
|
||||
.map_ok(|updates| -> BoxStream<'static, Result<SocketEvent, Error>> {
|
||||
match updates {
|
||||
SocketUpdate::Update(updates) => {
|
||||
let iter_stream = iter(
|
||||
updates.into_iter().map(|u| Ok(SocketEvent::Update(Event::from(u))))
|
||||
);
|
||||
iter_stream.boxed()
|
||||
let stream = self
|
||||
.raw_update_stream()
|
||||
.map_ok(
|
||||
|updates| -> BoxStream<'static, Result<SocketEvent, Error>> {
|
||||
match updates {
|
||||
SocketUpdate::Update(updates) => {
|
||||
let iter_stream = iter(
|
||||
updates
|
||||
.into_iter()
|
||||
.map(|u| Ok(SocketEvent::Update(Event::from(u)))),
|
||||
);
|
||||
iter_stream.boxed()
|
||||
}
|
||||
SocketUpdate::Pong => iter(std::iter::once(Ok(SocketEvent::Pong))).boxed(),
|
||||
}
|
||||
SocketUpdate::Pong => {
|
||||
iter(std::iter::once(Ok(SocketEvent::Pong))).boxed()
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
.try_flatten()
|
||||
.boxed();
|
||||
|
||||
|
||||
(stream, sink)
|
||||
}
|
||||
|
||||
@@ -212,9 +224,7 @@ impl Stream for ResponseStream {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.body
|
||||
.poll_next_unpin(cx)
|
||||
.map_err(Error::HTTPError)
|
||||
self.body.poll_next_unpin(cx).map_err(Error::HTTPError)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,10 +338,10 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
||||
guid: String,
|
||||
}
|
||||
|
||||
// TODO: We can still use Body::wrap_stream here, but we need to make sure to plumb the CONTENT_LENGTH header,
|
||||
// otherwise CocoaHTTPServer will crash because of a bug.
|
||||
//
|
||||
// See ff03e73758f30c081a9319a8c04025cba69b8393 for what this was like before.
|
||||
// TODO: We can still use Body::wrap_stream here, but we need to make sure to plumb the CONTENT_LENGTH header,
|
||||
// otherwise CocoaHTTPServer will crash because of a bug.
|
||||
//
|
||||
// See ff03e73758f30c081a9319a8c04025cba69b8393 for what this was like before.
|
||||
let mut bytes = Vec::new();
|
||||
data.read_to_end(&mut bytes)
|
||||
.await
|
||||
@@ -578,7 +588,11 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
|
||||
_ => {
|
||||
let status = response.status();
|
||||
let body_str = hyper::body::to_bytes(response.into_body()).await?;
|
||||
let message = format!("Request failed ({:}). Response body: {:?}", status, String::from_utf8_lossy(&body_str));
|
||||
let message = format!(
|
||||
"Request failed ({:}). Response body: {:?}",
|
||||
status,
|
||||
String::from_utf8_lossy(&body_str)
|
||||
);
|
||||
return Err(Error::ClientError(message));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,7 @@ use super::conversation::Conversation;
|
||||
use super::message::Message;
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[derive(Default)]
|
||||
#[derive(Debug, Clone, Deserialize, Default)]
|
||||
pub struct UpdateItem {
|
||||
#[serde(rename = "messageSequenceNumber")]
|
||||
pub seq: u64,
|
||||
@@ -17,4 +16,3 @@ pub struct UpdateItem {
|
||||
#[serde(default)]
|
||||
pub pong: bool,
|
||||
}
|
||||
|
||||
|
||||
@@ -56,7 +56,12 @@ impl EventSocket for TestEventSocket {
|
||||
type EventStream = BoxStream<'static, Result<SocketEvent, TestError>>;
|
||||
type UpdateStream = BoxStream<'static, Result<SocketUpdate, TestError>>;
|
||||
|
||||
async fn events(self) -> (Self::EventStream, impl Sink<SinkMessage, Error = Self::Error>) {
|
||||
async fn events(
|
||||
self,
|
||||
) -> (
|
||||
Self::EventStream,
|
||||
impl Sink<SinkMessage, Error = Self::Error>,
|
||||
) {
|
||||
(
|
||||
futures_util::stream::iter(self.events.into_iter().map(Ok)).boxed(),
|
||||
futures_util::sink::sink(),
|
||||
|
||||
Reference in New Issue
Block a user