SendMessage: plumb a guid through to uniquely identify a pending message
This commit is contained in:
@@ -1,9 +1,11 @@
|
|||||||
package net.buzzert.kordophone.backend.events
|
package net.buzzert.kordophone.backend.events
|
||||||
|
|
||||||
import net.buzzert.kordophone.backend.model.Conversation
|
import net.buzzert.kordophone.backend.model.Conversation
|
||||||
|
import net.buzzert.kordophone.backend.model.GUID
|
||||||
import net.buzzert.kordophone.backend.model.Message
|
import net.buzzert.kordophone.backend.model.Message
|
||||||
|
|
||||||
data class MessageDeliveredEvent(
|
data class MessageDeliveredEvent(
|
||||||
|
val guid: GUID,
|
||||||
val message: Message,
|
val message: Message,
|
||||||
val conversation: Conversation,
|
val conversation: Conversation,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ package net.buzzert.kordophone.backend.model
|
|||||||
import com.google.gson.annotations.SerializedName
|
import com.google.gson.annotations.SerializedName
|
||||||
import java.util.Date
|
import java.util.Date
|
||||||
|
|
||||||
|
typealias GUID = String
|
||||||
|
|
||||||
data class Conversation(
|
data class Conversation(
|
||||||
@SerializedName("date")
|
@SerializedName("date")
|
||||||
val date: Date,
|
val date: Date,
|
||||||
@@ -20,5 +22,5 @@ data class Conversation(
|
|||||||
val lastMessagePreview: String,
|
val lastMessagePreview: String,
|
||||||
|
|
||||||
@SerializedName("guid")
|
@SerializedName("guid")
|
||||||
val guid: String,
|
val guid: GUID,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ data class Message(
|
|||||||
val text: String,
|
val text: String,
|
||||||
|
|
||||||
@SerializedName("guid")
|
@SerializedName("guid")
|
||||||
val guid: String,
|
val guid: GUID,
|
||||||
|
|
||||||
@SerializedName("sender")
|
@SerializedName("sender")
|
||||||
val sender: String?, // optional: nil means "from me"
|
val sender: String?, // optional: nil means "from me"
|
||||||
|
|||||||
@@ -5,17 +5,25 @@ import kotlinx.coroutines.channels.Channel
|
|||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import net.buzzert.kordophone.backend.events.MessageDeliveredEvent
|
import net.buzzert.kordophone.backend.events.MessageDeliveredEvent
|
||||||
import net.buzzert.kordophone.backend.model.Conversation
|
import net.buzzert.kordophone.backend.model.Conversation
|
||||||
|
import net.buzzert.kordophone.backend.model.GUID
|
||||||
import net.buzzert.kordophone.backend.model.Message
|
import net.buzzert.kordophone.backend.model.Message
|
||||||
import java.net.URL
|
import java.net.URL
|
||||||
import java.util.Queue
|
import java.util.Queue
|
||||||
|
import java.util.UUID
|
||||||
import java.util.concurrent.ArrayBlockingQueue
|
import java.util.concurrent.ArrayBlockingQueue
|
||||||
|
|
||||||
const val REPO_LOG: String = "ChatRepository"
|
const val REPO_LOG: String = "ChatRepository"
|
||||||
|
|
||||||
class ChatRepository(private val apiInterface: APIInterface) {
|
class ChatRepository(private val apiInterface: APIInterface) {
|
||||||
public val messageDeliveredChannel = Channel<MessageDeliveredEvent>()
|
val messageDeliveredChannel = Channel<MessageDeliveredEvent>()
|
||||||
|
|
||||||
private val outgoingMessageQueue: ArrayBlockingQueue<Pair<Message, Conversation>> = ArrayBlockingQueue(16)
|
private data class OutgoingMessageInfo (
|
||||||
|
val message: Message,
|
||||||
|
val conversation: Conversation,
|
||||||
|
val guid: GUID,
|
||||||
|
)
|
||||||
|
|
||||||
|
private val outgoingMessageQueue: ArrayBlockingQueue<OutgoingMessageInfo> = ArrayBlockingQueue(16)
|
||||||
private var outgoingMessageThread: Thread? = null
|
private var outgoingMessageThread: Thread? = null
|
||||||
|
|
||||||
suspend fun getVersion(): String {
|
suspend fun getVersion(): String {
|
||||||
@@ -30,14 +38,18 @@ class ChatRepository(private val apiInterface: APIInterface) {
|
|||||||
return apiInterface.getMessages(conversation.guid).body()!!
|
return apiInterface.getMessages(conversation.guid).body()!!
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation) {
|
suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID {
|
||||||
Log.d(REPO_LOG, "Enqueuing outgoing message: $message")
|
val guid = UUID.randomUUID().toString()
|
||||||
outgoingMessageQueue.add(Pair(message, conversation))
|
|
||||||
|
Log.d(REPO_LOG, "Enqueuing outgoing message: $message ($guid)")
|
||||||
|
outgoingMessageQueue.add(OutgoingMessageInfo(message, conversation, guid))
|
||||||
|
|
||||||
if (outgoingMessageThread == null) {
|
if (outgoingMessageThread == null) {
|
||||||
outgoingMessageThread = Thread { outgoingMessageQueueMain() }
|
outgoingMessageThread = Thread { outgoingMessageQueueMain() }
|
||||||
outgoingMessageThread?.start()
|
outgoingMessageThread?.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return guid
|
||||||
}
|
}
|
||||||
|
|
||||||
// - private
|
// - private
|
||||||
@@ -47,7 +59,10 @@ class ChatRepository(private val apiInterface: APIInterface) {
|
|||||||
while (true) {
|
while (true) {
|
||||||
val outgoingMessageRequest = outgoingMessageQueue.poll()?.let {
|
val outgoingMessageRequest = outgoingMessageQueue.poll()?.let {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
val (message, conversation) = it
|
val guid = it.guid
|
||||||
|
val message = it.message
|
||||||
|
val conversation = it.conversation
|
||||||
|
|
||||||
Log.d(REPO_LOG, "Sending message to $conversation: $message")
|
Log.d(REPO_LOG, "Sending message to $conversation: $message")
|
||||||
|
|
||||||
val result = apiInterface.sendMessage(
|
val result = apiInterface.sendMessage(
|
||||||
@@ -60,7 +75,7 @@ class ChatRepository(private val apiInterface: APIInterface) {
|
|||||||
|
|
||||||
if (result.isSuccessful) {
|
if (result.isSuccessful) {
|
||||||
Log.d(REPO_LOG, "Successfully sent message.")
|
Log.d(REPO_LOG, "Successfully sent message.")
|
||||||
messageDeliveredChannel.send(MessageDeliveredEvent(message, conversation))
|
messageDeliveredChannel.send(MessageDeliveredEvent(guid, message, conversation))
|
||||||
} else {
|
} else {
|
||||||
Log.e(REPO_LOG, "Error sending message. Enqueuing for retry.")
|
Log.e(REPO_LOG, "Error sending message. Enqueuing for retry.")
|
||||||
outgoingMessageQueue.add(it)
|
outgoingMessageQueue.add(it)
|
||||||
|
|||||||
@@ -78,9 +78,10 @@ class BackendTests {
|
|||||||
sender = null,
|
sender = null,
|
||||||
)
|
)
|
||||||
|
|
||||||
repository.enqueueOutgoingMessage(outgoingMessage, conversation)
|
val guid = repository.enqueueOutgoingMessage(outgoingMessage, conversation)
|
||||||
|
|
||||||
val event = repository.messageDeliveredChannel.receive()
|
val event = repository.messageDeliveredChannel.receive()
|
||||||
|
assertEquals(event.guid, guid)
|
||||||
assertEquals(event.message.text, outgoingMessage.text)
|
assertEquals(event.message.text, outgoingMessage.text)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user