Better database syncing... maybe
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
package net.buzzert.kordophone.backend.db
|
||||
|
||||
import io.realm.kotlin.MutableRealm
|
||||
import io.realm.kotlin.Realm
|
||||
import io.realm.kotlin.RealmConfiguration
|
||||
import io.realm.kotlin.UpdatePolicy
|
||||
import io.realm.kotlin.ext.toRealmList
|
||||
import io.realm.kotlin.query.find
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
@@ -44,7 +46,8 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
// Flow for watching changes to the database
|
||||
val conversationChanges: Flow<List<ModelConversation>>
|
||||
get() = realm.query(Conversation::class).find().asFlow().map {
|
||||
it.list.map { it.toConversation() }
|
||||
realm.copyFromRealm(it.list)
|
||||
.map { it.toConversation() }
|
||||
}
|
||||
|
||||
// Flow for watching for message changes for a given conversation
|
||||
@@ -57,37 +60,45 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
.map { it.list.map { it.toMessage(conversation) } }
|
||||
}
|
||||
|
||||
fun writeConversations(conversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
val dbConversations = conversations
|
||||
// Convert to database conversations
|
||||
.map { it.toDatabaseConversation() }
|
||||
fun updateConversations(incomingConversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
val incomingDatabaseConversations = incomingConversations.map { it.toDatabaseConversation() }
|
||||
|
||||
// Look for existing conversations, if applicable
|
||||
.map {
|
||||
try {
|
||||
val existingConvo = getConversationByGuid(it.guid)
|
||||
var deletedConversations = realm.query(Conversation::class).find()
|
||||
.minus(incomingDatabaseConversations)
|
||||
|
||||
// Update existing record
|
||||
findLatest(existingConvo)?.apply {
|
||||
displayName = it.displayName
|
||||
participants = it.participants
|
||||
date = it.date
|
||||
unreadCount = it.unreadCount
|
||||
} ?: existingConvo
|
||||
} catch (e: NoSuchElementException) {
|
||||
// This means object is unmanaged (i.e. it's new)
|
||||
it
|
||||
}
|
||||
deletedConversations.forEach { conversation ->
|
||||
findLatest(conversation)?.let {
|
||||
delete(it)
|
||||
}
|
||||
}
|
||||
|
||||
dbConversations.forEach {
|
||||
copyToRealm(it, updatePolicy = UpdatePolicy.ALL)
|
||||
writeManagedConversations(this, incomingDatabaseConversations)
|
||||
}
|
||||
|
||||
fun writeConversations(conversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
writeManagedConversations(this, conversations.map { it.toDatabaseConversation() })
|
||||
}
|
||||
|
||||
private fun writeManagedConversations(mutableRealm: MutableRealm, conversations: List<Conversation>) {
|
||||
conversations.forEach {conversation ->
|
||||
try {
|
||||
val managedConversation = getManagedConversationByGuid(conversation.guid)
|
||||
mutableRealm.findLatest(managedConversation)?.apply {
|
||||
displayName = conversation.displayName
|
||||
participants = conversation.participants
|
||||
date = conversation.date
|
||||
unreadCount = conversation.unreadCount
|
||||
}
|
||||
} catch (e: NoSuchElementException) {
|
||||
// Conversation does not exist. Copy it to the realm.
|
||||
mutableRealm.copyToRealm(conversation, updatePolicy = UpdatePolicy.ALL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun deleteConversations(conversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
conversations.forEach { inConversation ->
|
||||
val conversation = getConversationByGuid(inConversation.guid)
|
||||
val conversation = getManagedConversationByGuid(inConversation.guid)
|
||||
findLatest(conversation)?.let {
|
||||
delete(it)
|
||||
}
|
||||
@@ -95,12 +106,13 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
}
|
||||
|
||||
fun fetchConversations(): List<ModelConversation> {
|
||||
val items = realm.query(Conversation::class).find()
|
||||
val itemResults = realm.query(Conversation::class).find()
|
||||
val items = realm.copyFromRealm(itemResults)
|
||||
return items.map { it.toConversation() }
|
||||
}
|
||||
|
||||
fun writeMessages(messages: List<ModelMessage>, conversation: ModelConversation) {
|
||||
val dbConversation = getConversationByGuid(conversation.guid)
|
||||
val dbConversation = getManagedConversationByGuid(conversation.guid)
|
||||
realm.writeBlocking {
|
||||
val dbMessages = messages
|
||||
.map { it.toDatabaseMessage() }
|
||||
@@ -120,9 +132,13 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
realm.close()
|
||||
}
|
||||
|
||||
fun getConversationByGuid(guid: GUID): Conversation {
|
||||
private fun getManagedConversationByGuid(guid: GUID): Conversation {
|
||||
return realm.query(Conversation::class, "guid == '$guid'")
|
||||
.find()
|
||||
.first()
|
||||
}
|
||||
|
||||
fun getConversationByGuid(guid: GUID): Conversation {
|
||||
return realm.copyFromRealm(getManagedConversationByGuid(guid))
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ open class Conversation(
|
||||
var date: RealmInstant,
|
||||
var unreadCount: Int,
|
||||
|
||||
var lastMessagePreview: String?,
|
||||
var messages: RealmList<Message>,
|
||||
): RealmObject
|
||||
{
|
||||
@@ -32,6 +33,7 @@ open class Conversation(
|
||||
participants = realmListOf<String>(),
|
||||
date = RealmInstant.now(),
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = null,
|
||||
|
||||
messages = realmListOf<Message>()
|
||||
)
|
||||
@@ -43,17 +45,29 @@ open class Conversation(
|
||||
date = Date.from(date.toInstant()),
|
||||
unreadCount = unreadCount,
|
||||
guid = guid,
|
||||
lastMessagePreview = lastMessagePreview,
|
||||
lastMessage = null,
|
||||
lastMessagePreview = null,
|
||||
)
|
||||
|
||||
if (messages.isNotEmpty()) {
|
||||
conversation.lastMessage = messages.last().toMessage(conversation)
|
||||
conversation.lastMessagePreview = messages.last().text
|
||||
val lastMessage = sortedMessages().last()
|
||||
conversation.lastMessage = lastMessage.toMessage(conversation)
|
||||
conversation.lastMessagePreview = lastMessage.text
|
||||
}
|
||||
|
||||
return conversation
|
||||
}
|
||||
|
||||
private fun sortedMessages(): List<Message> {
|
||||
return messages.sortedBy { it.date }
|
||||
}
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (other == null || javaClass != other.javaClass) return false
|
||||
|
||||
val o = other as Conversation
|
||||
return guid == o.guid
|
||||
}
|
||||
}
|
||||
|
||||
fun ModelConversation.toDatabaseConversation(): Conversation {
|
||||
@@ -63,6 +77,7 @@ fun ModelConversation.toDatabaseConversation(): Conversation {
|
||||
participants = from.participants.toRealmList()
|
||||
date = from.date.toInstant().toRealmInstant()
|
||||
unreadCount = from.unreadCount
|
||||
lastMessagePreview = from.lastMessagePreview
|
||||
guid = from.guid
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ data class Message(
|
||||
text == o.text &&
|
||||
sender == o.sender &&
|
||||
date == o.date &&
|
||||
conversation == o.conversation
|
||||
conversation.guid == o.conversation.guid
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package net.buzzert.kordophone.backend.server
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import okhttp3.ResponseBody
|
||||
import retrofit2.Call
|
||||
@@ -32,7 +33,12 @@ interface APIInterface {
|
||||
suspend fun getConversations(): Response<List<Conversation>>
|
||||
|
||||
@GET("/messages")
|
||||
suspend fun getMessages(@Query("guid") conversationGUID: String): Response<List<Message>>
|
||||
suspend fun getMessages(
|
||||
@Query("guid") conversationGUID: String,
|
||||
@Query("limit") limit: Int? = null,
|
||||
@Query("beforeMessageGUID") beforeMessageGUID: GUID? = null,
|
||||
@Query("afterMessageGUID") afterMessageGUID: GUID? = null,
|
||||
): Response<List<Message>>
|
||||
|
||||
@POST("/sendMessage")
|
||||
suspend fun sendMessage(@Body request: SendMessageRequest): Response<Void>
|
||||
|
||||
@@ -122,16 +122,13 @@ class ChatRepository(
|
||||
|
||||
// Sync conversations
|
||||
val serverConversations = fetchConversations()
|
||||
val deletedConversations = conversations.minus(serverConversations)
|
||||
|
||||
database.deleteConversations(deletedConversations)
|
||||
database.writeConversations(serverConversations)
|
||||
database.updateConversations(serverConversations)
|
||||
|
||||
// Sync top N number of conversations' message content
|
||||
Log.d(REPO_LOG, "Synchronizing messages")
|
||||
val sortedConversations = conversations.sortedBy { it.date }.reversed()
|
||||
for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) {
|
||||
val messages = fetchMessages(conversation)
|
||||
val messages = fetchMessages(conversation, limit = 15)
|
||||
database.writeMessages(messages, conversation)
|
||||
}
|
||||
}
|
||||
@@ -146,15 +143,20 @@ class ChatRepository(
|
||||
return apiInterface.getConversations().bodyOnSuccessOrThrow()
|
||||
}
|
||||
|
||||
private suspend fun fetchMessages(conversation: Conversation): List<Message> {
|
||||
return apiInterface.getMessages(conversation.guid)
|
||||
private suspend fun fetchMessages(
|
||||
conversation: Conversation,
|
||||
limit: Int? = null,
|
||||
before: Message? = null,
|
||||
after: Message? = null,
|
||||
): List<Message> {
|
||||
return apiInterface.getMessages(conversation.guid, limit, before?.guid, after?.guid)
|
||||
.bodyOnSuccessOrThrow()
|
||||
.onEach { it.conversation = conversation }
|
||||
}
|
||||
|
||||
private fun handleConversationChangedUpdate(conversation: Conversation) {
|
||||
Log.d(REPO_LOG, "Handling conversation changed update")
|
||||
database.writeConversations(conversations)
|
||||
database.writeConversations(listOf(conversation))
|
||||
}
|
||||
|
||||
private fun handleMessageAddedUpdate(message: Message) {
|
||||
|
||||
@@ -105,13 +105,13 @@ class BackendTests {
|
||||
repo.synchronize()
|
||||
|
||||
// Check our count.
|
||||
assertEquals(repo.conversations.count(), 10)
|
||||
assertEquals(10, repo.conversations.count())
|
||||
|
||||
// Sync again: let's ensure we're de-duplicating conversations.
|
||||
repo.synchronize()
|
||||
|
||||
// Should be no change...
|
||||
assertEquals(repo.conversations.count(), 10)
|
||||
assertEquals(10, repo.conversations.count())
|
||||
|
||||
// Say unread count + lastMessage preview changes on server.
|
||||
val someConversation = conversations.first().apply {
|
||||
|
||||
@@ -41,7 +41,7 @@ class DatabaseTests {
|
||||
val readMessages = db.fetchMessages(conversation)
|
||||
|
||||
assertEquals(readMessages, messages)
|
||||
assertEquals(readMessages[0].conversation, conversation)
|
||||
assertEquals(readMessages[0].conversation.guid, conversation.guid)
|
||||
|
||||
db.close()
|
||||
}
|
||||
@@ -72,5 +72,7 @@ class DatabaseTests {
|
||||
|
||||
// Make sure our new name was written
|
||||
assertEquals(nowConversations.first().displayName, "wow")
|
||||
|
||||
db.close()
|
||||
}
|
||||
}
|
||||
@@ -222,7 +222,12 @@ class MockServerInterface(private val server: MockServer): APIInterface {
|
||||
return Response.success(server.conversations)
|
||||
}
|
||||
|
||||
override suspend fun getMessages(conversationGUID: String): Response<List<Message>> {
|
||||
override suspend fun getMessages(
|
||||
conversationGUID: String,
|
||||
limit: Int?,
|
||||
beforeMessageGUID: GUID?,
|
||||
afterMessageGUID: GUID?
|
||||
): Response<List<Message>> {
|
||||
val messages = server.getMessagesForConversationGUID(conversationGUID)
|
||||
|
||||
return if (messages != null) {
|
||||
|
||||
Reference in New Issue
Block a user