Private
Public Access
1
0

Implements message watching per conversation

This commit is contained in:
2023-08-14 00:12:38 -07:00
parent f6affec830
commit 28f2bfe580
7 changed files with 215 additions and 58 deletions

View File

@@ -3,6 +3,7 @@ package net.buzzert.kordophone.backend.db
import io.realm.kotlin.Realm import io.realm.kotlin.Realm
import io.realm.kotlin.RealmConfiguration import io.realm.kotlin.RealmConfiguration
import io.realm.kotlin.UpdatePolicy import io.realm.kotlin.UpdatePolicy
import io.realm.kotlin.ext.toRealmList
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
@@ -10,7 +11,9 @@ import net.buzzert.kordophone.backend.db.model.Conversation
import net.buzzert.kordophone.backend.db.model.Message import net.buzzert.kordophone.backend.db.model.Message
import net.buzzert.kordophone.backend.db.model.toDatabaseConversation import net.buzzert.kordophone.backend.db.model.toDatabaseConversation
import net.buzzert.kordophone.backend.db.model.toDatabaseMessage import net.buzzert.kordophone.backend.db.model.toDatabaseMessage
import net.buzzert.kordophone.backend.db.model.toRealmInstant
import net.buzzert.kordophone.backend.model.GUID import net.buzzert.kordophone.backend.model.GUID
import java.lang.IllegalArgumentException
import net.buzzert.kordophone.backend.model.Conversation as ModelConversation import net.buzzert.kordophone.backend.model.Conversation as ModelConversation
import net.buzzert.kordophone.backend.model.Message as ModelMessage import net.buzzert.kordophone.backend.model.Message as ModelMessage
@@ -36,26 +39,49 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
} }
} }
private val realm = Realm.open(realmConfig)
// Flow for watching changes to the database // Flow for watching changes to the database
val conversationChanges: Flow<List<ModelConversation>> val conversationChanges: Flow<List<ModelConversation>>
get() = realm.query(Conversation::class).find().asFlow().map { get() = realm.query(Conversation::class).find().asFlow().map {
it.list.map { it.toConversation() } it.list.map { it.toConversation() }
} }
// For for watching messages added to the database // Flow for watching for message changes for a given conversation
val messagesAdded: Flow<List<ModelMessage>> fun messagesChanged(conversation: ModelConversation): Flow<List<ModelMessage>> {
get() = realm.query(Message::class).find().asFlow().map { return realm.query(Conversation::class, "guid == '${conversation.guid}'")
it.list.map { it.toMessage() } .find()
} .first()
.messages
.asFlow()
.map { it.list.map { it.toMessage(conversation) } }
}
private val realm = Realm.open(realmConfig) fun writeConversations(conversations: List<ModelConversation>) = realm.writeBlocking {
val dbConversations = conversations
// Convert to database conversations
.map { it.toDatabaseConversation() }
fun writeConversations(conversations: List<ModelConversation>) { // Look for existing conversations, if applicable
val dbConversations = conversations.map { it.toDatabaseConversation() } .map {
realm.writeBlocking { try {
dbConversations.forEach { val existingConvo = getConversationByGuid(it.guid)
copyToRealm(it, updatePolicy = UpdatePolicy.ALL)
// Update existing record
findLatest(existingConvo)?.apply {
displayName = it.displayName
participants = it.participants
date = it.date
unreadCount = it.unreadCount
} ?: existingConvo
} catch (e: NoSuchElementException) {
// This means object is unmanaged (i.e. it's new)
it
}
} }
dbConversations.forEach {
copyToRealm(it, updatePolicy = UpdatePolicy.ALL)
} }
} }
@@ -67,13 +93,18 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
fun writeMessages(messages: List<ModelMessage>, conversation: ModelConversation) { fun writeMessages(messages: List<ModelMessage>, conversation: ModelConversation) {
val dbConversation = getConversationByGuid(conversation.guid) val dbConversation = getConversationByGuid(conversation.guid)
realm.writeBlocking { realm.writeBlocking {
findLatest(dbConversation)?.messages?.addAll(messages.map { it.toDatabaseMessage() }) val dbMessages = messages
.map { it.toDatabaseMessage() }
.map { copyToRealm(it, updatePolicy = UpdatePolicy.ALL) }
val obj = findLatest(dbConversation)
obj!!.messages.addAll(dbMessages)
} }
} }
fun fetchMessages(conversation: ModelConversation): List<ModelMessage> { fun fetchMessages(conversation: ModelConversation): List<ModelMessage> {
val dbConversation = getConversationByGuid(conversation.guid) val dbConversation = getConversationByGuid(conversation.guid)
return dbConversation.messages.map { it.toMessage() } return dbConversation.messages.map { it.toMessage(dbConversation.toConversation()) }
} }
fun close() { fun close() {

View File

@@ -66,3 +66,4 @@ fun ModelConversation.toDatabaseConversation(): Conversation {
guid = from.guid guid = from.guid
} }
} }

