Private
Public Access
1
0

Better error handling

This commit is contained in:
2023-12-10 19:37:53 -08:00
parent a8886279c6
commit 7db8c39042
6 changed files with 134 additions and 49 deletions

View File

@@ -6,6 +6,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
@@ -20,6 +21,7 @@ import net.buzzert.kordophone.backend.model.Conversation
import net.buzzert.kordophone.backend.model.GUID
import net.buzzert.kordophone.backend.model.Message
import net.buzzert.kordophone.backend.model.UpdateItem
import java.lang.Error
import java.net.URL
import java.util.Queue
import java.util.UUID
@@ -33,6 +35,16 @@ class ChatRepository(
private val apiClient: APIClient,
private val database: CachedChatDatabase,
) {
sealed class Error {
open val title: String = "Error"
open val description: String = "Generic Error"
data class ConnectionError(val exception: java.lang.Exception): Error() {
override val title: String = "Connection Error"
override val description: String = exception.message ?: "???"
}
}
// All (Cached) Conversations
val conversations: List<Conversation>
get() = database.fetchConversations()
@@ -46,6 +58,10 @@ class ChatRepository(
get() = database.conversationChanges
.onEach { Log.d(REPO_LOG, "Got database conversations changed") }
// Errors channel
val errorEncounteredChannel: SharedFlow<Error>
get() = _errorEncounteredChannel
fun messagesChanged(conversation: Conversation): Flow<List<Message>> =
database.messagesChanged(conversation)
@@ -72,6 +88,7 @@ class ChatRepository(
private val outgoingMessageQueue: ArrayBlockingQueue<OutgoingMessageInfo> = ArrayBlockingQueue(16)
private var outgoingMessageThread: Thread? = null
private val _messageDeliveredChannel = MutableSharedFlow<MessageDeliveredEvent>()
private val _errorEncounteredChannel = MutableSharedFlow<Error>()
private val updateMonitor = UpdateMonitor(apiClient)
private var updateWatchJob: Job? = null
@@ -126,7 +143,7 @@ class ChatRepository(
return database.fetchMessages(conversation)
}
suspend fun synchronize() {
suspend fun synchronize() = try {
Log.d(REPO_LOG, "Synchronizing conversations")
// Sync conversations
@@ -139,13 +156,17 @@ class ChatRepository(
for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) {
synchronizeConversation(conversation)
}
} catch (e: java.lang.Exception) {
_errorEncounteredChannel.emit(Error.ConnectionError(e))
}
suspend fun synchronizeConversation(conversation: Conversation) {
suspend fun synchronizeConversation(conversation: Conversation) = try {
// TODO: Should only fetch messages after the last GUID we know about.
// But keep in mind that outgoing message GUIDs are fake...
val messages = fetchMessages(conversation, limit = 15)
database.writeMessages(messages, conversation)
} catch (e: java.lang.Exception) {
_errorEncounteredChannel.emit(Error.ConnectionError(e))
}
fun close() {
@@ -187,6 +208,11 @@ class ChatRepository(
database.writeMessages(listOf(event.message), event.conversation, outgoing = true)
}
private suspend fun retryMessageSend(info: OutgoingMessageInfo) {
delay(5000L)
outgoingMessageQueue.add(info)
}
private fun outgoingMessageQueueMain() {
Log.d(REPO_LOG, "Outgoing Message Queue Main")
while (true) {
@@ -198,36 +224,41 @@ class ChatRepository(
Log.d(REPO_LOG, "Sending message to $conversation: $outgoingMessage")
val result = apiInterface.sendMessage(
SendMessageRequest(
conversationGUID = conversation.guid,
body = outgoingMessage.text,
transferGUIDs = null,
)
)
if (result.isSuccessful) {
val messageGuid = result.body()?.sentMessageGUID ?: outgoingMessage.guid
Log.d(REPO_LOG, "Successfully sent message: $messageGuid")
val newMessage = Message(
guid = messageGuid,
text = outgoingMessage.text,
sender = null,
conversation = it.conversation,
date = outgoingMessage.date
)
_messageDeliveredChannel.emit(
MessageDeliveredEvent(
newMessage,
conversation,
requestGuid
try {
val result = apiInterface.sendMessage(
SendMessageRequest(
conversationGUID = conversation.guid,
body = outgoingMessage.text,
transferGUIDs = null,
)
)
} else {
Log.e(REPO_LOG, "Error sending message. Enqueuing for retry.")
outgoingMessageQueue.add(it)
if (result.isSuccessful) {
val messageGuid = result.body()?.sentMessageGUID ?: outgoingMessage.guid
Log.d(REPO_LOG, "Successfully sent message: $messageGuid")
val newMessage = Message(
guid = messageGuid,
text = outgoingMessage.text,
sender = null,
conversation = it.conversation,
date = outgoingMessage.date
)
_messageDeliveredChannel.emit(
MessageDeliveredEvent(
newMessage,
conversation,
requestGuid
)
)
} else {
Log.e(REPO_LOG, "Error sending message. Enqueuing for retry.")
retryMessageSend(it)
}
} catch (e: java.lang.Exception) {
Log.e(REPO_LOG, "Error sending message: (${e.message}). Enqueuing for retry in 5 sec.")
retryMessageSend(it)
}
}
}

View File

@@ -3,8 +3,12 @@ package net.buzzert.kordophone.backend.server
import android.util.Log
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import net.buzzert.kordophone.backend.model.Conversation
import net.buzzert.kordophone.backend.model.Message
@@ -15,6 +19,7 @@ import okhttp3.WebSocketListener
import okio.ByteString
import retrofit2.converter.gson.GsonConverterFactory
import java.lang.reflect.Type
import kotlin.time.Duration
const val UPMON_LOG: String = "UpdateMonitor"
@@ -30,6 +35,7 @@ class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
private val gson: Gson = Gson()
private val updateItemsType: Type = object : TypeToken<ArrayList<UpdateItem>>() {}.type
private var webSocket: WebSocket? = null
private var needsSocketReconnect: Boolean = false
private val _conversationChanged: MutableSharedFlow<Conversation> = MutableSharedFlow()
private val _messageAdded: MutableSharedFlow<Message> = MutableSharedFlow()
@@ -62,6 +68,22 @@ class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
}
}
@OptIn(DelicateCoroutinesApi::class)
private fun setNeedsSocketReconnect() {
if (!needsSocketReconnect) {
needsSocketReconnect = true
GlobalScope.launch {
needsSocketReconnect = false
// Delay 5 seconds
delay(5000L)
beginMonitoringUpdates()
}
}
}
// <WebSocketListener>
override fun onOpen(webSocket: WebSocket, response: Response) {
@@ -72,11 +94,13 @@ class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
super.onClosed(webSocket, code, reason)
Log.d(UPMON_LOG, "Update monitor socket closed")
setNeedsSocketReconnect()
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
super.onFailure(webSocket, t, response)
Log.d(UPMON_LOG, "Update monitor socket failure: ${t.message} :: Response: ${response?.body()}")
Log.d(UPMON_LOG, "Update monitor socket failure: ${t.message} :: Response: ${response?.body()}. Reconnecting in 5 seconds.")
setNeedsSocketReconnect()
}
override fun onMessage(webSocket: WebSocket, text: String) {