Adds update/db monitoring of messages added
This commit is contained in:
@@ -4,6 +4,7 @@ import io.realm.kotlin.Realm
|
||||
import io.realm.kotlin.RealmConfiguration
|
||||
import io.realm.kotlin.UpdatePolicy
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import net.buzzert.kordophone.backend.db.model.Conversation
|
||||
import net.buzzert.kordophone.backend.db.model.Message
|
||||
@@ -36,11 +37,17 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
}
|
||||
|
||||
// Flow for watching changes to the database
|
||||
val changes: Flow<List<ModelConversation>>
|
||||
val conversationChanges: Flow<List<ModelConversation>>
|
||||
get() = realm.query(Conversation::class).find().asFlow().map {
|
||||
it.list.map { it.toConversation() }
|
||||
}
|
||||
|
||||
// For for watching messages added to the database
|
||||
val messagesAdded: Flow<List<ModelMessage>>
|
||||
get() = realm.query(Message::class).find().asFlow().map {
|
||||
it.list.map { it.toMessage() }
|
||||
}
|
||||
|
||||
private val realm = Realm.open(realmConfig)
|
||||
|
||||
fun writeConversations(conversations: List<ModelConversation>) {
|
||||
|
||||
@@ -22,9 +22,6 @@ open class Conversation(
|
||||
var date: RealmInstant,
|
||||
var unreadCount: Int,
|
||||
|
||||
var lastMessagePreview: String?,
|
||||
var lastMessage: Message?,
|
||||
|
||||
var messages: RealmList<Message>,
|
||||
): RealmObject
|
||||
{
|
||||
@@ -36,22 +33,26 @@ open class Conversation(
|
||||
date = RealmInstant.now(),
|
||||
unreadCount = 0,
|
||||
|
||||
lastMessagePreview = null,
|
||||
lastMessage = null,
|
||||
|
||||
messages = realmListOf<Message>()
|
||||
)
|
||||
|
||||
fun toConversation(): ModelConversation {
|
||||
return ModelConversation(
|
||||
val conversation = ModelConversation(
|
||||
displayName = displayName,
|
||||
participants = participants!!.toList(),
|
||||
participants = participants.toList(),
|
||||
date = Date.from(date.toInstant()),
|
||||
unreadCount = unreadCount,
|
||||
guid = guid,
|
||||
lastMessagePreview = lastMessagePreview,
|
||||
lastMessage = lastMessage?.toMessage(),
|
||||
lastMessage = null,
|
||||
lastMessagePreview = null,
|
||||
)
|
||||
|
||||
if (messages.isNotEmpty()) {
|
||||
conversation.lastMessage = messages.last().toMessage(conversation)
|
||||
conversation.lastMessagePreview = messages.last().text
|
||||
}
|
||||
|
||||
return conversation
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,8 +63,6 @@ fun ModelConversation.toDatabaseConversation(): Conversation {
|
||||
participants = from.participants.toRealmList()
|
||||
date = from.date.toInstant().toRealmInstant()
|
||||
unreadCount = from.unreadCount
|
||||
lastMessagePreview = from.lastMessagePreview
|
||||
lastMessage = from.lastMessage?.toDatabaseMessage()
|
||||
guid = from.guid
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,12 +2,15 @@ package net.buzzert.kordophone.backend.db.model
|
||||
|
||||
import android.view.Display.Mode
|
||||
import io.realm.kotlin.Realm
|
||||
import io.realm.kotlin.types.EmbeddedRealmObject
|
||||
import io.realm.kotlin.types.RealmInstant
|
||||
import io.realm.kotlin.types.RealmObject
|
||||
import io.realm.kotlin.types.annotations.PrimaryKey
|
||||
import net.buzzert.kordophone.backend.db.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import org.mongodb.kbson.ObjectId
|
||||
import net.buzzert.kordophone.backend.model.Message as ModelMessage
|
||||
import net.buzzert.kordophone.backend.model.Conversation as ModelConversation
|
||||
import java.util.Date
|
||||
|
||||
open class Message(
|
||||
@@ -17,6 +20,8 @@ open class Message(
|
||||
var text: String,
|
||||
var sender: String?,
|
||||
var date: RealmInstant,
|
||||
|
||||
var conversation: Conversation?,
|
||||
): RealmObject
|
||||
{
|
||||
constructor() : this(
|
||||
@@ -24,14 +29,16 @@ open class Message(
|
||||
text = "",
|
||||
sender = null,
|
||||
date = RealmInstant.now(),
|
||||
conversation = null,
|
||||
)
|
||||
|
||||
fun toMessage(): ModelMessage {
|
||||
fun toMessage(parentConversation: ModelConversation = conversation!!.toConversation()): ModelMessage {
|
||||
return ModelMessage(
|
||||
text = text,
|
||||
guid = guid,
|
||||
sender = sender,
|
||||
date = Date.from(date.toInstant()),
|
||||
conversation = parentConversation,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -43,5 +50,6 @@ fun ModelMessage.toDatabaseMessage(): Message {
|
||||
guid = from.guid
|
||||
sender = from.sender
|
||||
date = from.date.toInstant().toRealmInstant()
|
||||
conversation = from.conversation.toDatabaseConversation()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,4 +26,17 @@ data class Conversation(
|
||||
|
||||
@SerializedName("lastMessage")
|
||||
var lastMessage: Message?,
|
||||
)
|
||||
) {
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (other == null || javaClass != other.javaClass) return false
|
||||
|
||||
val o = other as Conversation
|
||||
return (
|
||||
guid == o.guid &&
|
||||
date == o.date &&
|
||||
participants == o.participants &&
|
||||
displayName == o.displayName &&
|
||||
unreadCount == o.unreadCount
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,4 +15,24 @@ data class Message(
|
||||
|
||||
@SerializedName("date")
|
||||
val date: Date,
|
||||
) {}
|
||||
|
||||
@Transient
|
||||
var conversation: Conversation,
|
||||
) {
|
||||
override fun toString(): String {
|
||||
return "Message(guid=$guid, text=$text, sender=$sender, date=$date, parent=${conversation.guid})"
|
||||
}
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (other == null || javaClass != other.javaClass) return false
|
||||
|
||||
val o = other as Message
|
||||
return (
|
||||
guid == o.guid &&
|
||||
text == o.text &&
|
||||
sender == o.sender &&
|
||||
date == o.date &&
|
||||
conversation == o.conversation
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,7 @@ import java.util.UUID
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
|
||||
const val REPO_LOG: String = "ChatRepository"
|
||||
const val CONVERSATION_MESSAGE_SYNC_COUNT = 10
|
||||
|
||||
class ChatRepository(
|
||||
private val apiInterface: APIInterface,
|
||||
@@ -28,8 +29,11 @@ class ChatRepository(
|
||||
val messageDeliveredChannel = Channel<MessageDeliveredEvent>()
|
||||
|
||||
// Changes Flow
|
||||
val changes: Flow<List<Conversation>>
|
||||
get() = database.changes
|
||||
val conversationChanges: Flow<List<Conversation>>
|
||||
get() = database.conversationChanges
|
||||
|
||||
val messagesAdded: Flow<List<Message>>
|
||||
get() = database.messagesAdded
|
||||
|
||||
private data class OutgoingMessageInfo (
|
||||
val message: Message,
|
||||
@@ -49,7 +53,9 @@ class ChatRepository(
|
||||
}
|
||||
|
||||
suspend fun fetchMessages(conversation: Conversation): List<Message> {
|
||||
return apiInterface.getMessages(conversation.guid).bodyOnSuccessOrThrow()
|
||||
return apiInterface.getMessages(conversation.guid)
|
||||
.bodyOnSuccessOrThrow()
|
||||
.onEach { it.conversation = conversation }
|
||||
}
|
||||
|
||||
suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID {
|
||||
@@ -73,10 +79,16 @@ class ChatRepository(
|
||||
suspend fun synchronize() {
|
||||
Log.d(REPO_LOG, "Synchronizing conversations")
|
||||
|
||||
// Sync conversations
|
||||
val conversations = fetchConversations()
|
||||
database.writeConversations(conversations)
|
||||
|
||||
// TODO: Sync messages too? How many?
|
||||
// Sync top N number of conversations' message content
|
||||
val sortedConversations = conversations.sortedBy { it.date }
|
||||
for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) {
|
||||
val messages = fetchMessages(conversation)
|
||||
database.writeMessages(messages, conversation)
|
||||
}
|
||||
}
|
||||
|
||||
fun close() {
|
||||
|
||||
@@ -34,7 +34,6 @@ class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
|
||||
private val _conversationChanged: MutableSharedFlow<Conversation> = MutableSharedFlow()
|
||||
private val _messageAdded: MutableSharedFlow<Message> = MutableSharedFlow()
|
||||
|
||||
|
||||
fun beginMonitoringUpdates() {
|
||||
Log.d(UPMON_LOG, "Opening websocket connection")
|
||||
this.webSocket = client.getWebSocketClient("/updates", null, this)
|
||||
@@ -50,12 +49,15 @@ class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
|
||||
|
||||
val updateItems: List<UpdateItem> = gson.fromJson(message, updateItemsType)
|
||||
for (updateItem: UpdateItem in updateItems) {
|
||||
if (updateItem.conversationChanged != null) {
|
||||
_conversationChanged.emit(updateItem.conversationChanged)
|
||||
val conversationChanged = updateItem.conversationChanged
|
||||
if (conversationChanged != null) {
|
||||
_conversationChanged.emit(conversationChanged)
|
||||
}
|
||||
|
||||
if (updateItem.messageAdded != null) {
|
||||
_messageAdded.emit(updateItem.messageAdded)
|
||||
_messageAdded.emit(updateItem.messageAdded.also {
|
||||
it.conversation = conversationChanged!!
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import net.buzzert.kordophone.backend.db.CachedChatDatabase
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.server.APIClient
|
||||
import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.ChatRepository
|
||||
@@ -82,7 +83,7 @@ class BackendTests {
|
||||
val (repository, mockServer) = mockRepository()
|
||||
|
||||
val conversation = mockServer.addTestConversations(1).first()
|
||||
val outgoingMessage = MockServer.generateMessage()
|
||||
val outgoingMessage = MockServer.generateMessage(conversation)
|
||||
|
||||
val guid = repository.enqueueOutgoingMessage(outgoingMessage, conversation)
|
||||
|
||||
@@ -114,7 +115,7 @@ class BackendTests {
|
||||
|
||||
// Say unread count + lastMessage preview changes on server.
|
||||
val someConversation = conversations.first().apply {
|
||||
lastMessagePreview = "COOL"
|
||||
displayName = "COOL"
|
||||
unreadCount = 2
|
||||
}
|
||||
|
||||
@@ -123,8 +124,8 @@ class BackendTests {
|
||||
|
||||
// Make sure change is reflected
|
||||
val readConversation = repo.conversationForGuid(someConversation.guid)
|
||||
assertEquals(readConversation.lastMessagePreview, "COOL")
|
||||
assertEquals(readConversation.unreadCount, 2)
|
||||
assertEquals("COOL", readConversation.displayName)
|
||||
assertEquals(2, readConversation.unreadCount)
|
||||
|
||||
repo.close()
|
||||
}
|
||||
@@ -137,7 +138,7 @@ class BackendTests {
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
println("Watching for conversations changes...")
|
||||
repo.changes.collect {
|
||||
repo.conversationChanges.collect {
|
||||
println("Changed conversations: $it")
|
||||
|
||||
// We got it.
|
||||
@@ -166,7 +167,52 @@ class BackendTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateMonitor() = runBlocking {
|
||||
fun testMessageFlowUpdates() = runBlocking {
|
||||
val (repo, mockServer) = mockRepository()
|
||||
|
||||
// Add an existing conversation
|
||||
println("Adding conversation")
|
||||
val conversation = mockServer.addTestConversations(1).first()
|
||||
|
||||
// Initial sync
|
||||
println("Initial sync")
|
||||
repo.synchronize()
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
var messageAdded: Message? = null
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
println("Watching for messages to be added...")
|
||||
repo.messagesAdded.collect {
|
||||
println("Messages added: $it")
|
||||
|
||||
messageAdded = it.first()
|
||||
updateLatch.countDown()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
// Add a message
|
||||
val message = mockServer.addTestMessages(1, conversation).first()
|
||||
|
||||
// Sync. This should trigger an update
|
||||
println("Synchronizing...")
|
||||
repo.synchronize()
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
|
||||
// Check what we got back
|
||||
assertEquals(message, messageAdded)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateMonitorForConversations() = runBlocking {
|
||||
val mockServer = MockServer()
|
||||
val mockAPIClient = mockServer.getClient()
|
||||
val updateMonitor = UpdateMonitor(mockAPIClient)
|
||||
@@ -201,4 +247,44 @@ class BackendTests {
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateMonitorForMessages() = runBlocking {
|
||||
val mockServer = MockServer()
|
||||
val mockAPIClient = mockServer.getClient()
|
||||
val updateMonitor = UpdateMonitor(mockAPIClient)
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
updateMonitor.beginMonitoringUpdates()
|
||||
updateMonitor.messageAdded.collect {
|
||||
println("Got message added: $it")
|
||||
updateLatch.countDown()
|
||||
|
||||
updateMonitor.stopMonitoringForUpdates()
|
||||
mockAPIClient.stopWatchingForUpdates()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
mockAPIClient.startWatchingForUpdates(this)
|
||||
|
||||
Thread.sleep(500)
|
||||
|
||||
// Add a conversation
|
||||
println("Adding conversation")
|
||||
val convo = mockServer.addTestConversations(1).first()
|
||||
|
||||
// Add a test message
|
||||
mockServer.addTestMessages(1, convo)
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -32,7 +32,7 @@ class DatabaseTests {
|
||||
val conversation = MockServer.generateConversation()
|
||||
db.writeConversations(listOf(conversation))
|
||||
|
||||
val message = MockServer.generateMessage()
|
||||
val message = MockServer.generateMessage(conversation)
|
||||
db.writeMessages(listOf(message), conversation)
|
||||
|
||||
val readMessages = db.fetchMessages(conversation)
|
||||
@@ -40,6 +40,7 @@ class DatabaseTests {
|
||||
|
||||
val readMessage = readMessages[0]
|
||||
assertEquals(readMessage, message)
|
||||
assertEquals(readMessage.conversation, conversation)
|
||||
|
||||
db.close()
|
||||
}
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
import com.google.gson.Gson
|
||||
import com.google.gson.reflect.TypeToken
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancelAndJoin
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
@@ -19,7 +17,6 @@ import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.SendMessageRequest
|
||||
import net.buzzert.kordophone.backend.server.authenticatedWebSocketURL
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.MediaType.Companion.toMediaType
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
@@ -30,7 +27,6 @@ import okhttp3.WebSocketListener
|
||||
import okhttp3.mockwebserver.MockResponse
|
||||
import okhttp3.mockwebserver.MockWebServer
|
||||
import retrofit2.Response
|
||||
import java.net.URL
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
@@ -44,24 +40,24 @@ class MockServer {
|
||||
private val _updateFlow: MutableSharedFlow<UpdateItem> = MutableSharedFlow()
|
||||
|
||||
companion object {
|
||||
fun generateMessage(): Message {
|
||||
fun generateMessage(parentConversation: Conversation): Message {
|
||||
return Message(
|
||||
date = Date(),
|
||||
text = "This is a test!",
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null,
|
||||
conversation = parentConversation,
|
||||
)
|
||||
}
|
||||
|
||||
fun generateConversation(): Conversation {
|
||||
val lastMessage = generateMessage()
|
||||
return Conversation(
|
||||
date = Date(),
|
||||
participants = listOf("james@magahern.com"),
|
||||
displayName = null,
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = lastMessage.text,
|
||||
lastMessage = lastMessage,
|
||||
lastMessagePreview = null,
|
||||
lastMessage = null,
|
||||
guid = UUID.randomUUID().toString()
|
||||
)
|
||||
}
|
||||
@@ -73,6 +69,7 @@ class MockServer {
|
||||
|
||||
fun addConversation(conversation: Conversation) {
|
||||
conversations.add(conversation)
|
||||
messages[conversation.guid] = mutableListOf()
|
||||
|
||||
runBlocking {
|
||||
_updateFlow.emit(UpdateItem(
|
||||
@@ -84,9 +81,9 @@ class MockServer {
|
||||
|
||||
fun addMessagesToConversation(conversation: Conversation, messages: List<Message>) {
|
||||
val guid = conversation.guid
|
||||
this.messages[guid]?.addAll(messages) ?: run {
|
||||
this.messages[guid] = messages.toMutableList()
|
||||
}
|
||||
this.messages[guid]?.addAll(messages)
|
||||
conversation.lastMessage = messages.last()
|
||||
conversation.lastMessagePreview = messages.last().text
|
||||
|
||||
runBlocking {
|
||||
_updateFlow.emit(UpdateItem(
|
||||
@@ -111,7 +108,7 @@ class MockServer {
|
||||
fun addTestMessages(count: Int, conversation: Conversation): List<Message> {
|
||||
val testMessages = ArrayList<Message>()
|
||||
for (i in 0..<count) {
|
||||
val message = MockServer.generateMessage()
|
||||
val message = MockServer.generateMessage(conversation)
|
||||
testMessages.add(message)
|
||||
}
|
||||
|
||||
@@ -124,14 +121,16 @@ class MockServer {
|
||||
}
|
||||
|
||||
internal fun sendMessage(body: String, toConversationGUID: GUID) {
|
||||
val conversation = conversations.first { it.guid == toConversationGUID }
|
||||
|
||||
val message = Message(
|
||||
text = body,
|
||||
date = Date(),
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null, // me
|
||||
conversation = conversation
|
||||
)
|
||||
|
||||
val conversation = conversations.first { it.guid == toConversationGUID }
|
||||
addMessagesToConversation(conversation, listOf(message))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user