View File

@@ -21,7 +21,7 @@ open class Message(
var sender: String?, var sender: String?,
var date: RealmInstant, var date: RealmInstant,
var conversation: Conversation?, var conversationGUID: GUID,
): RealmObject ): RealmObject
{ {
constructor() : this( constructor() : this(
@@ -29,10 +29,10 @@ open class Message(
text = "", text = "",
sender = null, sender = null,
date = RealmInstant.now(), date = RealmInstant.now(),
conversation = null, conversationGUID = ObjectId().toString(),
) )
fun toMessage(parentConversation: ModelConversation = conversation!!.toConversation()): ModelMessage { fun toMessage(parentConversation: ModelConversation): ModelMessage {
return ModelMessage( return ModelMessage(
text = text, text = text,
guid = guid, guid = guid,
@@ -50,6 +50,6 @@ fun ModelMessage.toDatabaseMessage(): Message {
guid = from.guid guid = from.guid
sender = from.sender sender = from.sender
date = from.date.toInstant().toRealmInstant() date = from.date.toInstant().toRealmInstant()
conversation = from.conversation.toDatabaseConversation() conversationGUID = from.conversation.guid
} }
} }

View File

@@ -1,24 +1,31 @@
package net.buzzert.kordophone.backend.server package net.buzzert.kordophone.backend.server
import android.util.Log import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import net.buzzert.kordophone.backend.db.CachedChatDatabase import net.buzzert.kordophone.backend.db.CachedChatDatabase
import net.buzzert.kordophone.backend.events.MessageDeliveredEvent import net.buzzert.kordophone.backend.events.MessageDeliveredEvent
import net.buzzert.kordophone.backend.model.Conversation import net.buzzert.kordophone.backend.model.Conversation
import net.buzzert.kordophone.backend.model.GUID import net.buzzert.kordophone.backend.model.GUID
import net.buzzert.kordophone.backend.model.Message import net.buzzert.kordophone.backend.model.Message
import net.buzzert.kordophone.backend.model.UpdateItem
import java.net.URL import java.net.URL
import java.util.Queue import java.util.Queue
import java.util.UUID import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CancellationException
const val REPO_LOG: String = "ChatRepository" const val REPO_LOG: String = "ChatRepository"
const val CONVERSATION_MESSAGE_SYNC_COUNT = 10 const val CONVERSATION_MESSAGE_SYNC_COUNT = 10
class ChatRepository( class ChatRepository(
private val apiInterface: APIInterface, private val apiClient: APIClient,
private val database: CachedChatDatabase, private val database: CachedChatDatabase,
) { ) {
// All (Cached) Conversations // All (Cached) Conversations
@@ -32,8 +39,21 @@ class ChatRepository(
val conversationChanges: Flow<List<Conversation>> val conversationChanges: Flow<List<Conversation>>
get() = database.conversationChanges get() = database.conversationChanges
val messagesAdded: Flow<List<Message>> fun messagesChanged(conversation: Conversation): Flow<List<Message>> =
get() = database.messagesAdded database.messagesChanged(conversation)
// Testing harness
internal class TestingHarness(private val repository: ChatRepository) {
suspend fun fetchConversations(): List<Conversation> {
return repository.fetchConversations()
}
suspend fun fetchMessages(conversation: Conversation): List<Message> {
return repository.fetchMessages(conversation)
}
}
internal fun testingHarness(): TestingHarness = TestingHarness(this)
private data class OutgoingMessageInfo ( private data class OutgoingMessageInfo (
val message: Message, val message: Message,
@@ -41,24 +61,39 @@ class ChatRepository(
val guid: GUID, val guid: GUID,
) )
private val apiInterface = apiClient.getAPIInterface()
private val outgoingMessageQueue: ArrayBlockingQueue<OutgoingMessageInfo> = ArrayBlockingQueue(16) private val outgoingMessageQueue: ArrayBlockingQueue<OutgoingMessageInfo> = ArrayBlockingQueue(16)
private var outgoingMessageThread: Thread? = null private var outgoingMessageThread: Thread? = null
private val updateMonitor = UpdateMonitor(apiClient)
private var updateWatchJob: Job? = null
suspend fun getVersion(): String { suspend fun getVersion(): String {
return apiInterface.getVersion().string() return apiInterface.getVersion().string()
} }
suspend fun fetchConversations(): List<Conversation> { fun beginWatchingForUpdates(scope: CoroutineScope) {
return apiInterface.getConversations().bodyOnSuccessOrThrow() updateWatchJob?.cancel()
updateWatchJob = CoroutineScope(scope.coroutineContext).launch {
launch {
updateMonitor.conversationChanged.collect { handleConversationChangedUpdate(it) }
}
launch {
updateMonitor.messageAdded.collect { handleMessageAddedUpdate(it) }
}
}
updateMonitor.beginMonitoringUpdates()
} }
suspend fun fetchMessages(conversation: Conversation): List<Message> { fun stopWatchingForUpdates() {
return apiInterface.getMessages(conversation.guid) updateWatchJob?.cancel()
.bodyOnSuccessOrThrow() updateWatchJob = null
.onEach { it.conversation = conversation }
updateMonitor.stopMonitoringForUpdates()
} }
suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID { fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID {
val guid = UUID.randomUUID().toString() val guid = UUID.randomUUID().toString()
Log.d(REPO_LOG, "Enqueuing outgoing message: $message ($guid)") Log.d(REPO_LOG, "Enqueuing outgoing message: $message ($guid)")
@@ -76,6 +111,10 @@ class ChatRepository(
return database.getConversationByGuid(guid).toConversation() return database.getConversationByGuid(guid).toConversation()
} }
fun messagesForConversation(conversation: Conversation): List<Message> {
return database.fetchMessages(conversation)
}
suspend fun synchronize() { suspend fun synchronize() {
Log.d(REPO_LOG, "Synchronizing conversations") Log.d(REPO_LOG, "Synchronizing conversations")
@@ -84,6 +123,7 @@ class ChatRepository(
database.writeConversations(conversations) database.writeConversations(conversations)
// Sync top N number of conversations' message content // Sync top N number of conversations' message content
Log.d(REPO_LOG, "Synchronizing messages")
val sortedConversations = conversations.sortedBy { it.date } val sortedConversations = conversations.sortedBy { it.date }
for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) { for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) {
val messages = fetchMessages(conversation) val messages = fetchMessages(conversation)
@@ -97,6 +137,26 @@ class ChatRepository(
// - private // - private
private suspend fun fetchConversations(): List<Conversation> {
return apiInterface.getConversations().bodyOnSuccessOrThrow()
}
private suspend fun fetchMessages(conversation: Conversation): List<Message> {
return apiInterface.getMessages(conversation.guid)
.bodyOnSuccessOrThrow()
.onEach { it.conversation = conversation }
}
private fun handleConversationChangedUpdate(conversation: Conversation) {
Log.d(REPO_LOG, "Handling conversation changed update")
database.writeConversations(conversations)
}
private fun handleMessageAddedUpdate(message: Message) {
Log.d(REPO_LOG, "Handling messages added update")
database.writeMessages(listOf(message), message.conversation)
}
private fun outgoingMessageQueueMain() { private fun outgoingMessageQueueMain() {
Log.d(REPO_LOG, "Outgoing Message Queue Main") Log.d(REPO_LOG, "Outgoing Message Queue Main")
while (true) { while (true) {

View File

@@ -2,6 +2,7 @@ package net.buzzert.kordophone.backend
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@@ -22,16 +23,15 @@ import java.util.concurrent.TimeUnit
class BackendTests { class BackendTests {
private fun liveRepository(host: String): Pair<ChatRepository, RetrofitAPIClient> { private fun liveRepository(host: String): Pair<ChatRepository, RetrofitAPIClient> {
val client = RetrofitAPIClient(URL(host)) val client = RetrofitAPIClient(URL(host))
val apiInterface = client.getAPIInterface()
val database = CachedChatDatabase.testDatabase() val database = CachedChatDatabase.testDatabase()
val repository = ChatRepository(apiInterface, database) val repository = ChatRepository(client, database)
return Pair(repository, client) return Pair(repository, client)
} }
private fun mockRepository(): Pair<ChatRepository, MockServer> { private fun mockRepository(): Pair<ChatRepository, MockServer> {
val mockServer = MockServer() val mockServer = MockServer()
val database = CachedChatDatabase.testDatabase() val database = CachedChatDatabase.testDatabase()
val repository = ChatRepository(mockServer.getAPIInterface(), database) val repository = ChatRepository(mockServer.getClient(), database)
return Pair(repository, mockServer) return Pair(repository, mockServer)
} }
@@ -51,7 +51,7 @@ class BackendTests {
// Add conversation to mock server // Add conversation to mock server
val inConversation = mockServer.addTestConversations(1).first() val inConversation = mockServer.addTestConversations(1).first()
val conversations = repository.fetchConversations() val conversations = repository.testingHarness().fetchConversations()
assertEquals(conversations.count(), 1) assertEquals(conversations.count(), 1)
val outConversation = conversations.first() val outConversation = conversations.first()
@@ -68,8 +68,8 @@ class BackendTests {
val inConversation = mockServer.addTestConversations(1).first() val inConversation = mockServer.addTestConversations(1).first()
val inMessage = mockServer.addTestMessages(1, inConversation).first() val inMessage = mockServer.addTestMessages(1, inConversation).first()
val conversations = repository.fetchConversations() val conversations = repository.testingHarness().fetchConversations()
val messages = repository.fetchMessages(conversations.first()) val messages = repository.testingHarness().fetchMessages(conversations.first())
assertEquals(messages.count(), 1) assertEquals(messages.count(), 1)
val outMessage = messages.first() val outMessage = messages.first()
@@ -143,6 +143,7 @@ class BackendTests {
// We got it. // We got it.
if (it.isNotEmpty()) { if (it.isNotEmpty()) {
println("bink")
updateLatch.countDown() updateLatch.countDown()
cancel() cancel()
} }
@@ -179,22 +180,24 @@ class BackendTests {
repo.synchronize() repo.synchronize()
// Set up flow watcher, asynchronously // Set up flow watcher, asynchronously
var messageAdded: Message? = null var messagesAdded: List<Message>? = null
val updateLatch = CountDownLatch(1) val updateLatch = CountDownLatch(1)
val job = launch { val job = launch {
println("Watching for messages to be added...") println("Watching for messages to be added...")
repo.messagesAdded.collect { repo.messagesChanged(conversation).collect {
println("Messages added: $it") println("Messages changed: $it")
messageAdded = it.first() if (it.isNotEmpty()) {
updateLatch.countDown() messagesAdded = it
cancel() updateLatch.countDown()
cancel()
}
} }
} }
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
// Add a message // Add a message
val message = mockServer.addTestMessages(1, conversation).first() val messages = mockServer.addTestMessages(10, conversation)
// Sync. This should trigger an update // Sync. This should trigger an update
println("Synchronizing...") println("Synchronizing...")
@@ -207,7 +210,7 @@ class BackendTests {
assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
// Check what we got back // Check what we got back
assertEquals(message, messageAdded) assertEquals(messages, messagesAdded)
} }
} }
@@ -232,8 +235,6 @@ class BackendTests {
} }
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
mockAPIClient.startWatchingForUpdates(this)
Thread.sleep(500) Thread.sleep(500)
// Add a conversation // Add a conversation
@@ -269,8 +270,6 @@ class BackendTests {
} }
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
mockAPIClient.startWatchingForUpdates(this)
Thread.sleep(500) Thread.sleep(500)
// Add a conversation // Add a conversation
@@ -287,4 +286,52 @@ class BackendTests {
assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
} }
} }
@Test
fun testEndToEndMessageUpdates() = runBlocking {
val (repo, mockServer) = mockRepository()
val conversation = mockServer.addTestConversations(1).first()
// Initial sync
repo.synchronize()
// We're going to generate a couple of messages...
val messagesToGenerate = 5
// Start watching for N updates
val updateLatch = CountDownLatch(messagesToGenerate)
val monitorJob = launch {
repo.messagesChanged(conversation).collect {
println("Message changed: $it")
if (it.isNotEmpty()) {
updateLatch.countDown()
}
if (updateLatch.count == 0L) {
repo.stopWatchingForUpdates()
cancel()
}
}
}
withContext(Dispatchers.IO) {
repo.beginWatchingForUpdates(this)
Thread.sleep(500)
// Should trigger an update
println("Adding messages")
mockServer.addTestMessages(messagesToGenerate, conversation)
monitorJob.join()
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
// Check num messages
val allMessages = repo.messagesForConversation(conversation)
assertEquals(messagesToGenerate, allMessages.count())
}
}
} }

View File

@@ -32,15 +32,16 @@ class DatabaseTests {
val conversation = MockServer.generateConversation() val conversation = MockServer.generateConversation()
db.writeConversations(listOf(conversation)) db.writeConversations(listOf(conversation))
val message = MockServer.generateMessage(conversation) var messages = listOf(
db.writeMessages(listOf(message), conversation) MockServer.generateMessage(conversation),
MockServer.generateMessage(conversation),
)
db.writeMessages(messages, conversation)
val readMessages = db.fetchMessages(conversation) val readMessages = db.fetchMessages(conversation)
assertEquals(readMessages.count(), 1)
val readMessage = readMessages[0] assertEquals(readMessages, messages)
assertEquals(readMessage, message) assertEquals(readMessages[0].conversation, conversation)
assertEquals(readMessage.conversation, conversation)
db.close() db.close()
} }

View File

@@ -39,6 +39,8 @@ class MockServer {
private val messages: MutableMap<String, MutableList<Message>> = mutableMapOf() private val messages: MutableMap<String, MutableList<Message>> = mutableMapOf()
private val _updateFlow: MutableSharedFlow<UpdateItem> = MutableSharedFlow() private val _updateFlow: MutableSharedFlow<UpdateItem> = MutableSharedFlow()
private val client = MockServerClient(this)
companion object { companion object {
fun generateMessage(parentConversation: Conversation): Message { fun generateMessage(parentConversation: Conversation): Message {
return Message( return Message(
@@ -64,7 +66,7 @@ class MockServer {
} }
fun getServer(): MockWebServer = MockWebServer() fun getServer(): MockWebServer = MockWebServer()
fun getClient(): MockServerClient = MockServerClient(this) fun getClient(): MockServerClient = client
fun getAPIInterface(): APIInterface = MockServerClient(this).getAPIInterface() fun getAPIInterface(): APIInterface = MockServerClient(this).getAPIInterface()
fun addConversation(conversation: Conversation) { fun addConversation(conversation: Conversation) {
@@ -79,6 +81,11 @@ class MockServer {
} }
} }
fun updateConversation(conversation: Conversation) {
conversations.removeAll { it.guid == conversation.guid }
addConversation(conversation)
}
fun addMessagesToConversation(conversation: Conversation, messages: List<Message>) { fun addMessagesToConversation(conversation: Conversation, messages: List<Message>) {
val guid = conversation.guid val guid = conversation.guid
this.messages[guid]?.addAll(messages) this.messages[guid]?.addAll(messages)
@@ -86,11 +93,15 @@ class MockServer {
conversation.lastMessagePreview = messages.last().text conversation.lastMessagePreview = messages.last().text
runBlocking { runBlocking {
_updateFlow.emit(UpdateItem( for (message in messages) {
sequence = updateMessageSequence++, _updateFlow.emit(
conversationChanged = conversation, UpdateItem(
messageAdded = messages.first() sequence = updateMessageSequence++,
)) conversationChanged = conversation,
messageAdded = message
)
)
}
} }
} }
@@ -160,10 +171,16 @@ class MockServerClient(private val server: MockServer): APIClient, WebSocketList
webServer.enqueue(MockResponse().withWebSocketUpgrade(this)) webServer.enqueue(MockResponse().withWebSocketUpgrade(this))
if (this.updateWatchJob == null) {
CoroutineScope(Job()).launch {
startWatchingForUpdates(this)
}
}
return OkHttpClient().newWebSocket(request, listener) return OkHttpClient().newWebSocket(request, listener)
} }
fun startWatchingForUpdates(scope: CoroutineScope) { private fun startWatchingForUpdates(scope: CoroutineScope) {
this.updateWatchJob = scope.launch { this.updateWatchJob = scope.launch {
server.updateFlow.collect { server.updateFlow.collect {
println("Mock WebSocket is sending a message") println("Mock WebSocket is sending a message")