Implements /sendMessage using a message queue
This commit is contained in:
@@ -1,2 +0,0 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
package net.buzzert.kordophone.backend.events
|
||||
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
|
||||
data class MessageDeliveredEvent(
|
||||
val message: Message,
|
||||
val conversation: Conversation,
|
||||
)
|
||||
@@ -1,13 +1,27 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import okhttp3.ResponseBody
|
||||
import retrofit2.Call
|
||||
import retrofit2.Response
|
||||
import retrofit2.http.Body
|
||||
import retrofit2.http.GET
|
||||
import retrofit2.http.POST
|
||||
import retrofit2.http.Query
|
||||
|
||||
data class SendMessageRequest(
|
||||
@SerializedName("guid")
|
||||
val conversationGUID: String,
|
||||
|
||||
@SerializedName("body")
|
||||
val body: String,
|
||||
|
||||
@SerializedName("fileTransferGUIDs")
|
||||
val transferGUIDs: List<String>?,
|
||||
)
|
||||
|
||||
interface APIInterface {
|
||||
@GET("/version")
|
||||
suspend fun getVersion(): ResponseBody
|
||||
@@ -17,4 +31,7 @@ interface APIInterface {
|
||||
|
||||
@GET("/messages")
|
||||
suspend fun getMessages(@Query("guid") conversationGUID: String): Response<List<Message>>
|
||||
|
||||
@POST("/sendMessage")
|
||||
suspend fun sendMessage(@Body request: SendMessageRequest): Response<Void>
|
||||
}
|
||||
@@ -1,14 +1,22 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import android.util.Log
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.events.MessageDeliveredEvent
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import java.net.URL
|
||||
import java.util.Queue
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
|
||||
class ChatRepository(apiInterface: APIInterface) {
|
||||
// private val client: APIClient = APIClient(baseURL)
|
||||
// private val apiInterface: APIInterface = client.getClient().create(APIInterface::class.java)
|
||||
const val REPO_LOG: String = "ChatRepository"
|
||||
|
||||
private val apiInterface: APIInterface = apiInterface
|
||||
class ChatRepository(private val apiInterface: APIInterface) {
|
||||
public val messageDeliveredChannel = Channel<MessageDeliveredEvent>()
|
||||
|
||||
private val outgoingMessageQueue: ArrayBlockingQueue<Pair<Message, Conversation>> = ArrayBlockingQueue(16)
|
||||
private var outgoingMessageThread: Thread? = null
|
||||
|
||||
suspend fun getVersion(): String {
|
||||
return apiInterface.getVersion().string()
|
||||
@@ -21,4 +29,44 @@ class ChatRepository(apiInterface: APIInterface) {
|
||||
suspend fun fetchMessages(conversation: Conversation): List<Message> {
|
||||
return apiInterface.getMessages(conversation.guid).body()!!
|
||||
}
|
||||
|
||||
suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation) {
|
||||
Log.d(REPO_LOG, "Enqueuing outgoing message: $message")
|
||||
outgoingMessageQueue.add(Pair(message, conversation))
|
||||
|
||||
if (outgoingMessageThread == null) {
|
||||
outgoingMessageThread = Thread { outgoingMessageQueueMain() }
|
||||
outgoingMessageThread?.start()
|
||||
}
|
||||
}
|
||||
|
||||
// - private
|
||||
|
||||
private fun outgoingMessageQueueMain() {
|
||||
Log.d(REPO_LOG, "Outgoing Message Queue Main")
|
||||
while (true) {
|
||||
val outgoingMessageRequest = outgoingMessageQueue.poll()?.let {
|
||||
runBlocking {
|
||||
val (message, conversation) = it
|
||||
Log.d(REPO_LOG, "Sending message to $conversation: $message")
|
||||
|
||||
val result = apiInterface.sendMessage(
|
||||
SendMessageRequest(
|
||||
conversationGUID = conversation.guid,
|
||||
body = message.text,
|
||||
transferGUIDs = null,
|
||||
)
|
||||
)
|
||||
|
||||
if (result.isSuccessful) {
|
||||
Log.d(REPO_LOG, "Successfully sent message.")
|
||||
messageDeliveredChannel.send(MessageDeliveredEvent(message, conversation))
|
||||
} else {
|
||||
Log.e(REPO_LOG, "Error sending message. Enqueuing for retry.")
|
||||
outgoingMessageQueue.add(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,8 @@ import net.buzzert.kordophone.backend.model.Message
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.ResponseBody
|
||||
import retrofit2.Response
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
class MockServer: APIInterface {
|
||||
val version = "Kordophone-2.0"
|
||||
@@ -23,6 +25,19 @@ class MockServer: APIInterface {
|
||||
return Response.success(messages[conversationGUID])
|
||||
}
|
||||
|
||||
override suspend fun sendMessage(request: SendMessageRequest): Response<Void> {
|
||||
messages[request.conversationGUID]!!.add(
|
||||
Message(
|
||||
text = request.body,
|
||||
date = Date(),
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null, // me
|
||||
)
|
||||
)
|
||||
|
||||
return Response.success(null)
|
||||
}
|
||||
|
||||
fun addConversation(conversation: Conversation) {
|
||||
conversations.add(conversation)
|
||||
}
|
||||
|
||||
@@ -7,19 +7,13 @@ import net.buzzert.kordophone.backend.server.APIClient
|
||||
import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.ChatRepository
|
||||
import net.buzzert.kordophone.backend.server.MockServer
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Test
|
||||
|
||||
import org.junit.Assert.*
|
||||
import java.net.URL
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* Example local unit test, which will execute on the development machine (host).
|
||||
*
|
||||
* See [testing documentation](http://d.android.com/tools/testing).
|
||||
*/
|
||||
class ExampleUnitTest {
|
||||
class BackendTests {
|
||||
private val mockServer = MockServer().also {
|
||||
val conversation = Conversation(
|
||||
date = Date(),
|
||||
@@ -74,4 +68,19 @@ class ExampleUnitTest {
|
||||
assertEquals(message.text, "Hey")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testSendMessage() = runBlocking {
|
||||
val conversation = repository.fetchConversations().first()
|
||||
val outgoingMessage = Message(
|
||||
date = Date(),
|
||||
text = "Hello there!",
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null,
|
||||
)
|
||||
|
||||
repository.enqueueOutgoingMessage(outgoingMessage, conversation)
|
||||
|
||||
val event = repository.messageDeliveredChannel.receive()
|
||||
assertEquals(event.message.text, outgoingMessage.text)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package android.util
|
||||
|
||||
public class Log {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun d(tag: String, msg: String): Int {
|
||||
println("DEBUG: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun i(tag: String, msg: String): Int {
|
||||
println("INFO: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun w(tag: String, msg: String): Int {
|
||||
println("WARN: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun e(tag: String, msg: String): Int {
|
||||
println("ERROR: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user