Started working on synchronization
This commit is contained in:
@@ -2,6 +2,9 @@ package net.buzzert.kordophone.backend.db
|
||||
|
||||
import io.realm.kotlin.Realm
|
||||
import io.realm.kotlin.RealmConfiguration
|
||||
import io.realm.kotlin.UpdatePolicy
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import net.buzzert.kordophone.backend.db.model.Conversation
|
||||
import net.buzzert.kordophone.backend.db.model.Message
|
||||
import net.buzzert.kordophone.backend.db.model.toDatabaseConversation
|
||||
@@ -32,13 +35,19 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
}
|
||||
}
|
||||
|
||||
// Flow for watching changes to the database
|
||||
val changes: Flow<List<ModelConversation>>
|
||||
get() = realm.query(Conversation::class).find().asFlow().map {
|
||||
it.list.map { it.toConversation() }
|
||||
}
|
||||
|
||||
private val realm = Realm.open(realmConfig)
|
||||
|
||||
fun writeConversations(conversations: List<ModelConversation>) {
|
||||
val dbConversations = conversations.map { it.toDatabaseConversation() }
|
||||
realm.writeBlocking {
|
||||
dbConversations.forEach {
|
||||
copyToRealm(it)
|
||||
copyToRealm(it, updatePolicy = UpdatePolicy.ALL)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -64,7 +73,7 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
realm.close()
|
||||
}
|
||||
|
||||
private fun getConversationByGuid(guid: GUID): Conversation {
|
||||
fun getConversationByGuid(guid: GUID): Conversation {
|
||||
return realm.query(Conversation::class, "guid == '$guid'")
|
||||
.find()
|
||||
.first()
|
||||
|
||||
@@ -15,26 +15,29 @@ import java.util.Date
|
||||
|
||||
open class Conversation(
|
||||
@PrimaryKey
|
||||
var _id: String,
|
||||
var guid: GUID,
|
||||
|
||||
var displayName: String?,
|
||||
var participants: RealmList<String>,
|
||||
var date: RealmInstant,
|
||||
var unreadCount: Int,
|
||||
var lastMessagePreview: String,
|
||||
var guid: GUID,
|
||||
|
||||
var lastMessagePreview: String?,
|
||||
var lastMessage: Message?,
|
||||
|
||||
var messages: RealmList<Message>,
|
||||
): RealmObject
|
||||
{
|
||||
constructor(): this(
|
||||
_id = ObjectId().toString(),
|
||||
constructor() : this(
|
||||
guid = ObjectId().toString(),
|
||||
|
||||
displayName = null,
|
||||
participants = realmListOf<String>(),
|
||||
date = RealmInstant.now(),
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = "",
|
||||
guid = "",
|
||||
|
||||
lastMessagePreview = null,
|
||||
lastMessage = null,
|
||||
|
||||
messages = realmListOf<Message>()
|
||||
)
|
||||
@@ -46,7 +49,8 @@ open class Conversation(
|
||||
date = Date.from(date.toInstant()),
|
||||
unreadCount = unreadCount,
|
||||
guid = guid,
|
||||
lastMessagePreview = lastMessagePreview
|
||||
lastMessagePreview = lastMessagePreview,
|
||||
lastMessage = lastMessage?.toMessage(),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -59,6 +63,7 @@ fun ModelConversation.toDatabaseConversation(): Conversation {
|
||||
date = from.date.toInstant().toRealmInstant()
|
||||
unreadCount = from.unreadCount
|
||||
lastMessagePreview = from.lastMessagePreview
|
||||
lastMessage = from.lastMessage?.toDatabaseMessage()
|
||||
guid = from.guid
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,20 +10,18 @@ import org.mongodb.kbson.ObjectId
|
||||
import net.buzzert.kordophone.backend.model.Message as ModelMessage
|
||||
import java.util.Date
|
||||
|
||||
open class Message (
|
||||
open class Message(
|
||||
@PrimaryKey
|
||||
var _id: String,
|
||||
var guid: GUID,
|
||||
|
||||
var text: String,
|
||||
var guid: GUID,
|
||||
var sender: String?,
|
||||
var date: RealmInstant,
|
||||
): RealmObject
|
||||
{
|
||||
constructor(): this(
|
||||
_id = ObjectId().toString(),
|
||||
constructor() : this(
|
||||
guid = ObjectId().toString(),
|
||||
text = "",
|
||||
guid = "",
|
||||
sender = null,
|
||||
date = RealmInstant.now(),
|
||||
)
|
||||
|
||||
@@ -6,21 +6,24 @@ import java.util.Date
|
||||
typealias GUID = String
|
||||
|
||||
data class Conversation(
|
||||
@SerializedName("date")
|
||||
val date: Date,
|
||||
|
||||
@SerializedName("participantDisplayNames")
|
||||
val participants: List<String>,
|
||||
|
||||
@SerializedName("displayName")
|
||||
val displayName: String?,
|
||||
|
||||
@SerializedName("unreadCount")
|
||||
val unreadCount: Int,
|
||||
|
||||
@SerializedName("lastMessagePreview")
|
||||
val lastMessagePreview: String,
|
||||
|
||||
@SerializedName("guid")
|
||||
val guid: GUID,
|
||||
|
||||
@SerializedName("date")
|
||||
var date: Date,
|
||||
|
||||
@SerializedName("participantDisplayNames")
|
||||
var participants: List<String>,
|
||||
|
||||
@SerializedName("displayName")
|
||||
var displayName: String?,
|
||||
|
||||
@SerializedName("unreadCount")
|
||||
var unreadCount: Int,
|
||||
|
||||
@SerializedName("lastMessagePreview")
|
||||
var lastMessagePreview: String?,
|
||||
|
||||
@SerializedName("lastMessage")
|
||||
var lastMessage: Message?,
|
||||
)
|
||||
|
||||
@@ -4,12 +4,12 @@ import com.google.gson.annotations.SerializedName
|
||||
import java.util.Date
|
||||
|
||||
data class Message(
|
||||
@SerializedName("text")
|
||||
val text: String,
|
||||
|
||||
@SerializedName("guid")
|
||||
val guid: GUID,
|
||||
|
||||
@SerializedName("text")
|
||||
val text: String,
|
||||
|
||||
@SerializedName("sender")
|
||||
val sender: String?, // optional: nil means "from me"
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ import retrofit2.http.Body
|
||||
import retrofit2.http.GET
|
||||
import retrofit2.http.POST
|
||||
import retrofit2.http.Query
|
||||
import java.lang.Error
|
||||
import java.lang.Exception
|
||||
|
||||
data class SendMessageRequest(
|
||||
@SerializedName("guid")
|
||||
@@ -34,4 +36,14 @@ interface APIInterface {
|
||||
|
||||
@POST("/sendMessage")
|
||||
suspend fun sendMessage(@Body request: SendMessageRequest): Response<Void>
|
||||
}
|
||||
|
||||
class ResponseDecodeError(val response: ResponseBody): Exception()
|
||||
|
||||
fun <T> Response<T>.bodyOnSuccessOrThrow(): T {
|
||||
if (isSuccessful) {
|
||||
return body()!!
|
||||
}
|
||||
|
||||
throw ResponseDecodeError(errorBody()!!)
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package net.buzzert.kordophone.backend.server
|
||||
|
||||
import android.util.Log
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.db.CachedChatDatabase
|
||||
import net.buzzert.kordophone.backend.events.MessageDeliveredEvent
|
||||
@@ -19,8 +20,17 @@ class ChatRepository(
|
||||
private val apiInterface: APIInterface,
|
||||
private val database: CachedChatDatabase,
|
||||
) {
|
||||
// All (Cached) Conversations
|
||||
val conversations: List<Conversation>
|
||||
get() = database.fetchConversations()
|
||||
|
||||
// Channel that's signaled when an outgoing message is delivered.
|
||||
val messageDeliveredChannel = Channel<MessageDeliveredEvent>()
|
||||
|
||||
// Changes Flow
|
||||
val changes: Flow<List<Conversation>>
|
||||
get() = database.changes
|
||||
|
||||
private data class OutgoingMessageInfo (
|
||||
val message: Message,
|
||||
val conversation: Conversation,
|
||||
@@ -35,11 +45,11 @@ class ChatRepository(
|
||||
}
|
||||
|
||||
suspend fun fetchConversations(): List<Conversation> {
|
||||
return apiInterface.getConversations().body()!!
|
||||
return apiInterface.getConversations().bodyOnSuccessOrThrow()
|
||||
}
|
||||
|
||||
suspend fun fetchMessages(conversation: Conversation): List<Message> {
|
||||
return apiInterface.getMessages(conversation.guid).body()!!
|
||||
return apiInterface.getMessages(conversation.guid).bodyOnSuccessOrThrow()
|
||||
}
|
||||
|
||||
suspend fun enqueueOutgoingMessage(message: Message, conversation: Conversation): GUID {
|
||||
@@ -56,6 +66,19 @@ class ChatRepository(
|
||||
return guid
|
||||
}
|
||||
|
||||
fun conversationForGuid(guid: GUID): Conversation {
|
||||
return database.getConversationByGuid(guid).toConversation()
|
||||
}
|
||||
|
||||
suspend fun synchronize() {
|
||||
Log.d(REPO_LOG, "Synchronizing conversations")
|
||||
|
||||
val conversations = fetchConversations()
|
||||
database.writeConversations(conversations)
|
||||
|
||||
// TODO: Sync messages too? How many?
|
||||
}
|
||||
|
||||
fun close() {
|
||||
database.close()
|
||||
}
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import net.buzzert.kordophone.backend.db.CachedChatDatabase
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.server.APIClient
|
||||
import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.ChatRepository
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Test
|
||||
import java.net.URL
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class BackendTests {
|
||||
private fun liveRepository(host: String): ChatRepository {
|
||||
@@ -86,4 +89,76 @@ class BackendTests {
|
||||
|
||||
repository.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConversationSynchronization() = runBlocking {
|
||||
val (repo, mockServer) = mockRepository()
|
||||
|
||||
// Add some test convos
|
||||
val conversations = mockServer.addTestConversations(10)
|
||||
|
||||
// Sync
|
||||
repo.synchronize()
|
||||
|
||||
// Check our count.
|
||||
assertEquals(repo.conversations.count(), 10)
|
||||
|
||||
// Sync again: let's ensure we're de-duplicating conversations.
|
||||
repo.synchronize()
|
||||
|
||||
// Should be no change...
|
||||
assertEquals(repo.conversations.count(), 10)
|
||||
|
||||
// Say unread count + lastMessage preview changes on server.
|
||||
val someConversation = conversations.first().apply {
|
||||
lastMessagePreview = "COOL"
|
||||
unreadCount = 2
|
||||
}
|
||||
|
||||
// Sync again
|
||||
repo.synchronize()
|
||||
|
||||
// Make sure change is reflected
|
||||
val readConversation = repo.conversationForGuid(someConversation.guid)
|
||||
assertEquals(readConversation.lastMessagePreview, "COOL")
|
||||
assertEquals(readConversation.unreadCount, 2)
|
||||
|
||||
repo.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConversationFlowUpdates() = runBlocking {
|
||||
val (repo, mockServer) = mockRepository()
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
println("Watching for conversations changes...")
|
||||
repo.changes.collect {
|
||||
println("Changed conversations: $it")
|
||||
|
||||
// We got it.
|
||||
if (it.isNotEmpty()) {
|
||||
updateLatch.countDown()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
// Add a conversation
|
||||
println("Adding conversation")
|
||||
mockServer.addTestConversations(1)
|
||||
|
||||
// Sync. This should trigger an update
|
||||
println("Synchronizing...")
|
||||
repo.synchronize()
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -43,4 +43,32 @@ class DatabaseTests {
|
||||
|
||||
db.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConversationModification() {
|
||||
val db = CachedChatDatabase.testDatabase()
|
||||
|
||||
var conversation = MockServer.generateConversation().apply {
|
||||
displayName = "HooBoy"
|
||||
}
|
||||
|
||||
db.writeConversations(listOf(conversation))
|
||||
|
||||
val readConversation = db.fetchConversations().first()
|
||||
assertEquals(conversation.displayName, "HooBoy")
|
||||
|
||||
// Change display name
|
||||
conversation.displayName = "wow"
|
||||
|
||||
// Write back
|
||||
db.writeConversations(listOf(conversation))
|
||||
|
||||
val nowConversations = db.fetchConversations()
|
||||
|
||||
// Make sure we didn't duplicate
|
||||
assertEquals(nowConversations.count(), 1)
|
||||
|
||||
// Make sure our new name was written
|
||||
assertEquals(nowConversations.first().displayName, "wow")
|
||||
}
|
||||
}
|
||||
@@ -27,12 +27,14 @@ class MockServer: APIInterface {
|
||||
}
|
||||
|
||||
fun generateConversation(): Conversation {
|
||||
val lastMessage = generateMessage()
|
||||
return Conversation(
|
||||
date = Date(),
|
||||
participants = listOf("james@magahern.com"),
|
||||
displayName = null,
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = "This is a test!",
|
||||
lastMessagePreview = lastMessage.text,
|
||||
lastMessage = lastMessage,
|
||||
guid = UUID.randomUUID().toString()
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user