core: HTTPClient: event stream should just automatically retry after auth token
This commit is contained in:
@@ -397,62 +397,64 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
|||||||
let uri = self
|
let uri = self
|
||||||
.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
|
.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()))?;
|
||||||
|
|
||||||
log::debug!("Connecting to websocket: {:?}", uri);
|
loop {
|
||||||
|
log::debug!("Connecting to websocket: {:?}", uri);
|
||||||
|
|
||||||
let auth = self.auth_store.get_token().await;
|
let auth = self.auth_store.get_token().await;
|
||||||
let host = uri.authority().unwrap().host();
|
let host = uri.authority().unwrap().host();
|
||||||
let mut request = TungsteniteRequest::builder()
|
let mut request = TungsteniteRequest::builder()
|
||||||
.header("Host", host)
|
.header("Host", host)
|
||||||
.header("Connection", "Upgrade")
|
.header("Connection", "Upgrade")
|
||||||
.header("Upgrade", "websocket")
|
.header("Upgrade", "websocket")
|
||||||
.header("Sec-WebSocket-Version", "13")
|
.header("Sec-WebSocket-Version", "13")
|
||||||
.header("Sec-WebSocket-Key", generate_key())
|
.header("Sec-WebSocket-Key", generate_key())
|
||||||
.uri(uri.to_string())
|
.uri(uri.to_string())
|
||||||
.body(())
|
.body(())
|
||||||
.expect("Unable to build websocket request");
|
.expect("Unable to build websocket request");
|
||||||
|
|
||||||
match &auth {
|
match &auth {
|
||||||
Some(token) => {
|
Some(token) => {
|
||||||
request.headers_mut().insert(
|
request.headers_mut().insert(
|
||||||
"Authorization",
|
"Authorization",
|
||||||
format!("Bearer: {}", token).parse().unwrap(),
|
format!("Bearer: {}", token).parse().unwrap(),
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
log::warn!(target: "websocket", "Proceeding without auth token.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None => {
|
|
||||||
log::warn!(target: "websocket", "Proceeding without auth token.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("Websocket request: {:?}", request);
|
log::debug!("Websocket request: {:?}", request);
|
||||||
|
|
||||||
match connect_async(request).await.map_err(Error::from) {
|
match connect_async(request).await.map_err(Error::from) {
|
||||||
Ok((socket, response)) => {
|
Ok((socket, response)) => {
|
||||||
log::debug!("Websocket connected: {:?}", response.status());
|
log::debug!("Websocket connected: {:?}", response.status());
|
||||||
Ok(WebsocketEventSocket::new(socket))
|
break Ok(WebsocketEventSocket::new(socket))
|
||||||
}
|
}
|
||||||
Err(e) => match &e {
|
Err(e) => match &e {
|
||||||
Error::ClientError(ce) => match ce.as_str() {
|
Error::ClientError(ce) => match ce.as_str() {
|
||||||
"HTTP error: 401 Unauthorized" | "Unauthorized" => {
|
"HTTP error: 401 Unauthorized" | "Unauthorized" => {
|
||||||
// Try to authenticate
|
// Try to authenticate
|
||||||
if let Some(credentials) = &self.auth_store.get_credentials().await {
|
if let Some(credentials) = &self.auth_store.get_credentials().await {
|
||||||
log::warn!("Websocket connection failed, attempting to authenticate");
|
log::warn!("Websocket connection failed, attempting to authenticate");
|
||||||
let new_token = self.authenticate(credentials.clone()).await?;
|
let new_token = self.authenticate(credentials.clone()).await?;
|
||||||
self.auth_store.set_token(new_token.to_string()).await;
|
self.auth_store.set_token(new_token.to_string()).await;
|
||||||
|
|
||||||
// try again on the next attempt.
|
// try again on the next attempt.
|
||||||
return Err(Error::Unauthorized);
|
continue;
|
||||||
} else {
|
} else {
|
||||||
log::error!("Websocket unauthorized, no credentials provided");
|
log::error!("Websocket unauthorized, no credentials provided");
|
||||||
return Err(Error::ClientError(
|
break Err(Error::ClientError(
|
||||||
"Unauthorized, no credentials provided".into(),
|
"Unauthorized, no credentials provided".into(),
|
||||||
));
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
_ => break Err(e),
|
||||||
_ => Err(e),
|
},
|
||||||
},
|
|
||||||
|
|
||||||
_ => Err(e),
|
_ => break Err(e),
|
||||||
},
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user