From f6affec830dee65ccf36c14da09420f69bb072c8 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 13 Aug 2023 19:59:04 -0700 Subject: [PATCH] Adds update/db monitoring of messages added --- .../backend/db/CachedChatDatabase.kt | 9 +- .../backend/db/model/Conversation.kt | 23 +++-- .../kordophone/backend/db/model/Message.kt | 10 +- .../kordophone/backend/model/Conversation.kt | 15 ++- .../kordophone/backend/model/Message.kt | 22 ++++- .../backend/server/ChatRepository.kt | 20 +++- .../backend/server/UpdateMonitor.kt | 10 +- .../kordophone/backend/BackendTests.kt | 98 +++++++++++++++++-- .../kordophone/backend/DatabaseTests.kt | 3 +- .../buzzert/kordophone/backend/MockServer.kt | 25 +++-- 10 files changed, 191 insertions(+), 44 deletions(-) diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt b/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt index 4fc95f3..dd0506f 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt @@ -4,6 +4,7 @@ import io.realm.kotlin.Realm import io.realm.kotlin.RealmConfiguration import io.realm.kotlin.UpdatePolicy import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map import net.buzzert.kordophone.backend.db.model.Conversation import net.buzzert.kordophone.backend.db.model.Message @@ -36,11 +37,17 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) { } // Flow for watching changes to the database - val changes: Flow> + val conversationChanges: Flow> get() = realm.query(Conversation::class).find().asFlow().map { it.list.map { it.toConversation() } } + // For for watching messages added to the database + val messagesAdded: Flow> + get() = realm.query(Message::class).find().asFlow().map { + it.list.map { it.toMessage() } + } + private val realm = Realm.open(realmConfig) fun writeConversations(conversations: List) { diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt index 31c136a..9613157 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt @@ -22,9 +22,6 @@ open class Conversation( var date: RealmInstant, var unreadCount: Int, - var lastMessagePreview: String?, - var lastMessage: Message?, - var messages: RealmList, ): RealmObject { @@ -36,22 +33,26 @@ open class Conversation( date = RealmInstant.now(), unreadCount = 0, - lastMessagePreview = null, - lastMessage = null, - messages = realmListOf() ) fun toConversation(): ModelConversation { - return ModelConversation( + val conversation = ModelConversation( displayName = displayName, - participants = participants!!.toList(), + participants = participants.toList(), date = Date.from(date.toInstant()), unreadCount = unreadCount, guid = guid, - lastMessagePreview = lastMessagePreview, - lastMessage = lastMessage?.toMessage(), + lastMessage = null, + lastMessagePreview = null, ) + + if (messages.isNotEmpty()) { + conversation.lastMessage = messages.last().toMessage(conversation) + conversation.lastMessagePreview = messages.last().text + } + + return conversation } } @@ -62,8 +63,6 @@ fun ModelConversation.toDatabaseConversation(): Conversation { participants = from.participants.toRealmList() date = from.date.toInstant().toRealmInstant() unreadCount = from.unreadCount - lastMessagePreview = from.lastMessagePreview - lastMessage = from.lastMessage?.toDatabaseMessage() guid = from.guid } } diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt index a2be142..dc63309 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt @@ -2,12 +2,15 @@ package net.buzzert.kordophone.backend.db.model import android.view.Display.Mode import io.realm.kotlin.Realm +import io.realm.kotlin.types.EmbeddedRealmObject import io.realm.kotlin.types.RealmInstant import io.realm.kotlin.types.RealmObject import io.realm.kotlin.types.annotations.PrimaryKey +import net.buzzert.kordophone.backend.db.model.Conversation import net.buzzert.kordophone.backend.model.GUID import org.mongodb.kbson.ObjectId import net.buzzert.kordophone.backend.model.Message as ModelMessage +import net.buzzert.kordophone.backend.model.Conversation as ModelConversation import java.util.Date open class Message( @@ -17,6 +20,8 @@ open class Message( var text: String, var sender: String?, var date: RealmInstant, + + var conversation: Conversation?, ): RealmObject { constructor() : this( @@ -24,14 +29,16 @@ open class Message( text = "", sender = null, date = RealmInstant.now(), + conversation = null, ) - fun toMessage(): ModelMessage { + fun toMessage(parentConversation: ModelConversation = conversation!!.toConversation()): ModelMessage { return ModelMessage( text = text, guid = guid, sender = sender, date = Date.from(date.toInstant()), + conversation = parentConversation, ) } } @@ -43,5 +50,6 @@ fun ModelMessage.toDatabaseMessage(): Message { guid = from.guid sender = from.sender date = from.date.toInstant().toRealmInstant() + conversation = from.conversation.toDatabaseConversation() } } diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/model/Conversation.kt b/backend/src/main/java/net/buzzert/kordophone/backend/model/Conversation.kt index 4ccd9ed..d03b0b6 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/model/Conversation.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/model/Conversation.kt @@ -26,4 +26,17 @@ data class Conversation( @SerializedName("lastMessage") var lastMessage: Message?, -) +) { + override fun equals(other: Any?): Boolean { + if (other == null || javaClass != other.javaClass) return false + + val o = other as Conversation + return ( + guid == o.guid && + date == o.date && + participants == o.participants && + displayName == o.displayName && + unreadCount == o.unreadCount + ) + } +} diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/model/Message.kt b/backend/src/main/java/net/buzzert/kordophone/backend/model/Message.kt index c1cdab3..668f591 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/model/Message.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/model/Message.kt @@ -15,4 +15,24 @@ data class Message( @SerializedName("date") val date: Date, -) {} \ No newline at end of file + + @Transient + var conversation: Conversation, +) { + override fun toString(): String { + return "Message(guid=$guid, text=$text, sender=$sender, date=$date, parent=${conversation.guid})" + } + + override fun equals(other: Any?): Boolean { + if (other == null || javaClass != other.javaClass) return false + + val o = other as Message + return ( + guid == o.guid && + text == o.text && + sender == o.sender && + date == o.date && + conversation == o.conversation + ) + } +} \ No newline at end of file diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt b/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt index 15160ee..e3b4b8d 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt @@ -15,6 +15,7 @@ import java.util.UUID import java.util.concurrent.ArrayBlockingQueue const val REPO_LOG: String = "ChatRepository" +const val CONVERSATION_MESSAGE_SYNC_COUNT = 10 class ChatRepository( private val apiInterface: APIInterface, @@ -28,8 +29,11 @@ class ChatRepository( val messageDeliveredChannel = Channel() // Changes Flow - val changes: Flow> - get() = database.changes + val conversationChanges: Flow> + get() = database.conversationChanges + + val messagesAdded: Flow> + get() = database.messagesAdded private data class OutgoingMessageInfo ( val message: Message, @@ -49,7 +53,9 @@ class ChatRepository( } suspend fun fetchMessages(conversation: Conversation): List { - return apiInterface.getMessages(conversation.guid).bodyOnSuccessOrThrow() + return apiInterface.getMessages(conversation.guid) + .bodyOnSuccessOrThrow() + .onEach { it.conversation = conversation } } suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID { @@ -73,10 +79,16 @@ class ChatRepository( suspend fun synchronize() { Log.d(REPO_LOG, "Synchronizing conversations") + // Sync conversations val conversations = fetchConversations() database.writeConversations(conversations) - // TODO: Sync messages too? How many? + // Sync top N number of conversations' message content + val sortedConversations = conversations.sortedBy { it.date } + for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) { + val messages = fetchMessages(conversation) + database.writeMessages(messages, conversation) + } } fun close() { diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt b/backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt index aa4b1af..c1c5aab 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt @@ -34,7 +34,6 @@ class UpdateMonitor(private val client: APIClient) : WebSocketListener() { private val _conversationChanged: MutableSharedFlow = MutableSharedFlow() private val _messageAdded: MutableSharedFlow = MutableSharedFlow() - fun beginMonitoringUpdates() { Log.d(UPMON_LOG, "Opening websocket connection") this.webSocket = client.getWebSocketClient("/updates", null, this) @@ -50,12 +49,15 @@ class UpdateMonitor(private val client: APIClient) : WebSocketListener() { val updateItems: List = gson.fromJson(message, updateItemsType) for (updateItem: UpdateItem in updateItems) { - if (updateItem.conversationChanged != null) { - _conversationChanged.emit(updateItem.conversationChanged) + val conversationChanged = updateItem.conversationChanged + if (conversationChanged != null) { + _conversationChanged.emit(conversationChanged) } if (updateItem.messageAdded != null) { - _messageAdded.emit(updateItem.messageAdded) + _messageAdded.emit(updateItem.messageAdded.also { + it.conversation = conversationChanged!! + }) } } } diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt b/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt index 15c739d..fd62a92 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import net.buzzert.kordophone.backend.db.CachedChatDatabase +import net.buzzert.kordophone.backend.model.Message import net.buzzert.kordophone.backend.server.APIClient import net.buzzert.kordophone.backend.server.APIInterface import net.buzzert.kordophone.backend.server.ChatRepository @@ -82,7 +83,7 @@ class BackendTests { val (repository, mockServer) = mockRepository() val conversation = mockServer.addTestConversations(1).first() - val outgoingMessage = MockServer.generateMessage() + val outgoingMessage = MockServer.generateMessage(conversation) val guid = repository.enqueueOutgoingMessage(outgoingMessage, conversation) @@ -114,7 +115,7 @@ class BackendTests { // Say unread count + lastMessage preview changes on server. val someConversation = conversations.first().apply { - lastMessagePreview = "COOL" + displayName = "COOL" unreadCount = 2 } @@ -123,8 +124,8 @@ class BackendTests { // Make sure change is reflected val readConversation = repo.conversationForGuid(someConversation.guid) - assertEquals(readConversation.lastMessagePreview, "COOL") - assertEquals(readConversation.unreadCount, 2) + assertEquals("COOL", readConversation.displayName) + assertEquals(2, readConversation.unreadCount) repo.close() } @@ -137,7 +138,7 @@ class BackendTests { val updateLatch = CountDownLatch(1) val job = launch { println("Watching for conversations changes...") - repo.changes.collect { + repo.conversationChanges.collect { println("Changed conversations: $it") // We got it. @@ -166,7 +167,52 @@ class BackendTests { } @Test - fun testUpdateMonitor() = runBlocking { + fun testMessageFlowUpdates() = runBlocking { + val (repo, mockServer) = mockRepository() + + // Add an existing conversation + println("Adding conversation") + val conversation = mockServer.addTestConversations(1).first() + + // Initial sync + println("Initial sync") + repo.synchronize() + + // Set up flow watcher, asynchronously + var messageAdded: Message? = null + val updateLatch = CountDownLatch(1) + val job = launch { + println("Watching for messages to be added...") + repo.messagesAdded.collect { + println("Messages added: $it") + + messageAdded = it.first() + updateLatch.countDown() + cancel() + } + } + + withContext(Dispatchers.IO) { + // Add a message + val message = mockServer.addTestMessages(1, conversation).first() + + // Sync. This should trigger an update + println("Synchronizing...") + repo.synchronize() + + // Wait for the coroutine that is collecting the flow to finish + job.join() + + // Ensure the updates have been processed before proceeding + assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) + + // Check what we got back + assertEquals(message, messageAdded) + } + } + + @Test + fun testUpdateMonitorForConversations() = runBlocking { val mockServer = MockServer() val mockAPIClient = mockServer.getClient() val updateMonitor = UpdateMonitor(mockAPIClient) @@ -201,4 +247,44 @@ class BackendTests { assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) } } + + @Test + fun testUpdateMonitorForMessages() = runBlocking { + val mockServer = MockServer() + val mockAPIClient = mockServer.getClient() + val updateMonitor = UpdateMonitor(mockAPIClient) + + // Set up flow watcher, asynchronously + val updateLatch = CountDownLatch(1) + val job = launch { + updateMonitor.beginMonitoringUpdates() + updateMonitor.messageAdded.collect { + println("Got message added: $it") + updateLatch.countDown() + + updateMonitor.stopMonitoringForUpdates() + mockAPIClient.stopWatchingForUpdates() + cancel() + } + } + + withContext(Dispatchers.IO) { + mockAPIClient.startWatchingForUpdates(this) + + Thread.sleep(500) + + // Add a conversation + println("Adding conversation") + val convo = mockServer.addTestConversations(1).first() + + // Add a test message + mockServer.addTestMessages(1, convo) + + // Wait for the coroutine that is collecting the flow to finish + job.join() + + // Ensure the updates have been processed before proceeding + assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) + } + } } \ No newline at end of file diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt b/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt index 6906878..c9bcd93 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt @@ -32,7 +32,7 @@ class DatabaseTests { val conversation = MockServer.generateConversation() db.writeConversations(listOf(conversation)) - val message = MockServer.generateMessage() + val message = MockServer.generateMessage(conversation) db.writeMessages(listOf(message), conversation) val readMessages = db.fetchMessages(conversation) @@ -40,6 +40,7 @@ class DatabaseTests { val readMessage = readMessages[0] assertEquals(readMessage, message) + assertEquals(readMessage.conversation, conversation) db.close() } diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt b/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt index 60a8cdc..e26b2aa 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt @@ -1,13 +1,11 @@ package net.buzzert.kordophone.backend import com.google.gson.Gson -import com.google.gson.reflect.TypeToken import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import net.buzzert.kordophone.backend.model.Conversation @@ -19,7 +17,6 @@ import net.buzzert.kordophone.backend.server.APIInterface import net.buzzert.kordophone.backend.server.SendMessageRequest import net.buzzert.kordophone.backend.server.authenticatedWebSocketURL import okhttp3.HttpUrl -import okhttp3.MediaType import okhttp3.MediaType.Companion.toMediaType import okhttp3.OkHttpClient import okhttp3.Request @@ -30,7 +27,6 @@ import okhttp3.WebSocketListener import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import retrofit2.Response -import java.net.URL import java.util.Date import java.util.UUID @@ -44,24 +40,24 @@ class MockServer { private val _updateFlow: MutableSharedFlow = MutableSharedFlow() companion object { - fun generateMessage(): Message { + fun generateMessage(parentConversation: Conversation): Message { return Message( date = Date(), text = "This is a test!", guid = UUID.randomUUID().toString(), sender = null, + conversation = parentConversation, ) } fun generateConversation(): Conversation { - val lastMessage = generateMessage() return Conversation( date = Date(), participants = listOf("james@magahern.com"), displayName = null, unreadCount = 0, - lastMessagePreview = lastMessage.text, - lastMessage = lastMessage, + lastMessagePreview = null, + lastMessage = null, guid = UUID.randomUUID().toString() ) } @@ -73,6 +69,7 @@ class MockServer { fun addConversation(conversation: Conversation) { conversations.add(conversation) + messages[conversation.guid] = mutableListOf() runBlocking { _updateFlow.emit(UpdateItem( @@ -84,9 +81,9 @@ class MockServer { fun addMessagesToConversation(conversation: Conversation, messages: List) { val guid = conversation.guid - this.messages[guid]?.addAll(messages) ?: run { - this.messages[guid] = messages.toMutableList() - } + this.messages[guid]?.addAll(messages) + conversation.lastMessage = messages.last() + conversation.lastMessagePreview = messages.last().text runBlocking { _updateFlow.emit(UpdateItem( @@ -111,7 +108,7 @@ class MockServer { fun addTestMessages(count: Int, conversation: Conversation): List { val testMessages = ArrayList() for (i in 0..