diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt b/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt index dd0506f..7a04e9a 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/db/CachedChatDatabase.kt @@ -3,6 +3,7 @@ package net.buzzert.kordophone.backend.db import io.realm.kotlin.Realm import io.realm.kotlin.RealmConfiguration import io.realm.kotlin.UpdatePolicy +import io.realm.kotlin.ext.toRealmList import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.map @@ -10,7 +11,9 @@ import net.buzzert.kordophone.backend.db.model.Conversation import net.buzzert.kordophone.backend.db.model.Message import net.buzzert.kordophone.backend.db.model.toDatabaseConversation import net.buzzert.kordophone.backend.db.model.toDatabaseMessage +import net.buzzert.kordophone.backend.db.model.toRealmInstant import net.buzzert.kordophone.backend.model.GUID +import java.lang.IllegalArgumentException import net.buzzert.kordophone.backend.model.Conversation as ModelConversation import net.buzzert.kordophone.backend.model.Message as ModelMessage @@ -36,26 +39,49 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) { } } + private val realm = Realm.open(realmConfig) + // Flow for watching changes to the database val conversationChanges: Flow> get() = realm.query(Conversation::class).find().asFlow().map { it.list.map { it.toConversation() } } - // For for watching messages added to the database - val messagesAdded: Flow> - get() = realm.query(Message::class).find().asFlow().map { - it.list.map { it.toMessage() } - } + // Flow for watching for message changes for a given conversation + fun messagesChanged(conversation: ModelConversation): Flow> { + return realm.query(Conversation::class, "guid == '${conversation.guid}'") + .find() + .first() + .messages + .asFlow() + .map { it.list.map { it.toMessage(conversation) } } + } - private val realm = Realm.open(realmConfig) + fun writeConversations(conversations: List) = realm.writeBlocking { + val dbConversations = conversations + // Convert to database conversations + .map { it.toDatabaseConversation() } - fun writeConversations(conversations: List) { - val dbConversations = conversations.map { it.toDatabaseConversation() } - realm.writeBlocking { - dbConversations.forEach { - copyToRealm(it, updatePolicy = UpdatePolicy.ALL) + // Look for existing conversations, if applicable + .map { + try { + val existingConvo = getConversationByGuid(it.guid) + + // Update existing record + findLatest(existingConvo)?.apply { + displayName = it.displayName + participants = it.participants + date = it.date + unreadCount = it.unreadCount + } ?: existingConvo + } catch (e: NoSuchElementException) { + // This means object is unmanaged (i.e. it's new) + it + } } + + dbConversations.forEach { + copyToRealm(it, updatePolicy = UpdatePolicy.ALL) } } @@ -67,13 +93,18 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) { fun writeMessages(messages: List, conversation: ModelConversation) { val dbConversation = getConversationByGuid(conversation.guid) realm.writeBlocking { - findLatest(dbConversation)?.messages?.addAll(messages.map { it.toDatabaseMessage() }) + val dbMessages = messages + .map { it.toDatabaseMessage() } + .map { copyToRealm(it, updatePolicy = UpdatePolicy.ALL) } + + val obj = findLatest(dbConversation) + obj!!.messages.addAll(dbMessages) } } fun fetchMessages(conversation: ModelConversation): List { val dbConversation = getConversationByGuid(conversation.guid) - return dbConversation.messages.map { it.toMessage() } + return dbConversation.messages.map { it.toMessage(dbConversation.toConversation()) } } fun close() { diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt index 9613157..7dd3c70 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Conversation.kt @@ -66,3 +66,4 @@ fun ModelConversation.toDatabaseConversation(): Conversation { guid = from.guid } } + diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt index dc63309..24beca9 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/db/model/Message.kt @@ -21,7 +21,7 @@ open class Message( var sender: String?, var date: RealmInstant, - var conversation: Conversation?, + var conversationGUID: GUID, ): RealmObject { constructor() : this( @@ -29,10 +29,10 @@ open class Message( text = "", sender = null, date = RealmInstant.now(), - conversation = null, + conversationGUID = ObjectId().toString(), ) - fun toMessage(parentConversation: ModelConversation = conversation!!.toConversation()): ModelMessage { + fun toMessage(parentConversation: ModelConversation): ModelMessage { return ModelMessage( text = text, guid = guid, @@ -50,6 +50,6 @@ fun ModelMessage.toDatabaseMessage(): Message { guid = from.guid sender = from.sender date = from.date.toInstant().toRealmInstant() - conversation = from.conversation.toDatabaseConversation() + conversationGUID = from.conversation.guid } } diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt b/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt index e3b4b8d..b3a7178 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/server/ChatRepository.kt @@ -1,24 +1,31 @@ package net.buzzert.kordophone.backend.server import android.util.Log +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import net.buzzert.kordophone.backend.db.CachedChatDatabase import net.buzzert.kordophone.backend.events.MessageDeliveredEvent import net.buzzert.kordophone.backend.model.Conversation import net.buzzert.kordophone.backend.model.GUID import net.buzzert.kordophone.backend.model.Message +import net.buzzert.kordophone.backend.model.UpdateItem import java.net.URL import java.util.Queue import java.util.UUID import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.CancellationException const val REPO_LOG: String = "ChatRepository" const val CONVERSATION_MESSAGE_SYNC_COUNT = 10 class ChatRepository( - private val apiInterface: APIInterface, + private val apiClient: APIClient, private val database: CachedChatDatabase, ) { // All (Cached) Conversations @@ -32,8 +39,21 @@ class ChatRepository( val conversationChanges: Flow> get() = database.conversationChanges - val messagesAdded: Flow> - get() = database.messagesAdded + fun messagesChanged(conversation: Conversation): Flow> = + database.messagesChanged(conversation) + + // Testing harness + internal class TestingHarness(private val repository: ChatRepository) { + suspend fun fetchConversations(): List { + return repository.fetchConversations() + } + + suspend fun fetchMessages(conversation: Conversation): List { + return repository.fetchMessages(conversation) + } + } + + internal fun testingHarness(): TestingHarness = TestingHarness(this) private data class OutgoingMessageInfo ( val message: Message, @@ -41,24 +61,39 @@ class ChatRepository( val guid: GUID, ) + private val apiInterface = apiClient.getAPIInterface() private val outgoingMessageQueue: ArrayBlockingQueue = ArrayBlockingQueue(16) private var outgoingMessageThread: Thread? = null + private val updateMonitor = UpdateMonitor(apiClient) + private var updateWatchJob: Job? = null + suspend fun getVersion(): String { return apiInterface.getVersion().string() } - suspend fun fetchConversations(): List { - return apiInterface.getConversations().bodyOnSuccessOrThrow() + fun beginWatchingForUpdates(scope: CoroutineScope) { + updateWatchJob?.cancel() + updateWatchJob = CoroutineScope(scope.coroutineContext).launch { + launch { + updateMonitor.conversationChanged.collect { handleConversationChangedUpdate(it) } + } + launch { + updateMonitor.messageAdded.collect { handleMessageAddedUpdate(it) } + } + } + + updateMonitor.beginMonitoringUpdates() } - suspend fun fetchMessages(conversation: Conversation): List { - return apiInterface.getMessages(conversation.guid) - .bodyOnSuccessOrThrow() - .onEach { it.conversation = conversation } + fun stopWatchingForUpdates() { + updateWatchJob?.cancel() + updateWatchJob = null + + updateMonitor.stopMonitoringForUpdates() } - suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID { + fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID { val guid = UUID.randomUUID().toString() Log.d(REPO_LOG, "Enqueuing outgoing message: $message ($guid)") @@ -76,6 +111,10 @@ class ChatRepository( return database.getConversationByGuid(guid).toConversation() } + fun messagesForConversation(conversation: Conversation): List { + return database.fetchMessages(conversation) + } + suspend fun synchronize() { Log.d(REPO_LOG, "Synchronizing conversations") @@ -84,6 +123,7 @@ class ChatRepository( database.writeConversations(conversations) // Sync top N number of conversations' message content + Log.d(REPO_LOG, "Synchronizing messages") val sortedConversations = conversations.sortedBy { it.date } for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) { val messages = fetchMessages(conversation) @@ -97,6 +137,26 @@ class ChatRepository( // - private + private suspend fun fetchConversations(): List { + return apiInterface.getConversations().bodyOnSuccessOrThrow() + } + + private suspend fun fetchMessages(conversation: Conversation): List { + return apiInterface.getMessages(conversation.guid) + .bodyOnSuccessOrThrow() + .onEach { it.conversation = conversation } + } + + private fun handleConversationChangedUpdate(conversation: Conversation) { + Log.d(REPO_LOG, "Handling conversation changed update") + database.writeConversations(conversations) + } + + private fun handleMessageAddedUpdate(message: Message) { + Log.d(REPO_LOG, "Handling messages added update") + database.writeMessages(listOf(message), message.conversation) + } + private fun outgoingMessageQueueMain() { Log.d(REPO_LOG, "Outgoing Message Queue Main") while (true) { diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt b/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt index fd62a92..9cbe2b8 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt @@ -2,6 +2,7 @@ package net.buzzert.kordophone.backend import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext @@ -22,16 +23,15 @@ import java.util.concurrent.TimeUnit class BackendTests { private fun liveRepository(host: String): Pair { val client = RetrofitAPIClient(URL(host)) - val apiInterface = client.getAPIInterface() val database = CachedChatDatabase.testDatabase() - val repository = ChatRepository(apiInterface, database) + val repository = ChatRepository(client, database) return Pair(repository, client) } private fun mockRepository(): Pair { val mockServer = MockServer() val database = CachedChatDatabase.testDatabase() - val repository = ChatRepository(mockServer.getAPIInterface(), database) + val repository = ChatRepository(mockServer.getClient(), database) return Pair(repository, mockServer) } @@ -51,7 +51,7 @@ class BackendTests { // Add conversation to mock server val inConversation = mockServer.addTestConversations(1).first() - val conversations = repository.fetchConversations() + val conversations = repository.testingHarness().fetchConversations() assertEquals(conversations.count(), 1) val outConversation = conversations.first() @@ -68,8 +68,8 @@ class BackendTests { val inConversation = mockServer.addTestConversations(1).first() val inMessage = mockServer.addTestMessages(1, inConversation).first() - val conversations = repository.fetchConversations() - val messages = repository.fetchMessages(conversations.first()) + val conversations = repository.testingHarness().fetchConversations() + val messages = repository.testingHarness().fetchMessages(conversations.first()) assertEquals(messages.count(), 1) val outMessage = messages.first() @@ -143,6 +143,7 @@ class BackendTests { // We got it. if (it.isNotEmpty()) { + println("bink") updateLatch.countDown() cancel() } @@ -179,22 +180,24 @@ class BackendTests { repo.synchronize() // Set up flow watcher, asynchronously - var messageAdded: Message? = null + var messagesAdded: List? = null val updateLatch = CountDownLatch(1) val job = launch { println("Watching for messages to be added...") - repo.messagesAdded.collect { - println("Messages added: $it") + repo.messagesChanged(conversation).collect { + println("Messages changed: $it") - messageAdded = it.first() - updateLatch.countDown() - cancel() + if (it.isNotEmpty()) { + messagesAdded = it + updateLatch.countDown() + cancel() + } } } withContext(Dispatchers.IO) { // Add a message - val message = mockServer.addTestMessages(1, conversation).first() + val messages = mockServer.addTestMessages(10, conversation) // Sync. This should trigger an update println("Synchronizing...") @@ -207,7 +210,7 @@ class BackendTests { assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) // Check what we got back - assertEquals(message, messageAdded) + assertEquals(messages, messagesAdded) } } @@ -232,8 +235,6 @@ class BackendTests { } withContext(Dispatchers.IO) { - mockAPIClient.startWatchingForUpdates(this) - Thread.sleep(500) // Add a conversation @@ -269,8 +270,6 @@ class BackendTests { } withContext(Dispatchers.IO) { - mockAPIClient.startWatchingForUpdates(this) - Thread.sleep(500) // Add a conversation @@ -287,4 +286,52 @@ class BackendTests { assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) } } + + @Test + fun testEndToEndMessageUpdates() = runBlocking { + val (repo, mockServer) = mockRepository() + + val conversation = mockServer.addTestConversations(1).first() + + // Initial sync + repo.synchronize() + + // We're going to generate a couple of messages... + val messagesToGenerate = 5 + + // Start watching for N updates + val updateLatch = CountDownLatch(messagesToGenerate) + val monitorJob = launch { + repo.messagesChanged(conversation).collect { + println("Message changed: $it") + + if (it.isNotEmpty()) { + updateLatch.countDown() + } + + if (updateLatch.count == 0L) { + repo.stopWatchingForUpdates() + cancel() + } + } + } + + withContext(Dispatchers.IO) { + repo.beginWatchingForUpdates(this) + + Thread.sleep(500) + + // Should trigger an update + println("Adding messages") + mockServer.addTestMessages(messagesToGenerate, conversation) + + monitorJob.join() + + assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) + + // Check num messages + val allMessages = repo.messagesForConversation(conversation) + assertEquals(messagesToGenerate, allMessages.count()) + } + } } \ No newline at end of file diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt b/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt index c9bcd93..f81848e 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/DatabaseTests.kt @@ -32,15 +32,16 @@ class DatabaseTests { val conversation = MockServer.generateConversation() db.writeConversations(listOf(conversation)) - val message = MockServer.generateMessage(conversation) - db.writeMessages(listOf(message), conversation) + var messages = listOf( + MockServer.generateMessage(conversation), + MockServer.generateMessage(conversation), + ) + db.writeMessages(messages, conversation) val readMessages = db.fetchMessages(conversation) - assertEquals(readMessages.count(), 1) - val readMessage = readMessages[0] - assertEquals(readMessage, message) - assertEquals(readMessage.conversation, conversation) + assertEquals(readMessages, messages) + assertEquals(readMessages[0].conversation, conversation) db.close() } diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt b/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt index e26b2aa..8f1c012 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt @@ -39,6 +39,8 @@ class MockServer { private val messages: MutableMap> = mutableMapOf() private val _updateFlow: MutableSharedFlow = MutableSharedFlow() + private val client = MockServerClient(this) + companion object { fun generateMessage(parentConversation: Conversation): Message { return Message( @@ -64,7 +66,7 @@ class MockServer { } fun getServer(): MockWebServer = MockWebServer() - fun getClient(): MockServerClient = MockServerClient(this) + fun getClient(): MockServerClient = client fun getAPIInterface(): APIInterface = MockServerClient(this).getAPIInterface() fun addConversation(conversation: Conversation) { @@ -79,6 +81,11 @@ class MockServer { } } + fun updateConversation(conversation: Conversation) { + conversations.removeAll { it.guid == conversation.guid } + addConversation(conversation) + } + fun addMessagesToConversation(conversation: Conversation, messages: List) { val guid = conversation.guid this.messages[guid]?.addAll(messages) @@ -86,11 +93,15 @@ class MockServer { conversation.lastMessagePreview = messages.last().text runBlocking { - _updateFlow.emit(UpdateItem( - sequence = updateMessageSequence++, - conversationChanged = conversation, - messageAdded = messages.first() - )) + for (message in messages) { + _updateFlow.emit( + UpdateItem( + sequence = updateMessageSequence++, + conversationChanged = conversation, + messageAdded = message + ) + ) + } } } @@ -160,10 +171,16 @@ class MockServerClient(private val server: MockServer): APIClient, WebSocketList webServer.enqueue(MockResponse().withWebSocketUpgrade(this)) + if (this.updateWatchJob == null) { + CoroutineScope(Job()).launch { + startWatchingForUpdates(this) + } + } + return OkHttpClient().newWebSocket(request, listener) } - fun startWatchingForUpdates(scope: CoroutineScope) { + private fun startWatchingForUpdates(scope: CoroutineScope) { this.updateWatchJob = scope.launch { server.updateFlow.collect { println("Mock WebSocket is sending a message")