Implements message loading and display
This commit is contained in:
@@ -57,7 +57,7 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
.first()
|
||||
.messages
|
||||
.asFlow()
|
||||
.map { it.list.map { it.toMessage(conversation) } }
|
||||
.map { it.set.map { it.toMessage(conversation) } }
|
||||
}
|
||||
|
||||
fun updateConversations(incomingConversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
@@ -111,15 +111,24 @@ class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
return items.map { it.toConversation() }
|
||||
}
|
||||
|
||||
fun writeMessages(messages: List<ModelMessage>, conversation: ModelConversation) {
|
||||
fun writeMessages(messages: List<ModelMessage>, conversation: ModelConversation, outgoing: Boolean = false) {
|
||||
val dbConversation = getManagedConversationByGuid(conversation.guid)
|
||||
realm.writeBlocking {
|
||||
val dbMessages = messages
|
||||
.map { it.toDatabaseMessage() }
|
||||
.map { it.toDatabaseMessage(outgoing = outgoing) }
|
||||
.map { copyToRealm(it, updatePolicy = UpdatePolicy.ALL) }
|
||||
|
||||
val obj = findLatest(dbConversation)
|
||||
obj!!.messages.addAll(dbMessages)
|
||||
findLatest(dbConversation)?.messages?.addAll(dbMessages)
|
||||
}
|
||||
}
|
||||
|
||||
fun clearOutgoingMessages() {
|
||||
realm.query(Message::class, "isOutgoing == true").find().forEach { message ->
|
||||
realm.writeBlocking {
|
||||
findLatest(message)?.let {
|
||||
delete(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,10 +2,12 @@ package net.buzzert.kordophone.backend.db.model
|
||||
|
||||
import io.realm.kotlin.Realm
|
||||
import io.realm.kotlin.ext.realmListOf
|
||||
import io.realm.kotlin.ext.realmSetOf
|
||||
import io.realm.kotlin.ext.toRealmList
|
||||
import io.realm.kotlin.types.RealmInstant
|
||||
import io.realm.kotlin.types.RealmList
|
||||
import io.realm.kotlin.types.RealmObject
|
||||
import io.realm.kotlin.types.RealmSet
|
||||
import io.realm.kotlin.types.annotations.PrimaryKey
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import org.mongodb.kbson.ObjectId
|
||||
@@ -23,7 +25,7 @@ open class Conversation(
|
||||
var unreadCount: Int,
|
||||
|
||||
var lastMessagePreview: String?,
|
||||
var messages: RealmList<Message>,
|
||||
var messages: RealmSet<Message>,
|
||||
): RealmObject
|
||||
{
|
||||
constructor() : this(
|
||||
@@ -35,7 +37,7 @@ open class Conversation(
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = null,
|
||||
|
||||
messages = realmListOf<Message>()
|
||||
messages = realmSetOf<Message>()
|
||||
)
|
||||
|
||||
fun toConversation(): ModelConversation {
|
||||
|
||||
@@ -22,6 +22,7 @@ open class Message(
|
||||
var date: RealmInstant,
|
||||
|
||||
var conversationGUID: GUID,
|
||||
var isOutgoing: Boolean,
|
||||
): RealmObject
|
||||
{
|
||||
constructor() : this(
|
||||
@@ -30,6 +31,7 @@ open class Message(
|
||||
sender = null,
|
||||
date = RealmInstant.now(),
|
||||
conversationGUID = ObjectId().toString(),
|
||||
isOutgoing = false,
|
||||
)
|
||||
|
||||
fun toMessage(parentConversation: ModelConversation): ModelMessage {
|
||||
@@ -43,7 +45,7 @@ open class Message(
|
||||
}
|
||||
}
|
||||
|
||||
fun ModelMessage.toDatabaseMessage(): Message {
|
||||
fun ModelMessage.toDatabaseMessage(outgoing: Boolean = false): Message {
|
||||
val from = this
|
||||
return Message().apply {
|
||||
text = from.text
|
||||
@@ -51,5 +53,6 @@ fun ModelMessage.toDatabaseMessage(): Message {
|
||||
sender = from.sender
|
||||
date = from.date.toInstant().toRealmInstant()
|
||||
conversationGUID = from.conversation.guid
|
||||
isOutgoing = outgoing
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package net.buzzert.kordophone.backend.model
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
typealias GUID = String
|
||||
|
||||
@@ -27,6 +28,19 @@ data class Conversation(
|
||||
@SerializedName("lastMessage")
|
||||
var lastMessage: Message?,
|
||||
) {
|
||||
companion object {
|
||||
fun generate(): Conversation {
|
||||
return Conversation(
|
||||
guid = UUID.randomUUID().toString(),
|
||||
date = Date(),
|
||||
participants = listOf("foo@foo.com"),
|
||||
displayName = null,
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = null,
|
||||
lastMessage = null,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fun formattedDisplayName(): String {
|
||||
return displayName ?: participants.joinToString(", ")
|
||||
|
||||
@@ -2,6 +2,7 @@ package net.buzzert.kordophone.backend.model
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
data class Message(
|
||||
@SerializedName("guid")
|
||||
@@ -19,6 +20,18 @@ data class Message(
|
||||
@Transient
|
||||
var conversation: Conversation,
|
||||
) {
|
||||
companion object {
|
||||
fun generate(text: String, conversation: Conversation = Conversation.generate(), sender: String? = null): Message {
|
||||
return Message(
|
||||
guid = UUID.randomUUID().toString(),
|
||||
text = text,
|
||||
sender = sender,
|
||||
date = Date(),
|
||||
conversation = conversation,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return "Message(guid=$guid, text=$text, sender=$sender, date=$date, parent=${conversation.guid})"
|
||||
}
|
||||
@@ -35,4 +48,13 @@ data class Message(
|
||||
conversation.guid == o.conversation.guid
|
||||
)
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = guid.hashCode()
|
||||
result = 31 * result + text.hashCode()
|
||||
result = 31 * result + (sender?.hashCode() ?: 0)
|
||||
result = 31 * result + date.hashCode()
|
||||
result = 31 * result + conversation.guid.hashCode()
|
||||
return result
|
||||
}
|
||||
}
|
||||
@@ -5,8 +5,12 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.consumeEach
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
@@ -34,7 +38,8 @@ class ChatRepository(
|
||||
get() = database.fetchConversations()
|
||||
|
||||
// Channel that's signaled when an outgoing message is delivered.
|
||||
val messageDeliveredChannel = Channel<MessageDeliveredEvent>()
|
||||
val messageDeliveredChannel: SharedFlow<MessageDeliveredEvent>
|
||||
get() = _messageDeliveredChannel
|
||||
|
||||
// Changes Flow
|
||||
val conversationChanges: Flow<List<Conversation>>
|
||||
@@ -66,6 +71,7 @@ class ChatRepository(
|
||||
private val apiInterface = apiClient.getAPIInterface()
|
||||
private val outgoingMessageQueue: ArrayBlockingQueue<OutgoingMessageInfo> = ArrayBlockingQueue(16)
|
||||
private var outgoingMessageThread: Thread? = null
|
||||
private val _messageDeliveredChannel = MutableSharedFlow<MessageDeliveredEvent>()
|
||||
|
||||
private val updateMonitor = UpdateMonitor(apiClient)
|
||||
private var updateWatchJob: Job? = null
|
||||
@@ -83,6 +89,9 @@ class ChatRepository(
|
||||
launch {
|
||||
updateMonitor.messageAdded.collect { handleMessageAddedUpdate(it) }
|
||||
}
|
||||
launch {
|
||||
messageDeliveredChannel.collectLatest { handleMessageDelivered(it) }
|
||||
}
|
||||
}
|
||||
|
||||
updateMonitor.beginMonitoringUpdates()
|
||||
@@ -124,15 +133,27 @@ class ChatRepository(
|
||||
val serverConversations = fetchConversations()
|
||||
database.updateConversations(serverConversations)
|
||||
|
||||
// Delete outgoing conversations
|
||||
// This is an unfortunate limitation in that we don't know what outgoing GUIDs are going to
|
||||
// be before we send them.
|
||||
// TODO: Keep this in mind when syncing messages after a certain GUID. The outgoing GUIDs are fake.
|
||||
database.clearOutgoingMessages()
|
||||
|
||||
// Sync top N number of conversations' message content
|
||||
Log.d(REPO_LOG, "Synchronizing messages")
|
||||
val sortedConversations = conversations.sortedBy { it.date }.reversed()
|
||||
for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) {
|
||||
val messages = fetchMessages(conversation, limit = 15)
|
||||
database.writeMessages(messages, conversation)
|
||||
synchronizeConversation(conversation)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun synchronizeConversation(conversation: Conversation) {
|
||||
// TODO: Should only fetch messages after the last GUID we know about.
|
||||
// But keep in mind that outgoing message GUIDs are fake...
|
||||
val messages = fetchMessages(conversation, limit = 15)
|
||||
database.writeMessages(messages, conversation)
|
||||
}
|
||||
|
||||
fun close() {
|
||||
database.close()
|
||||
}
|
||||
@@ -164,6 +185,14 @@ class ChatRepository(
|
||||
database.writeMessages(listOf(message), message.conversation)
|
||||
}
|
||||
|
||||
private fun handleMessageDelivered(event: MessageDeliveredEvent) {
|
||||
Log.d(REPO_LOG, "Handling successful delivery event")
|
||||
|
||||
// Unfortunate protocol reality: the server doesn't tell us about new messages that are from us,
|
||||
// so we have to explicitly handle this like a messageAddedUpdate.
|
||||
database.writeMessages(listOf(event.message), event.conversation, outgoing = true)
|
||||
}
|
||||
|
||||
private fun outgoingMessageQueueMain() {
|
||||
Log.d(REPO_LOG, "Outgoing Message Queue Main")
|
||||
while (true) {
|
||||
@@ -185,7 +214,7 @@ class ChatRepository(
|
||||
|
||||
if (result.isSuccessful) {
|
||||
Log.d(REPO_LOG, "Successfully sent message.")
|
||||
messageDeliveredChannel.send(MessageDeliveredEvent(guid, message, conversation))
|
||||
_messageDeliveredChannel.emit(MessageDeliveredEvent(guid, message, conversation))
|
||||
} else {
|
||||
Log.e(REPO_LOG, "Error sending message. Enqueuing for retry.")
|
||||
outgoingMessageQueue.add(it)
|
||||
|
||||
@@ -16,7 +16,7 @@ import okio.ByteString
|
||||
import retrofit2.converter.gson.GsonConverterFactory
|
||||
import java.lang.reflect.Type
|
||||
|
||||
const val UPMON_LOG: String = "ChatRepository"
|
||||
const val UPMON_LOG: String = "UpdateMonitor"
|
||||
|
||||
class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
|
||||
// Flow for getting conversation changed notifications
|
||||
|
||||
@@ -3,6 +3,7 @@ package net.buzzert.kordophone.backend
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
@@ -87,7 +88,7 @@ class BackendTests {
|
||||
|
||||
val guid = repository.enqueueOutgoingMessage(outgoingMessage, conversation)
|
||||
|
||||
val event = repository.messageDeliveredChannel.receive()
|
||||
val event = repository.messageDeliveredChannel.first()
|
||||
assertEquals(event.guid, guid)
|
||||
assertEquals(event.message.text, outgoingMessage.text)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user