Add 'android/' from commit '5d26ea956906cd31a6cc37e79b0a4cac77b3118b'
git-subtree-dir: android git-subtree-mainline:7fe2701272git-subtree-split:5d26ea9569
This commit is contained in:
1
android/backend/.gitignore
vendored
Normal file
1
android/backend/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
/build
|
||||
58
android/backend/build.gradle
Normal file
58
android/backend/build.gradle
Normal file
@@ -0,0 +1,58 @@
|
||||
plugins {
|
||||
id 'com.android.library'
|
||||
id 'org.jetbrains.kotlin.android'
|
||||
id 'io.realm.kotlin'
|
||||
}
|
||||
|
||||
android {
|
||||
namespace 'net.buzzert.kordophone.backend'
|
||||
compileSdk 33
|
||||
|
||||
defaultConfig {
|
||||
minSdk 30
|
||||
targetSdk 33
|
||||
|
||||
testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
|
||||
consumerProguardFiles "consumer-rules.pro"
|
||||
}
|
||||
|
||||
buildTypes {
|
||||
release {
|
||||
minifyEnabled false
|
||||
proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
|
||||
}
|
||||
}
|
||||
compileOptions {
|
||||
sourceCompatibility JavaVersion.VERSION_1_8
|
||||
targetCompatibility JavaVersion.VERSION_1_8
|
||||
}
|
||||
kotlinOptions {
|
||||
jvmTarget = JavaVersion.VERSION_1_8.toString()
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
|
||||
implementation 'androidx.core:core-ktx:1.10.1'
|
||||
implementation 'androidx.appcompat:appcompat:1.6.1'
|
||||
implementation 'com.google.android.material:material:1.9.0'
|
||||
implementation 'androidx.core:core-ktx:1.10.1'
|
||||
|
||||
testImplementation 'junit:junit:4.13.2'
|
||||
androidTestImplementation 'androidx.test.ext:junit:1.1.5'
|
||||
androidTestImplementation 'androidx.test.espresso:espresso-core:3.5.1'
|
||||
|
||||
// Third-party
|
||||
implementation 'com.squareup.retrofit2:retrofit:2.9.0'
|
||||
implementation 'com.squareup.retrofit2:converter-gson:2.9.0'
|
||||
implementation 'com.google.code.gson:gson:2.9.0'
|
||||
implementation 'com.auth0.android:jwtdecode:2.0.2'
|
||||
|
||||
// Realm
|
||||
implementation "io.realm.kotlin:library-base:${realm_version}"
|
||||
|
||||
// https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core
|
||||
implementation group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.7.3', ext: 'pom'
|
||||
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3'
|
||||
testImplementation 'com.squareup.okhttp3:mockwebserver:4.9.1'
|
||||
}
|
||||
BIN
android/backend/chat-cache-test
Normal file
BIN
android/backend/chat-cache-test
Normal file
Binary file not shown.
BIN
android/backend/chat-cache-test.lock
Normal file
BIN
android/backend/chat-cache-test.lock
Normal file
Binary file not shown.
0
android/backend/consumer-rules.pro
Normal file
0
android/backend/consumer-rules.pro
Normal file
21
android/backend/proguard-rules.pro
vendored
Normal file
21
android/backend/proguard-rules.pro
vendored
Normal file
@@ -0,0 +1,21 @@
|
||||
# Add project specific ProGuard rules here.
|
||||
# You can control the set of applied configuration files using the
|
||||
# proguardFiles setting in build.gradle.
|
||||
#
|
||||
# For more details, see
|
||||
# http://developer.android.com/guide/developing/tools/proguard.html
|
||||
|
||||
# If your project uses WebView with JS, uncomment the following
|
||||
# and specify the fully qualified class name to the JavaScript interface
|
||||
# class:
|
||||
#-keepclassmembers class fqcn.of.javascript.interface.for.webview {
|
||||
# public *;
|
||||
#}
|
||||
|
||||
# Uncomment this to preserve the line number information for
|
||||
# debugging stack traces.
|
||||
#-keepattributes SourceFile,LineNumberTable
|
||||
|
||||
# If you keep the line number information, uncomment this to
|
||||
# hide the original source file name.
|
||||
#-renamesourcefileattribute SourceFile
|
||||
@@ -0,0 +1,24 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
import androidx.test.platform.app.InstrumentationRegistry
|
||||
import androidx.test.ext.junit.runners.AndroidJUnit4
|
||||
|
||||
import org.junit.Test
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import org.junit.Assert.*
|
||||
|
||||
/**
|
||||
* Instrumented test, which will execute on an Android device.
|
||||
*
|
||||
* See [testing documentation](http://d.android.com/tools/testing).
|
||||
*/
|
||||
@RunWith(AndroidJUnit4::class)
|
||||
class ExampleInstrumentedTest {
|
||||
@Test
|
||||
fun useAppContext() {
|
||||
// Context of the app under test.
|
||||
val appContext = InstrumentationRegistry.getInstrumentation().targetContext
|
||||
assertEquals("net.buzzert.kordophone.backend.test", appContext.packageName)
|
||||
}
|
||||
}
|
||||
4
android/backend/src/main/AndroidManifest.xml
Normal file
4
android/backend/src/main/AndroidManifest.xml
Normal file
@@ -0,0 +1,4 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<manifest xmlns:android="http://schemas.android.com/apk/res/android">
|
||||
<uses-permission android:name="android.permission.INTERNET" />
|
||||
</manifest>
|
||||
@@ -0,0 +1,170 @@
|
||||
package net.buzzert.kordophone.backend.db
|
||||
|
||||
import android.util.Log
|
||||
import io.realm.kotlin.MutableRealm
|
||||
import io.realm.kotlin.Realm
|
||||
import io.realm.kotlin.RealmConfiguration
|
||||
import io.realm.kotlin.UpdatePolicy
|
||||
import io.realm.kotlin.ext.toRealmList
|
||||
import io.realm.kotlin.query.find
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import net.buzzert.kordophone.backend.db.model.Conversation
|
||||
import net.buzzert.kordophone.backend.db.model.Message
|
||||
import net.buzzert.kordophone.backend.db.model.toDatabaseConversation
|
||||
import net.buzzert.kordophone.backend.db.model.toDatabaseMessage
|
||||
import net.buzzert.kordophone.backend.db.model.toRealmInstant
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.server.REPO_LOG
|
||||
import java.lang.IllegalArgumentException
|
||||
import net.buzzert.kordophone.backend.model.Conversation as ModelConversation
|
||||
import net.buzzert.kordophone.backend.model.Message as ModelMessage
|
||||
|
||||
class CachedChatDatabase (private val realmConfig: RealmConfiguration) {
|
||||
companion object {
|
||||
private val schema = setOf(Conversation::class, Message::class)
|
||||
|
||||
fun liveDatabase(): CachedChatDatabase {
|
||||
return CachedChatDatabase(
|
||||
RealmConfiguration.Builder(schema = schema)
|
||||
.name("chat-cache")
|
||||
.build()
|
||||
)
|
||||
}
|
||||
|
||||
fun testDatabase(): CachedChatDatabase {
|
||||
return CachedChatDatabase(
|
||||
RealmConfiguration.Builder(schema = schema)
|
||||
.name("chat-cache-test")
|
||||
.inMemory()
|
||||
.build()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private val realm = runCatching {
|
||||
Realm.open(realmConfig)
|
||||
}.recover {
|
||||
// We're just a caching layer, so in the event of a migration error, just delete and start over.
|
||||
Log.d(REPO_LOG, "Error opening (${it.message}). Recovering by deleting database.")
|
||||
Realm.deleteRealm(realmConfig)
|
||||
|
||||
return@recover Realm.open(realmConfig)
|
||||
}.getOrThrow()
|
||||
|
||||
// Flow for watching changes to the database
|
||||
val conversationChanges: Flow<List<ModelConversation>>
|
||||
get() = realm.query(Conversation::class).find().asFlow().map {
|
||||
realm.copyFromRealm(it.list)
|
||||
.map { it.toConversation() }
|
||||
}
|
||||
|
||||
// Flow for watching for message changes for a given conversation
|
||||
fun messagesChanged(conversation: ModelConversation): Flow<List<ModelMessage>> {
|
||||
return realm.query(Message::class, "conversationGUID == $0", conversation.guid)
|
||||
.find()
|
||||
.asFlow()
|
||||
.map { it.list.map { it.toMessage(conversation) } }
|
||||
}
|
||||
|
||||
fun updateConversations(incomingConversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
val incomingDatabaseConversations = incomingConversations.map { it.toDatabaseConversation() }
|
||||
|
||||
var deletedConversations = realm.query(Conversation::class).find()
|
||||
.minus(incomingDatabaseConversations)
|
||||
|
||||
deletedConversations.forEach { conversation ->
|
||||
findLatest(conversation)?.let {
|
||||
delete(it)
|
||||
}
|
||||
}
|
||||
|
||||
writeManagedConversations(this, incomingDatabaseConversations)
|
||||
}
|
||||
|
||||
fun writeConversations(conversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
writeManagedConversations(this, conversations.map { it.toDatabaseConversation() })
|
||||
}
|
||||
|
||||
private fun writeManagedConversations(mutableRealm: MutableRealm, conversations: List<Conversation>) {
|
||||
conversations.forEach {conversation ->
|
||||
try {
|
||||
val managedConversation = getManagedConversationByGuid(conversation.guid)
|
||||
mutableRealm.findLatest(managedConversation)?.apply {
|
||||
displayName = conversation.displayName
|
||||
participants = conversation.participants
|
||||
date = conversation.date
|
||||
unreadCount = conversation.unreadCount
|
||||
lastMessagePreview = conversation.lastMessagePreview
|
||||
lastMessageGUID = conversation.lastMessageGUID
|
||||
}
|
||||
} catch (e: NoSuchElementException) {
|
||||
// Conversation does not exist. Copy it to the realm.
|
||||
mutableRealm.copyToRealm(conversation, updatePolicy = UpdatePolicy.ALL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun deleteConversations(conversations: List<ModelConversation>) = realm.writeBlocking {
|
||||
conversations.forEach { inConversation ->
|
||||
val conversation = getManagedConversationByGuid(inConversation.guid)
|
||||
findLatest(conversation)?.let {
|
||||
delete(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun fetchConversations(): List<ModelConversation> {
|
||||
val itemResults = realm.query(Conversation::class).find()
|
||||
val items = realm.copyFromRealm(itemResults)
|
||||
return items.map { it.toConversation() }
|
||||
}
|
||||
|
||||
fun writeMessages(messages: List<ModelMessage>, conversation: ModelConversation, outgoing: Boolean = false) {
|
||||
if (messages.isEmpty()) {
|
||||
return
|
||||
}
|
||||
|
||||
val dbConversation = getManagedConversationByGuid(conversation.guid)
|
||||
realm.writeBlocking {
|
||||
messages
|
||||
.map { it.toDatabaseMessage(outgoing = outgoing) }
|
||||
.map { copyToRealm(it, updatePolicy = UpdatePolicy.ALL) }
|
||||
|
||||
findLatest(dbConversation)?.let {
|
||||
val lastMessage = messages.maxByOrNull { it.date }!!
|
||||
|
||||
val lastMessageDate = lastMessage.date.toInstant().toRealmInstant()
|
||||
if (lastMessageDate > it.date) {
|
||||
it.lastMessageGUID = lastMessage.guid
|
||||
it.lastMessagePreview = lastMessage.displayText
|
||||
|
||||
// This will cause sort order to change. I think this ends
|
||||
// up getting updated whenever we get conversation changes anyway.
|
||||
// it.date = lastMessageDate
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun fetchMessages(conversation: ModelConversation): List<ModelMessage> {
|
||||
return realm.query(Message::class, "conversationGUID == $0", conversation.guid)
|
||||
.find()
|
||||
.map { it.toMessage(conversation) }
|
||||
}
|
||||
|
||||
fun close() {
|
||||
realm.close()
|
||||
}
|
||||
|
||||
private fun getManagedConversationByGuid(guid: GUID): Conversation {
|
||||
return realm.query(Conversation::class, "guid == $0", guid)
|
||||
.find()
|
||||
.first()
|
||||
}
|
||||
|
||||
fun getConversationByGuid(guid: GUID): Conversation {
|
||||
return realm.copyFromRealm(getManagedConversationByGuid(guid))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package net.buzzert.kordophone.backend.db.model
|
||||
|
||||
import io.realm.kotlin.ext.realmListOf
|
||||
import io.realm.kotlin.ext.toRealmList
|
||||
import io.realm.kotlin.types.RealmInstant
|
||||
import io.realm.kotlin.types.RealmList
|
||||
import io.realm.kotlin.types.RealmObject
|
||||
import io.realm.kotlin.types.annotations.PrimaryKey
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import org.mongodb.kbson.ObjectId
|
||||
import net.buzzert.kordophone.backend.model.Conversation as ModelConversation
|
||||
import java.util.Date
|
||||
|
||||
open class Conversation(
|
||||
@PrimaryKey
|
||||
var guid: GUID,
|
||||
|
||||
var displayName: String?,
|
||||
var participants: RealmList<String>,
|
||||
var date: RealmInstant,
|
||||
var unreadCount: Int,
|
||||
|
||||
var lastMessageGUID: String?,
|
||||
var lastMessagePreview: String?,
|
||||
): RealmObject
|
||||
{
|
||||
constructor() : this(
|
||||
guid = ObjectId().toString(),
|
||||
|
||||
displayName = null,
|
||||
participants = realmListOf<String>(),
|
||||
date = RealmInstant.now(),
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = null,
|
||||
lastMessageGUID = null,
|
||||
)
|
||||
|
||||
fun toConversation(): ModelConversation {
|
||||
return ModelConversation(
|
||||
displayName = displayName,
|
||||
participants = participants.toList(),
|
||||
date = Date.from(date.toInstant()),
|
||||
unreadCount = unreadCount,
|
||||
guid = guid,
|
||||
lastMessagePreview = lastMessagePreview,
|
||||
lastMessage = null,
|
||||
lastFetchedMessageGUID = lastMessageGUID
|
||||
)
|
||||
}
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (other == null || javaClass != other.javaClass) return false
|
||||
|
||||
val o = other as Conversation
|
||||
return guid == o.guid
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
return guid.hashCode()
|
||||
}
|
||||
}
|
||||
|
||||
fun ModelConversation.toDatabaseConversation(): Conversation {
|
||||
val from = this
|
||||
return Conversation().apply {
|
||||
displayName = from.displayName
|
||||
participants = from.participants.toRealmList()
|
||||
date = from.date.toInstant().toRealmInstant()
|
||||
unreadCount = from.unreadCount
|
||||
lastMessagePreview = from.lastMessagePreview
|
||||
lastMessageGUID = from.lastFetchedMessageGUID
|
||||
guid = from.guid
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package net.buzzert.kordophone.backend.db.model
|
||||
|
||||
import io.realm.kotlin.types.RealmInstant
|
||||
import java.time.Instant
|
||||
|
||||
// Copied from Realm's documentation
|
||||
// https://www.mongodb.com/docs/realm/sdk/kotlin/realm-database/schemas/supported-types/
|
||||
|
||||
fun RealmInstant.toInstant(): Instant {
|
||||
val sec: Long = this.epochSeconds
|
||||
// The value always lies in the range `-999_999_999..999_999_999`.
|
||||
// minus for timestamps before epoch, positive for after
|
||||
val nano: Int = this.nanosecondsOfSecond
|
||||
return if (sec >= 0) { // For positive timestamps, conversion can happen directly
|
||||
Instant.ofEpochSecond(sec, nano.toLong())
|
||||
} else {
|
||||
// For negative timestamps, RealmInstant starts from the higher value with negative
|
||||
// nanoseconds, while Instant starts from the lower value with positive nanoseconds
|
||||
// TODO This probably breaks at edge cases like MIN/MAX
|
||||
Instant.ofEpochSecond(sec - 1, 1_000_000 + nano.toLong())
|
||||
}
|
||||
}
|
||||
|
||||
fun Instant.toRealmInstant(): RealmInstant {
|
||||
val sec: Long = this.epochSecond
|
||||
// The value is always positive and lies in the range `0..999_999_999`.
|
||||
val nano: Int = this.nano
|
||||
|
||||
return if (sec >= 0) { // For positive timestamps, conversion can happen directly
|
||||
RealmInstant.from(sec, nano)
|
||||
} else {
|
||||
// For negative timestamps, RealmInstant starts from the higher value with negative
|
||||
// nanoseconds, while Instant starts from the lower value with positive nanoseconds
|
||||
// TODO This probably breaks at edge cases like MIN/MAX
|
||||
RealmInstant.from(sec + 1, -1_000_000 + nano)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package net.buzzert.kordophone.backend.db.model
|
||||
|
||||
import android.view.Display.Mode
|
||||
import io.realm.kotlin.Realm
|
||||
import io.realm.kotlin.ext.realmListOf
|
||||
import io.realm.kotlin.ext.toRealmList
|
||||
import io.realm.kotlin.types.EmbeddedRealmObject
|
||||
import io.realm.kotlin.types.RealmInstant
|
||||
import io.realm.kotlin.types.RealmList
|
||||
import io.realm.kotlin.types.RealmObject
|
||||
import io.realm.kotlin.types.annotations.PrimaryKey
|
||||
import net.buzzert.kordophone.backend.db.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import org.mongodb.kbson.ObjectId
|
||||
import net.buzzert.kordophone.backend.model.Message as ModelMessage
|
||||
import net.buzzert.kordophone.backend.model.Conversation as ModelConversation
|
||||
import java.util.Date
|
||||
|
||||
open class Message(
|
||||
@PrimaryKey
|
||||
var guid: GUID,
|
||||
|
||||
var text: String,
|
||||
var sender: String?,
|
||||
var date: RealmInstant,
|
||||
var attachmentGUIDs: RealmList<String>,
|
||||
|
||||
var conversationGUID: GUID,
|
||||
): RealmObject
|
||||
{
|
||||
constructor() : this(
|
||||
guid = ObjectId().toString(),
|
||||
text = "",
|
||||
sender = null,
|
||||
date = RealmInstant.now(),
|
||||
attachmentGUIDs = realmListOf<String>(),
|
||||
conversationGUID = ObjectId().toString(),
|
||||
)
|
||||
|
||||
fun toMessage(parentConversation: ModelConversation): ModelMessage {
|
||||
return ModelMessage(
|
||||
text = text,
|
||||
guid = guid,
|
||||
sender = sender,
|
||||
date = Date.from(date.toInstant()),
|
||||
attachmentGUIDs = attachmentGUIDs.toList(),
|
||||
conversation = parentConversation,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fun ModelMessage.toDatabaseMessage(outgoing: Boolean = false): Message {
|
||||
val from = this
|
||||
return Message().apply {
|
||||
text = from.text
|
||||
guid = from.guid
|
||||
sender = from.sender
|
||||
date = from.date.toInstant().toRealmInstant()
|
||||
conversationGUID = from.conversation.guid
|
||||
from.attachmentGUIDs?.let {
|
||||
attachmentGUIDs = it.toRealmList()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package net.buzzert.kordophone.backend.events
|
||||
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
|
||||
data class MessageDeliveredEvent(
|
||||
val message: Message,
|
||||
val conversation: Conversation,
|
||||
val requestGuid: GUID,
|
||||
)
|
||||
@@ -0,0 +1,79 @@
|
||||
package net.buzzert.kordophone.backend.model
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
typealias GUID = String
|
||||
|
||||
data class Conversation(
|
||||
@SerializedName("guid")
|
||||
val guid: GUID,
|
||||
|
||||
@SerializedName("date")
|
||||
var date: Date,
|
||||
|
||||
@SerializedName("participantDisplayNames")
|
||||
var participants: List<String>,
|
||||
|
||||
@SerializedName("displayName")
|
||||
var displayName: String?,
|
||||
|
||||
@SerializedName("unreadCount")
|
||||
var unreadCount: Int,
|
||||
|
||||
@SerializedName("lastMessagePreview")
|
||||
var lastMessagePreview: String?,
|
||||
|
||||
@SerializedName("lastMessage")
|
||||
var lastMessage: Message?,
|
||||
|
||||
var lastFetchedMessageGUID: String?,
|
||||
) {
|
||||
companion object {
|
||||
fun generate(): Conversation {
|
||||
return Conversation(
|
||||
guid = UUID.randomUUID().toString(),
|
||||
date = Date(),
|
||||
participants = listOf("foo@foo.com"),
|
||||
displayName = null,
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = null,
|
||||
lastMessage = null,
|
||||
lastFetchedMessageGUID = null,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
val isGroupChat: Boolean
|
||||
get() = participants.count() > 1
|
||||
|
||||
fun formattedDisplayName(): String {
|
||||
return displayName ?: participants.joinToString(", ")
|
||||
}
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (other == null || javaClass != other.javaClass) return false
|
||||
|
||||
val o = other as Conversation
|
||||
return (
|
||||
guid == o.guid &&
|
||||
date == o.date &&
|
||||
participants == o.participants &&
|
||||
displayName == o.displayName &&
|
||||
unreadCount == o.unreadCount &&
|
||||
lastFetchedMessageGUID == o.lastFetchedMessageGUID
|
||||
)
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = guid.hashCode()
|
||||
result = 31 * result + date.hashCode()
|
||||
result = 31 * result + participants.hashCode()
|
||||
result = 31 * result + (displayName?.hashCode() ?: 0)
|
||||
result = 31 * result + unreadCount
|
||||
result = 31 * result + (lastMessage?.hashCode() ?: 0)
|
||||
result = 31 * result + (lastFetchedMessageGUID?.hashCode() ?: 0)
|
||||
return result
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package net.buzzert.kordophone.backend.model
|
||||
|
||||
import android.net.Uri
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import java.io.InputStream
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
data class Message(
|
||||
@SerializedName("guid")
|
||||
val guid: GUID,
|
||||
|
||||
@SerializedName("text")
|
||||
val text: String,
|
||||
|
||||
@SerializedName("sender")
|
||||
val sender: String?, // optional: nil means "from me"
|
||||
|
||||
@SerializedName("date")
|
||||
val date: Date,
|
||||
|
||||
@SerializedName("fileTransferGUIDs")
|
||||
val attachmentGUIDs: List<String>?,
|
||||
|
||||
@Transient
|
||||
var conversation: Conversation,
|
||||
) {
|
||||
companion object {
|
||||
fun generate(text: String, conversation: Conversation = Conversation.generate(), sender: String? = null): Message {
|
||||
return Message(
|
||||
guid = UUID.randomUUID().toString(),
|
||||
text = text,
|
||||
sender = sender,
|
||||
date = Date(),
|
||||
attachmentGUIDs = emptyList(),
|
||||
conversation = conversation,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
val displayText: String get() {
|
||||
// Filter out attachment markers
|
||||
val attachmentMarker = byteArrayOf(0xEF.toByte(), 0xBF.toByte(), 0xBC.toByte()).decodeToString()
|
||||
return text.replace(attachmentMarker, "")
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return "Message(guid=$guid, text=$text, sender=$sender, date=$date, parent=${conversation.guid})"
|
||||
}
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (other == null || javaClass != other.javaClass) return false
|
||||
|
||||
val o = other as Message
|
||||
return (
|
||||
guid == o.guid &&
|
||||
text == o.text &&
|
||||
sender == o.sender &&
|
||||
date == o.date &&
|
||||
conversation.guid == o.conversation.guid
|
||||
)
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = guid.hashCode()
|
||||
result = 31 * result + text.hashCode()
|
||||
result = 31 * result + (sender?.hashCode() ?: 0)
|
||||
result = 31 * result + date.hashCode()
|
||||
result = 31 * result + conversation.guid.hashCode()
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
data class UploadingAttachmentMetadata(
|
||||
val inputStream: InputStream,
|
||||
val mimeType: String,
|
||||
val filename: String,
|
||||
)
|
||||
|
||||
data class OutgoingMessage(
|
||||
val body: String,
|
||||
val conversation: Conversation,
|
||||
val attachmentUris: Set<Uri>,
|
||||
val attachmentDataSource: (Uri) -> UploadingAttachmentMetadata?
|
||||
) {
|
||||
val guid: String = UUID.randomUUID().toString()
|
||||
|
||||
fun asMessage(): Message {
|
||||
return Message(
|
||||
guid = guid,
|
||||
text = body,
|
||||
sender = null,
|
||||
date = Date(),
|
||||
attachmentGUIDs = listOf(), // TODO: What to do here?
|
||||
conversation = conversation
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
package net.buzzert.kordophone.backend.model
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
|
||||
data class UpdateItem(
|
||||
@SerializedName("messageSequenceNumber")
|
||||
val sequence: Int,
|
||||
|
||||
@SerializedName("conversation")
|
||||
val conversationChanged: Conversation? = null,
|
||||
|
||||
@SerializedName("message")
|
||||
val messageAdded: Message? = null,
|
||||
)
|
||||
@@ -0,0 +1,218 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import okhttp3.Authenticator
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.Interceptor
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.Response
|
||||
import okhttp3.ResponseBody
|
||||
import okhttp3.Route
|
||||
import okhttp3.WebSocket
|
||||
import okhttp3.WebSocketListener
|
||||
import retrofit2.Retrofit
|
||||
import retrofit2.converter.gson.GsonConverterFactory
|
||||
import java.net.URL
|
||||
|
||||
interface APIClient {
|
||||
val isConfigured: Boolean
|
||||
|
||||
fun getAPIInterface(): APIInterface
|
||||
fun getWebSocketClient(
|
||||
serverPath: String,
|
||||
queryParams: Map<String, String>?,
|
||||
listener: WebSocketListener
|
||||
): WebSocket
|
||||
}
|
||||
|
||||
data class Authentication (
|
||||
val username: String,
|
||||
val password: String,
|
||||
)
|
||||
|
||||
class TokenStore(val authentication: Authentication) {
|
||||
var authenticationToken: String? = null
|
||||
}
|
||||
|
||||
class AuthenticationInterceptor(
|
||||
val tokenStore: TokenStore
|
||||
): Interceptor {
|
||||
override fun intercept(chain: Interceptor.Chain): Response {
|
||||
// If empty, allow the 401 to occur so we renew our token.
|
||||
val token = tokenStore.authenticationToken ?:
|
||||
return chain.proceed(chain.request())
|
||||
|
||||
val newRequest = chain.request().newBuilder()
|
||||
.header("Authorization", "Bearer $token")
|
||||
.build()
|
||||
|
||||
return chain.proceed(newRequest)
|
||||
}
|
||||
}
|
||||
|
||||
class TokenAuthenticator(
|
||||
private val tokenStore: TokenStore,
|
||||
private val baseURL: URL
|
||||
) : Authenticator {
|
||||
private val retrofit: Retrofit = Retrofit.Builder()
|
||||
.baseUrl(baseURL)
|
||||
.addConverterFactory(GsonConverterFactory.create())
|
||||
.build()
|
||||
|
||||
private val apiInterface: APIInterface
|
||||
get() = retrofit.create(APIInterface::class.java)
|
||||
|
||||
override fun authenticate(route: Route?, response: Response): Request? {
|
||||
// Fetch new token
|
||||
val request = AuthenticationRequest(
|
||||
username = tokenStore.authentication.username,
|
||||
password = tokenStore.authentication.password
|
||||
)
|
||||
|
||||
val token = runBlocking {
|
||||
apiInterface.authenticate(request).body()
|
||||
}
|
||||
|
||||
when (token) {
|
||||
null -> {
|
||||
// Auth failure.
|
||||
// TODO: How to bubble this up?
|
||||
return null
|
||||
}
|
||||
|
||||
// Update token store
|
||||
else -> {
|
||||
tokenStore.authenticationToken = token.serializedToken
|
||||
|
||||
return response.request().newBuilder()
|
||||
.header("Authorization", "Bearer ${token.serializedToken}")
|
||||
.build()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class APIClientFactory {
|
||||
companion object {
|
||||
fun createClient(serverString: String?, authentication: Authentication?): APIClient {
|
||||
if (serverString == null || authentication == null) {
|
||||
return InvalidConfigurationAPIClient(InvalidConfigurationAPIClient.Issue.NOT_CONFIGURED)
|
||||
}
|
||||
|
||||
// Try to parse server string
|
||||
val serverURL = HttpUrl.parse(serverString)
|
||||
?: return InvalidConfigurationAPIClient(InvalidConfigurationAPIClient.Issue.INVALID_HOST_URL)
|
||||
|
||||
return RetrofitAPIClient(serverURL.url(), authentication)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Is this a dumb idea?
|
||||
class InvalidConfigurationAPIClient(val issue: Issue): APIClient {
|
||||
enum class Issue {
|
||||
NOT_CONFIGURED,
|
||||
INVALID_CONFIGURATION,
|
||||
INVALID_HOST_URL,
|
||||
}
|
||||
|
||||
class NotConfiguredError: Throwable(message = "Not configured.")
|
||||
class InvalidConfigurationError(submessage: String): Throwable(message = "Invalid configuration: $submessage")
|
||||
|
||||
private class InvalidConfigurationAPIInterface(val issue: Issue): APIInterface {
|
||||
private fun throwError(): Nothing {
|
||||
when (issue) {
|
||||
Issue.NOT_CONFIGURED -> throw NotConfiguredError()
|
||||
Issue.INVALID_CONFIGURATION -> throw InvalidConfigurationError("Unknown.")
|
||||
Issue.INVALID_HOST_URL -> throw InvalidConfigurationError("Invalid host URL.")
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun getVersion(): ResponseBody = throwError()
|
||||
override suspend fun getConversations(): retrofit2.Response<List<Conversation>> = throwError()
|
||||
override suspend fun sendMessage(request: SendMessageRequest): retrofit2.Response<SendMessageResponse> = throwError()
|
||||
override suspend fun markConversation(conversationGUID: String): retrofit2.Response<Void> = throwError()
|
||||
override suspend fun fetchAttachment(guid: String, preview: Boolean): ResponseBody = throwError()
|
||||
override suspend fun uploadAttachment(filename: String, body: RequestBody): retrofit2.Response<UploadAttachmentResponse> = throwError()
|
||||
override suspend fun authenticate(request: AuthenticationRequest): retrofit2.Response<AuthenticationResponse> = throwError()
|
||||
override suspend fun getMessages(conversationGUID: String, limit: Int?, beforeMessageGUID: GUID?, afterMessageGUID: GUID?): retrofit2.Response<List<Message>> = throwError()
|
||||
}
|
||||
|
||||
override val isConfigured: Boolean
|
||||
get() { return issue != Issue.NOT_CONFIGURED }
|
||||
|
||||
override fun getAPIInterface(): APIInterface {
|
||||
return InvalidConfigurationAPIInterface(issue)
|
||||
}
|
||||
|
||||
override fun getWebSocketClient(
|
||||
serverPath: String,
|
||||
queryParams: Map<String, String>?,
|
||||
listener: WebSocketListener
|
||||
): WebSocket {
|
||||
throw Error("Invalid configuration.")
|
||||
}
|
||||
}
|
||||
|
||||
class RetrofitAPIClient(
|
||||
private val baseURL: URL,
|
||||
private val authentication: Authentication,
|
||||
): APIClient {
|
||||
private val tokenStore: TokenStore = TokenStore(authentication)
|
||||
|
||||
private val client: OkHttpClient = OkHttpClient.Builder()
|
||||
.addInterceptor(AuthenticationInterceptor(tokenStore))
|
||||
.authenticator(TokenAuthenticator(tokenStore, baseURL))
|
||||
.build()
|
||||
|
||||
private val retrofit: Retrofit = Retrofit.Builder()
|
||||
.client(client)
|
||||
.baseUrl(baseURL)
|
||||
.addConverterFactory(GsonConverterFactory.create())
|
||||
.build()
|
||||
|
||||
override val isConfigured: Boolean
|
||||
get() = true
|
||||
|
||||
override fun getAPIInterface(): APIInterface {
|
||||
return retrofit.create(APIInterface::class.java)
|
||||
}
|
||||
|
||||
override fun getWebSocketClient(
|
||||
serverPath: String,
|
||||
queryParams: Map<String, String>?,
|
||||
listener: WebSocketListener
|
||||
): WebSocket {
|
||||
val params = (queryParams ?: hashMapOf<String, String>()).toMutableMap()
|
||||
|
||||
val authToken = tokenStore.authenticationToken
|
||||
if (authToken != null) {
|
||||
params["token"] = authToken
|
||||
}
|
||||
|
||||
val requestURL = baseURL.authenticatedWebSocketURL(serverPath, params)
|
||||
val request = Request.Builder()
|
||||
.url(requestURL)
|
||||
.build()
|
||||
|
||||
return client.newWebSocket(request, listener)
|
||||
}
|
||||
}
|
||||
|
||||
fun URL.authenticatedWebSocketURL(serverPath: String, params: Map<String, String>): URL {
|
||||
val baseURI = HttpUrl.parse(this.toString())!!
|
||||
val requestURL = baseURI.newBuilder()
|
||||
.host(baseURI.host())
|
||||
.addEncodedPathSegments(serverPath)
|
||||
|
||||
params.forEach { (key, value) ->
|
||||
requestURL.addQueryParameter(key, value)
|
||||
}
|
||||
|
||||
return URL(requestURL.build().toString())
|
||||
}
|
||||
@@ -0,0 +1,96 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import com.auth0.android.jwt.JWT
|
||||
import com.google.gson.annotations.SerializedName
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.ResponseBody
|
||||
import retrofit2.Call
|
||||
import retrofit2.Response
|
||||
import retrofit2.http.Body
|
||||
import retrofit2.http.GET
|
||||
import retrofit2.http.POST
|
||||
import retrofit2.http.Query
|
||||
import java.lang.Error
|
||||
import java.lang.Exception
|
||||
|
||||
data class SendMessageRequest(
|
||||
@SerializedName("guid")
|
||||
val conversationGUID: String,
|
||||
|
||||
@SerializedName("body")
|
||||
val body: String,
|
||||
|
||||
@SerializedName("fileTransferGUIDs")
|
||||
val transferGUIDs: List<String>?,
|
||||
)
|
||||
|
||||
data class SendMessageResponse(
|
||||
@SerializedName("guid")
|
||||
val sentMessageGUID: String,
|
||||
)
|
||||
|
||||
data class AuthenticationRequest(
|
||||
@SerializedName("username")
|
||||
val username: String,
|
||||
|
||||
@SerializedName("password")
|
||||
val password: String,
|
||||
)
|
||||
|
||||
data class AuthenticationResponse(
|
||||
@SerializedName("jwt")
|
||||
val serializedToken: String,
|
||||
) {
|
||||
fun decodeToken(): JWT {
|
||||
return JWT(serializedToken)
|
||||
}
|
||||
}
|
||||
|
||||
data class UploadAttachmentResponse(
|
||||
@SerializedName("fileTransferGUID")
|
||||
val transferGUID: String
|
||||
)
|
||||
|
||||
interface APIInterface {
|
||||
@GET("/version")
|
||||
suspend fun getVersion(): ResponseBody
|
||||
|
||||
@GET("/conversations")
|
||||
suspend fun getConversations(): Response<List<Conversation>>
|
||||
|
||||
@GET("/messages")
|
||||
suspend fun getMessages(
|
||||
@Query("guid") conversationGUID: String,
|
||||
@Query("limit") limit: Int? = null,
|
||||
@Query("beforeMessageGUID") beforeMessageGUID: GUID? = null,
|
||||
@Query("afterMessageGUID") afterMessageGUID: GUID? = null,
|
||||
): Response<List<Message>>
|
||||
|
||||
@POST("/sendMessage")
|
||||
suspend fun sendMessage(@Body request: SendMessageRequest): Response<SendMessageResponse>
|
||||
|
||||
@POST("/markConversation")
|
||||
suspend fun markConversation(@Query("guid") conversationGUID: String): Response<Void>
|
||||
|
||||
@GET("/attachment")
|
||||
suspend fun fetchAttachment(@Query("guid") guid: String, @Query("preview") preview: Boolean = false): ResponseBody
|
||||
|
||||
@POST("/uploadAttachment")
|
||||
suspend fun uploadAttachment(@Query("filename") filename: String, @Body body: RequestBody): Response<UploadAttachmentResponse>
|
||||
|
||||
@POST("/authenticate")
|
||||
suspend fun authenticate(@Body request: AuthenticationRequest): Response<AuthenticationResponse>
|
||||
}
|
||||
|
||||
class ResponseDecodeError(val response: ResponseBody): Exception(response.string())
|
||||
|
||||
fun <T> Response<T>.bodyOnSuccessOrThrow(): T {
|
||||
if (isSuccessful) {
|
||||
return body()!!
|
||||
}
|
||||
|
||||
throw ResponseDecodeError(errorBody()!!)
|
||||
}
|
||||
@@ -0,0 +1,333 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import android.net.Uri
|
||||
import android.util.Log
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.asSharedFlow
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import net.buzzert.kordophone.backend.db.CachedChatDatabase
|
||||
import net.buzzert.kordophone.backend.events.MessageDeliveredEvent
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.model.OutgoingMessage
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.RequestBody
|
||||
import okio.BufferedSource
|
||||
import java.io.InputStream
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
|
||||
const val REPO_LOG: String = "ChatRepository"
|
||||
const val CONVERSATION_MESSAGE_SYNC_COUNT = 10
|
||||
|
||||
class ChatRepository(
|
||||
private var apiClient: APIClient,
|
||||
private val database: CachedChatDatabase,
|
||||
) {
|
||||
sealed class Error {
|
||||
open val title: String = "Error"
|
||||
open val description: String = "Generic Error"
|
||||
|
||||
data class ConnectionError(val message: String?): Error() {
|
||||
override val title: String = "Connection Error"
|
||||
override val description: String = message ?: "???"
|
||||
}
|
||||
|
||||
data class AttachmentError(val message: String): Error() {
|
||||
override val title: String = "Attachment Error"
|
||||
override val description: String = message
|
||||
}
|
||||
}
|
||||
|
||||
// All (Cached) Conversations
|
||||
val conversations: List<Conversation>
|
||||
get() = database.fetchConversations()
|
||||
|
||||
// Channel that's signaled when an outgoing message is delivered.
|
||||
val messageDeliveredChannel: SharedFlow<MessageDeliveredEvent>
|
||||
get() = _messageDeliveredChannel.asSharedFlow()
|
||||
|
||||
// Changes Flow
|
||||
val conversationChanges: Flow<List<Conversation>>
|
||||
get() = database.conversationChanges
|
||||
.onEach { Log.d(REPO_LOG, "Got database conversations changed") }
|
||||
|
||||
// New Messages
|
||||
val newMessages: SharedFlow<Message>
|
||||
get() = _newMessageChannel.asSharedFlow()
|
||||
|
||||
// Errors channel
|
||||
val errorEncounteredChannel: SharedFlow<Error>
|
||||
get() = _errorEncounteredChannel.asSharedFlow()
|
||||
|
||||
val isConfigured: Boolean
|
||||
get() = apiClient.isConfigured
|
||||
|
||||
// New messages for a particular conversation
|
||||
fun messagesChanged(conversation: Conversation): Flow<List<Message>> =
|
||||
database.messagesChanged(conversation)
|
||||
|
||||
// Testing harness
|
||||
internal class TestingHarness(private val repository: ChatRepository) {
|
||||
suspend fun fetchConversations(): List<Conversation> {
|
||||
return repository.fetchConversations()
|
||||
}
|
||||
|
||||
suspend fun fetchMessages(conversation: Conversation): List<Message> {
|
||||
return repository.fetchMessages(conversation)
|
||||
}
|
||||
}
|
||||
|
||||
internal fun testingHarness(): TestingHarness = TestingHarness(this)
|
||||
|
||||
private var apiInterface = apiClient.getAPIInterface()
|
||||
private val outgoingMessageQueue: ArrayBlockingQueue<OutgoingMessage> = ArrayBlockingQueue(16)
|
||||
private var outgoingMessageThread: Thread? = null
|
||||
private val _messageDeliveredChannel = MutableSharedFlow<MessageDeliveredEvent>()
|
||||
private val _errorEncounteredChannel = MutableSharedFlow<Error>()
|
||||
private val _newMessageChannel = MutableSharedFlow<Message>()
|
||||
|
||||
private var updateMonitor = UpdateMonitor(apiClient)
|
||||
private var updateWatchJob: Job? = null
|
||||
private var updateWatchScope: CoroutineScope? = null
|
||||
|
||||
fun updateAPIClient(client: APIClient) {
|
||||
this.apiClient = client
|
||||
this.apiInterface = client.getAPIInterface()
|
||||
this.updateMonitor = UpdateMonitor(client)
|
||||
|
||||
// Restart update watch job, if necessary.
|
||||
if (this.updateWatchJob != null) {
|
||||
stopWatchingForUpdates()
|
||||
beginWatchingForUpdates(updateWatchScope!!)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun getVersion(): String {
|
||||
return apiInterface.getVersion().string()
|
||||
}
|
||||
|
||||
fun beginWatchingForUpdates(scope: CoroutineScope) {
|
||||
updateWatchJob?.cancel()
|
||||
updateWatchJob = CoroutineScope(scope.coroutineContext).launch {
|
||||
launch {
|
||||
updateMonitor.conversationChanged.collect { handleConversationChangedUpdate(it) }
|
||||
}
|
||||
launch {
|
||||
updateMonitor.messageAdded.collect { handleMessageAddedUpdate(it) }
|
||||
}
|
||||
launch {
|
||||
messageDeliveredChannel.collectLatest { handleMessageDelivered(it) }
|
||||
}
|
||||
}
|
||||
|
||||
updateWatchScope = scope
|
||||
updateMonitor.beginMonitoringUpdates()
|
||||
}
|
||||
|
||||
fun stopWatchingForUpdates() {
|
||||
updateWatchJob?.cancel()
|
||||
updateWatchJob = null
|
||||
|
||||
updateMonitor.stopMonitoringForUpdates()
|
||||
}
|
||||
|
||||
fun enqueueOutgoingMessage(message: OutgoingMessage): GUID {
|
||||
val guid = UUID.randomUUID().toString()
|
||||
|
||||
Log.d(REPO_LOG, "Enqueuing outgoing message: $message ($guid)")
|
||||
outgoingMessageQueue.add(message)
|
||||
|
||||
if (outgoingMessageThread == null) {
|
||||
outgoingMessageThread = Thread { outgoingMessageQueueMain() }
|
||||
outgoingMessageThread?.start()
|
||||
}
|
||||
|
||||
return guid
|
||||
}
|
||||
|
||||
fun conversationForGuid(guid: GUID): Conversation {
|
||||
return database.getConversationByGuid(guid).toConversation()
|
||||
}
|
||||
|
||||
fun messagesForConversation(conversation: Conversation): List<Message> {
|
||||
return database.fetchMessages(conversation)
|
||||
}
|
||||
|
||||
suspend fun synchronize() = withErrorChannelHandling {
|
||||
Log.d(REPO_LOG, "Synchronizing conversations")
|
||||
|
||||
// Sync conversations
|
||||
val serverConversations = fetchConversations()
|
||||
database.updateConversations(serverConversations)
|
||||
|
||||
// Sync top N number of conversations' message content
|
||||
Log.d(REPO_LOG, "Synchronizing messages")
|
||||
val sortedConversations = conversations.sortedBy { it.date }.reversed()
|
||||
for (conversation in sortedConversations.take(CONVERSATION_MESSAGE_SYNC_COUNT)) {
|
||||
synchronizeConversation(conversation)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun synchronizeConversation(conversation: Conversation, limit: Int = 15) = withErrorChannelHandling {
|
||||
val messages = fetchMessages(conversation, limit = limit, afterGUID = conversation.lastFetchedMessageGUID)
|
||||
database.writeMessages(messages, conversation)
|
||||
}
|
||||
|
||||
suspend fun markConversationAsRead(conversation: Conversation) = withErrorChannelHandling(silent = true) {
|
||||
apiInterface.markConversation(conversation.guid)
|
||||
}
|
||||
|
||||
suspend fun fetchAttachmentDataSource(guid: String, preview: Boolean): BufferedSource {
|
||||
return apiInterface.fetchAttachment(guid, preview).source()
|
||||
}
|
||||
|
||||
private suspend fun uploadAttachment(filename: String, mediaType: String, source: InputStream): String {
|
||||
val attachmentData = source.readBytes()
|
||||
val requestBody = RequestBody.create(MediaType.get(mediaType), attachmentData)
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
source.close()
|
||||
}
|
||||
|
||||
val response = apiInterface.uploadAttachment(filename, requestBody)
|
||||
return response.bodyOnSuccessOrThrow().transferGUID
|
||||
}
|
||||
|
||||
fun close() {
|
||||
database.close()
|
||||
}
|
||||
|
||||
// - private
|
||||
|
||||
private suspend fun withErrorChannelHandling(silent: Boolean = false, body: suspend () -> Unit) {
|
||||
try {
|
||||
body()
|
||||
} catch (e: InvalidConfigurationAPIClient.NotConfiguredError) {
|
||||
// Not configured yet: ignore.
|
||||
} catch (e: java.lang.Exception) {
|
||||
if (!silent) _errorEncounteredChannel.emit(Error.ConnectionError(e.message))
|
||||
} catch (e: java.lang.Error) {
|
||||
if (!silent) _errorEncounteredChannel.emit(Error.ConnectionError(e.message))
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun fetchConversations(): List<Conversation> {
|
||||
return apiInterface.getConversations().bodyOnSuccessOrThrow()
|
||||
}
|
||||
|
||||
private suspend fun fetchMessages(
|
||||
conversation: Conversation,
|
||||
limit: Int? = null,
|
||||
beforeGUID: String? = null,
|
||||
afterGUID: String? = null,
|
||||
): List<Message> {
|
||||
return apiInterface.getMessages(conversation.guid, limit, beforeGUID, afterGUID)
|
||||
.bodyOnSuccessOrThrow()
|
||||
.onEach { it.conversation = conversation }
|
||||
}
|
||||
|
||||
private fun handleConversationChangedUpdate(conversation: Conversation) {
|
||||
Log.d(REPO_LOG, "Handling conversation changed update")
|
||||
database.writeConversations(listOf(conversation))
|
||||
}
|
||||
|
||||
private suspend fun handleMessageAddedUpdate(message: Message) {
|
||||
Log.d(REPO_LOG, "Handling messages added update")
|
||||
database.writeMessages(listOf(message), message.conversation)
|
||||
_newMessageChannel.emit(message)
|
||||
}
|
||||
|
||||
private suspend fun handleMessageDelivered(event: MessageDeliveredEvent) {
|
||||
Log.d(REPO_LOG, "Handling successful delivery event")
|
||||
|
||||
// Unfortunate protocol reality: the server doesn't tell us about new messages that are from us,
|
||||
// so we have to explicitly handle this like a messageAddedUpdate.
|
||||
database.writeMessages(listOf(event.message), event.conversation, outgoing = true)
|
||||
}
|
||||
|
||||
private suspend fun retryMessageSend(info: OutgoingMessage) {
|
||||
delay(5000L)
|
||||
outgoingMessageQueue.add(info)
|
||||
}
|
||||
|
||||
private fun outgoingMessageQueueMain() {
|
||||
Log.d(REPO_LOG, "Outgoing Message Queue Main")
|
||||
while (true) {
|
||||
outgoingMessageQueue.take().let {
|
||||
runBlocking {
|
||||
val conversation = it.conversation
|
||||
val requestGuid = it.guid
|
||||
val body = it.body
|
||||
|
||||
Log.d(REPO_LOG, "Sending message to $conversation: $requestGuid")
|
||||
|
||||
// Upload attachments first
|
||||
val attachmentGUIDs = mutableListOf<String>()
|
||||
try {
|
||||
for (uri: Uri in it.attachmentUris) {
|
||||
val uploadData = it.attachmentDataSource(uri)
|
||||
?: throw java.lang.Exception("No upload data.")
|
||||
|
||||
val guid = uploadAttachment(uploadData.filename, uploadData.mimeType, uploadData.inputStream)
|
||||
attachmentGUIDs.add(guid)
|
||||
}
|
||||
} catch (e: java.lang.Exception) {
|
||||
Log.e(REPO_LOG, "Error uploading attachment (${e.message}). Dropping...")
|
||||
_errorEncounteredChannel.emit(Error.AttachmentError("Upload error: ${e.message}"))
|
||||
}
|
||||
|
||||
try {
|
||||
val result = apiInterface.sendMessage(
|
||||
SendMessageRequest(
|
||||
conversationGUID = conversation.guid,
|
||||
body = body,
|
||||
transferGUIDs = attachmentGUIDs,
|
||||
)
|
||||
)
|
||||
|
||||
if (result.isSuccessful) {
|
||||
val messageGuid = result.body()?.sentMessageGUID ?: requestGuid
|
||||
Log.d(REPO_LOG, "Successfully sent message: $messageGuid")
|
||||
|
||||
val newMessage = Message(
|
||||
guid = messageGuid,
|
||||
text = body,
|
||||
sender = null,
|
||||
conversation = it.conversation,
|
||||
date = Date(),
|
||||
attachmentGUIDs = attachmentGUIDs,
|
||||
)
|
||||
|
||||
_messageDeliveredChannel.emit(
|
||||
MessageDeliveredEvent(
|
||||
newMessage,
|
||||
conversation,
|
||||
requestGuid
|
||||
)
|
||||
)
|
||||
} else {
|
||||
Log.e(REPO_LOG, "Error sending message. Enqueuing for retry.")
|
||||
retryMessageSend(it)
|
||||
}
|
||||
} catch (e: java.lang.Exception) {
|
||||
Log.e(REPO_LOG, "Error sending message: (${e.message}). Enqueuing for retry in 5 sec.")
|
||||
retryMessageSend(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,133 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import android.util.Log
|
||||
import com.google.gson.Gson
|
||||
import com.google.gson.reflect.TypeToken
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.model.UpdateItem
|
||||
import okhttp3.Response
|
||||
import okhttp3.WebSocket
|
||||
import okhttp3.WebSocketListener
|
||||
import okio.ByteString
|
||||
import java.lang.reflect.Type
|
||||
|
||||
const val UPMON_LOG: String = "UpdateMonitor"
|
||||
|
||||
class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
|
||||
// Flow for getting conversation changed notifications
|
||||
val conversationChanged: Flow<Conversation>
|
||||
get() = _conversationChanged
|
||||
|
||||
// Flow for messages added notifications
|
||||
val messageAdded: Flow<Message>
|
||||
get() = _messageAdded
|
||||
|
||||
private val gson: Gson = Gson()
|
||||
private val updateItemsType: Type = object : TypeToken<ArrayList<UpdateItem>>() {}.type
|
||||
private var webSocket: WebSocket? = null
|
||||
private var needsSocketReconnect: Boolean = false
|
||||
private var messageSeq: Int = -1
|
||||
|
||||
private val _conversationChanged: MutableSharedFlow<Conversation> = MutableSharedFlow()
|
||||
private val _messageAdded: MutableSharedFlow<Message> = MutableSharedFlow()
|
||||
|
||||
fun beginMonitoringUpdates() {
|
||||
if (!client.isConfigured) {
|
||||
Log.e(UPMON_LOG, "Closing websocket connection because client is not configured.")
|
||||
return
|
||||
}
|
||||
|
||||
Log.d(UPMON_LOG, "Opening websocket connection")
|
||||
try {
|
||||
this.webSocket = client.getWebSocketClient(
|
||||
serverPath = "updates",
|
||||
queryParams = mapOf("seq" to messageSeq.toString()),
|
||||
listener = this
|
||||
)
|
||||
} catch (e: Error) {
|
||||
Log.e(UPMON_LOG, "Error getting websocket client: ${e.message}")
|
||||
}
|
||||
}
|
||||
|
||||
fun stopMonitoringForUpdates() {
|
||||
this.webSocket?.close(1000, "Closing on program request.")
|
||||
}
|
||||
|
||||
private fun processEncodedSocketMessage(message: String) = runBlocking {
|
||||
val reader = message.reader()
|
||||
val jsonReader = gson.newJsonReader(reader)
|
||||
|
||||
val updateItems: List<UpdateItem> = gson.fromJson(message, updateItemsType)
|
||||
for (updateItem: UpdateItem in updateItems) {
|
||||
val conversationChanged = updateItem.conversationChanged
|
||||
if (conversationChanged != null) {
|
||||
_conversationChanged.emit(conversationChanged)
|
||||
}
|
||||
|
||||
if (updateItem.messageAdded != null) {
|
||||
_messageAdded.emit(updateItem.messageAdded.also {
|
||||
it.conversation = conversationChanged!!
|
||||
})
|
||||
}
|
||||
|
||||
if (updateItem.sequence > messageSeq) {
|
||||
messageSeq = updateItem.sequence
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
private fun setNeedsSocketReconnect() {
|
||||
if (!needsSocketReconnect) {
|
||||
needsSocketReconnect = true
|
||||
|
||||
GlobalScope.launch {
|
||||
needsSocketReconnect = false
|
||||
|
||||
// Delay 5 seconds
|
||||
delay(5000L)
|
||||
|
||||
beginMonitoringUpdates()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// <WebSocketListener>
|
||||
|
||||
override fun onOpen(webSocket: WebSocket, response: Response) {
|
||||
super.onOpen(webSocket, response)
|
||||
Log.d(UPMON_LOG, "Update monitor websocket open")
|
||||
}
|
||||
|
||||
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
|
||||
super.onClosed(webSocket, code, reason)
|
||||
Log.d(UPMON_LOG, "Update monitor socket closed")
|
||||
setNeedsSocketReconnect()
|
||||
}
|
||||
|
||||
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
|
||||
super.onFailure(webSocket, t, response)
|
||||
Log.d(UPMON_LOG, "Update monitor socket failure: ${t.message} :: Response: ${response?.body()}. Reconnecting in 5 seconds.")
|
||||
setNeedsSocketReconnect()
|
||||
}
|
||||
|
||||
override fun onMessage(webSocket: WebSocket, text: String) {
|
||||
super.onMessage(webSocket, text)
|
||||
Log.d(UPMON_LOG, "Update monitor websocket received text message")
|
||||
processEncodedSocketMessage(text)
|
||||
}
|
||||
|
||||
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
|
||||
super.onMessage(webSocket, bytes)
|
||||
Log.d(UPMON_LOG, "Update monitor websocket received bytes message")
|
||||
processEncodedSocketMessage(bytes.utf8())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,345 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import net.buzzert.kordophone.backend.db.CachedChatDatabase
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.model.OutgoingMessage
|
||||
import net.buzzert.kordophone.backend.server.APIClient
|
||||
import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.Authentication
|
||||
import net.buzzert.kordophone.backend.server.ChatRepository
|
||||
import net.buzzert.kordophone.backend.server.RetrofitAPIClient
|
||||
import net.buzzert.kordophone.backend.server.UpdateMonitor
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Test
|
||||
import java.net.URL
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class BackendTests {
|
||||
private fun liveRepository(host: String): Pair<ChatRepository, RetrofitAPIClient> {
|
||||
val client = RetrofitAPIClient(URL(host), authentication = Authentication("test", "test"))
|
||||
val database = CachedChatDatabase.testDatabase()
|
||||
val repository = ChatRepository(client, database)
|
||||
return Pair(repository, client)
|
||||
}
|
||||
|
||||
private fun mockRepository(): Pair<ChatRepository, MockServer> {
|
||||
val mockServer = MockServer()
|
||||
val database = CachedChatDatabase.testDatabase()
|
||||
val repository = ChatRepository(mockServer.getClient(), database)
|
||||
return Pair(repository, mockServer)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testGetVersion() = runBlocking {
|
||||
val (repository, mockServer) = mockRepository()
|
||||
val version = repository.getVersion()
|
||||
assertEquals(version, mockServer.version)
|
||||
|
||||
repository.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testFetchConversations() = runBlocking {
|
||||
val (repository, mockServer) = mockRepository()
|
||||
|
||||
// Add conversation to mock server
|
||||
val inConversation = mockServer.addTestConversations(1).first()
|
||||
|
||||
val conversations = repository.testingHarness().fetchConversations()
|
||||
assertEquals(conversations.count(), 1)
|
||||
|
||||
val outConversation = conversations.first()
|
||||
assertEquals(inConversation, outConversation)
|
||||
|
||||
repository.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testFetchMessages() = runBlocking {
|
||||
val (repository, mockServer) = mockRepository()
|
||||
|
||||
// Add conversation & message to mock server
|
||||
val inConversation = mockServer.addTestConversations(1).first()
|
||||
val inMessage = mockServer.addTestMessages(1, inConversation).first()
|
||||
|
||||
val conversations = repository.testingHarness().fetchConversations()
|
||||
val messages = repository.testingHarness().fetchMessages(conversations.first())
|
||||
assertEquals(messages.count(), 1)
|
||||
|
||||
val outMessage = messages.first()
|
||||
assertEquals(outMessage, inMessage)
|
||||
|
||||
repository.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testSendMessage() = runBlocking {
|
||||
val (repository, mockServer) = mockRepository()
|
||||
|
||||
val conversation = mockServer.addTestConversations(1).first()
|
||||
val generatedMessage = MockServer.generateMessage(conversation)
|
||||
val outgoingMessage = OutgoingMessage(
|
||||
body = generatedMessage.text,
|
||||
conversation = conversation,
|
||||
attachmentUris = setOf(),
|
||||
attachmentDataSource = { null },
|
||||
)
|
||||
|
||||
repository.enqueueOutgoingMessage(outgoingMessage)
|
||||
|
||||
val event = repository.messageDeliveredChannel.first()
|
||||
assertEquals(event.message.text, outgoingMessage.body)
|
||||
|
||||
repository.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConversationSynchronization() = runBlocking {
|
||||
val (repo, mockServer) = mockRepository()
|
||||
|
||||
// Add some test convos
|
||||
val conversations = mockServer.addTestConversations(10)
|
||||
|
||||
// Sync
|
||||
repo.synchronize()
|
||||
|
||||
// Check our count.
|
||||
assertEquals(10, repo.conversations.count())
|
||||
|
||||
// Sync again: let's ensure we're de-duplicating conversations.
|
||||
repo.synchronize()
|
||||
|
||||
// Should be no change...
|
||||
assertEquals(10, repo.conversations.count())
|
||||
|
||||
// Say unread count + lastMessage preview changes on server.
|
||||
val someConversation = conversations.first().apply {
|
||||
displayName = "COOL"
|
||||
unreadCount = 2
|
||||
}
|
||||
|
||||
// Sync again
|
||||
repo.synchronize()
|
||||
|
||||
// Make sure change is reflected
|
||||
val readConversation = repo.conversationForGuid(someConversation.guid)
|
||||
assertEquals("COOL", readConversation.displayName)
|
||||
assertEquals(2, readConversation.unreadCount)
|
||||
|
||||
repo.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConversationFlowUpdates() = runBlocking {
|
||||
val (repo, mockServer) = mockRepository()
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
println("Watching for conversations changes...")
|
||||
repo.conversationChanges.collect {
|
||||
println("Changed conversations: $it")
|
||||
|
||||
// We got it.
|
||||
if (it.isNotEmpty()) {
|
||||
println("bink")
|
||||
updateLatch.countDown()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
// Add a conversation
|
||||
println("Adding conversation")
|
||||
mockServer.addTestConversations(1)
|
||||
|
||||
// Sync. This should trigger an update
|
||||
println("Synchronizing...")
|
||||
repo.synchronize()
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testMessageFlowUpdates() = runBlocking {
|
||||
val (repo, mockServer) = mockRepository()
|
||||
|
||||
// Add an existing conversation
|
||||
println("Adding conversation")
|
||||
val conversation = mockServer.addTestConversations(1).first()
|
||||
|
||||
// Initial sync
|
||||
println("Initial sync")
|
||||
repo.synchronize()
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
var messagesAdded: List<Message>? = null
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
println("Watching for messages to be added...")
|
||||
repo.messagesChanged(conversation).collect {
|
||||
println("Messages changed: $it")
|
||||
|
||||
if (it.isNotEmpty()) {
|
||||
messagesAdded = it
|
||||
updateLatch.countDown()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
// Add a message
|
||||
val messages = mockServer.addTestMessages(10, conversation)
|
||||
|
||||
// Sync. This should trigger an update
|
||||
println("Synchronizing...")
|
||||
repo.synchronize()
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
|
||||
// Check what we got back
|
||||
assertEquals(messages, messagesAdded)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateMonitorForConversations() = runBlocking {
|
||||
val mockServer = MockServer()
|
||||
val mockAPIClient = mockServer.getClient()
|
||||
val updateMonitor = UpdateMonitor(mockAPIClient)
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
updateMonitor.beginMonitoringUpdates()
|
||||
updateMonitor.conversationChanged.collect {
|
||||
println("Got conversation changed: $it")
|
||||
updateLatch.countDown()
|
||||
|
||||
updateMonitor.stopMonitoringForUpdates()
|
||||
mockAPIClient.stopWatchingForUpdates()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
Thread.sleep(500)
|
||||
|
||||
// Add a conversation
|
||||
println("Adding conversation")
|
||||
mockServer.addTestConversations(1)
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateMonitorForMessages() = runBlocking {
|
||||
val mockServer = MockServer()
|
||||
val mockAPIClient = mockServer.getClient()
|
||||
val updateMonitor = UpdateMonitor(mockAPIClient)
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
updateMonitor.beginMonitoringUpdates()
|
||||
updateMonitor.messageAdded.collect {
|
||||
println("Got message added: $it")
|
||||
updateLatch.countDown()
|
||||
|
||||
updateMonitor.stopMonitoringForUpdates()
|
||||
mockAPIClient.stopWatchingForUpdates()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
Thread.sleep(500)
|
||||
|
||||
// Add a conversation
|
||||
println("Adding conversation")
|
||||
val convo = mockServer.addTestConversations(1).first()
|
||||
|
||||
// Add a test message
|
||||
mockServer.addTestMessages(1, convo)
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testEndToEndMessageUpdates() = runBlocking {
|
||||
val (repo, mockServer) = mockRepository()
|
||||
|
||||
val conversation = mockServer.addTestConversations(1).first()
|
||||
|
||||
// Initial sync
|
||||
repo.synchronize()
|
||||
|
||||
// We're going to generate a couple of messages...
|
||||
val messagesToGenerate = 5
|
||||
|
||||
// Start watching for N updates
|
||||
val updateLatch = CountDownLatch(messagesToGenerate)
|
||||
val monitorJob = launch {
|
||||
repo.messagesChanged(conversation).collect {
|
||||
println("Message changed: $it")
|
||||
|
||||
if (it.isNotEmpty()) {
|
||||
updateLatch.countDown()
|
||||
}
|
||||
|
||||
if (updateLatch.count == 0L) {
|
||||
repo.stopWatchingForUpdates()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
repo.beginWatchingForUpdates(this)
|
||||
|
||||
Thread.sleep(500)
|
||||
|
||||
// Should trigger an update
|
||||
println("Adding messages")
|
||||
mockServer.addTestMessages(messagesToGenerate, conversation)
|
||||
|
||||
monitorJob.join()
|
||||
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
|
||||
// Check num messages
|
||||
val allMessages = repo.messagesForConversation(conversation)
|
||||
assertEquals(messagesToGenerate, allMessages.count())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
import net.buzzert.kordophone.backend.db.CachedChatDatabase
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import org.junit.AfterClass
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Test
|
||||
import java.util.Date
|
||||
|
||||
class DatabaseTests {
|
||||
@Test
|
||||
fun testConversationRetrieval() {
|
||||
val db = CachedChatDatabase.testDatabase()
|
||||
|
||||
val conversation = MockServer.generateConversation()
|
||||
db.writeConversations(listOf(conversation))
|
||||
|
||||
val readBackConversations = db.fetchConversations()
|
||||
assertEquals(readBackConversations.count(), 1)
|
||||
|
||||
val readConversation = readBackConversations[0]
|
||||
assertEquals(readConversation, conversation)
|
||||
|
||||
db.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testMessageRetrieval() {
|
||||
val db = CachedChatDatabase.testDatabase()
|
||||
|
||||
val conversation = MockServer.generateConversation()
|
||||
db.writeConversations(listOf(conversation))
|
||||
|
||||
var messages = listOf(
|
||||
MockServer.generateMessage(conversation),
|
||||
MockServer.generateMessage(conversation),
|
||||
)
|
||||
db.writeMessages(messages, conversation)
|
||||
|
||||
val readMessages = db.fetchMessages(conversation)
|
||||
|
||||
assertEquals(readMessages, messages)
|
||||
assertEquals(readMessages[0].conversation.guid, conversation.guid)
|
||||
|
||||
db.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConversationModification() {
|
||||
val db = CachedChatDatabase.testDatabase()
|
||||
|
||||
var conversation = MockServer.generateConversation().apply {
|
||||
displayName = "HooBoy"
|
||||
}
|
||||
|
||||
db.writeConversations(listOf(conversation))
|
||||
|
||||
val readConversation = db.fetchConversations().first()
|
||||
assertEquals(conversation.displayName, "HooBoy")
|
||||
|
||||
// Change display name
|
||||
conversation.displayName = "wow"
|
||||
|
||||
// Write back
|
||||
db.writeConversations(listOf(conversation))
|
||||
|
||||
val nowConversations = db.fetchConversations()
|
||||
|
||||
// Make sure we didn't duplicate
|
||||
assertEquals(nowConversations.count(), 1)
|
||||
|
||||
// Make sure our new name was written
|
||||
assertEquals(nowConversations.first().displayName, "wow")
|
||||
|
||||
db.close()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,292 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
import com.google.gson.Gson
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancelAndJoin
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.model.UpdateItem
|
||||
import net.buzzert.kordophone.backend.server.APIClient
|
||||
import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.AuthenticationRequest
|
||||
import net.buzzert.kordophone.backend.server.AuthenticationResponse
|
||||
import net.buzzert.kordophone.backend.server.SendMessageRequest
|
||||
import net.buzzert.kordophone.backend.server.SendMessageResponse
|
||||
import net.buzzert.kordophone.backend.server.UploadAttachmentResponse
|
||||
import net.buzzert.kordophone.backend.server.authenticatedWebSocketURL
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.MediaType.Companion.toMediaType
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.ResponseBody
|
||||
import okhttp3.ResponseBody.Companion.toResponseBody
|
||||
import okhttp3.WebSocket
|
||||
import okhttp3.WebSocketListener
|
||||
import okhttp3.mockwebserver.MockResponse
|
||||
import okhttp3.mockwebserver.MockWebServer
|
||||
import retrofit2.Response
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
@OptIn(ExperimentalStdlibApi::class)
|
||||
class MockServer {
|
||||
val version = "Kordophone-2.0"
|
||||
val conversations: MutableList<Conversation> = mutableListOf()
|
||||
val updateFlow: Flow<UpdateItem> get() = _updateFlow
|
||||
var updateMessageSequence: Int = 0
|
||||
|
||||
private val messages: MutableMap<String, MutableList<Message>> = mutableMapOf()
|
||||
private val _updateFlow: MutableSharedFlow<UpdateItem> = MutableSharedFlow()
|
||||
|
||||
private val client = MockServerClient(this)
|
||||
|
||||
companion object {
|
||||
fun generateMessage(parentConversation: Conversation): Message {
|
||||
return Message(
|
||||
date = Date(),
|
||||
text = "This is a test!",
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null,
|
||||
conversation = parentConversation,
|
||||
attachmentGUIDs = null,
|
||||
)
|
||||
}
|
||||
|
||||
fun generateConversation(): Conversation {
|
||||
return Conversation(
|
||||
date = Date(),
|
||||
participants = listOf("james@magahern.com"),
|
||||
displayName = null,
|
||||
unreadCount = 0,
|
||||
lastMessagePreview = null,
|
||||
lastMessage = null,
|
||||
guid = UUID.randomUUID().toString(),
|
||||
lastFetchedMessageGUID = null,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fun getServer(): MockWebServer = MockWebServer()
|
||||
fun getClient(): MockServerClient = client
|
||||
fun getAPIInterface(): APIInterface = MockServerClient(this).getAPIInterface()
|
||||
|
||||
fun addConversation(conversation: Conversation) {
|
||||
conversations.add(conversation)
|
||||
messages[conversation.guid] = mutableListOf()
|
||||
|
||||
runBlocking {
|
||||
_updateFlow.emit(UpdateItem(
|
||||
sequence = updateMessageSequence++,
|
||||
conversationChanged = conversation
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fun updateConversation(conversation: Conversation) {
|
||||
conversations.removeAll { it.guid == conversation.guid }
|
||||
addConversation(conversation)
|
||||
}
|
||||
|
||||
fun addMessagesToConversation(conversation: Conversation, messages: List<Message>) {
|
||||
val guid = conversation.guid
|
||||
this.messages[guid]?.addAll(messages)
|
||||
conversation.lastMessage = messages.last()
|
||||
conversation.lastMessagePreview = messages.last().text
|
||||
|
||||
runBlocking {
|
||||
for (message in messages) {
|
||||
_updateFlow.emit(
|
||||
UpdateItem(
|
||||
sequence = updateMessageSequence++,
|
||||
conversationChanged = conversation,
|
||||
messageAdded = message
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun addTestConversations(count: Int): List<Conversation> {
|
||||
val testConversations = ArrayList<Conversation>()
|
||||
for (i in 0..<count) {
|
||||
val conversation = MockServer.generateConversation()
|
||||
testConversations.add(conversation)
|
||||
addConversation(conversation)
|
||||
}
|
||||
|
||||
return testConversations
|
||||
}
|
||||
|
||||
fun addTestMessages(count: Int, conversation: Conversation): List<Message> {
|
||||
val testMessages = ArrayList<Message>()
|
||||
for (i in 0..<count) {
|
||||
val message = MockServer.generateMessage(conversation)
|
||||
testMessages.add(message)
|
||||
}
|
||||
|
||||
addMessagesToConversation(conversation, testMessages)
|
||||
return testMessages
|
||||
}
|
||||
|
||||
fun markConversationAsRead(guid: GUID) {
|
||||
val conversation = conversations.first { it.guid == guid }
|
||||
conversation.unreadCount = 0
|
||||
|
||||
updateConversation(conversation)
|
||||
}
|
||||
|
||||
internal fun getMessagesForConversationGUID(guid: GUID): List<Message>? {
|
||||
return messages[guid]?.toList()
|
||||
}
|
||||
|
||||
internal fun sendMessage(body: String, toConversationGUID: GUID): Message {
|
||||
val conversation = conversations.first { it.guid == toConversationGUID }
|
||||
|
||||
val message = Message(
|
||||
text = body,
|
||||
date = Date(),
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null, // me
|
||||
conversation = conversation,
|
||||
attachmentGUIDs = null,
|
||||
)
|
||||
|
||||
addMessagesToConversation(conversation, listOf(message))
|
||||
return message
|
||||
}
|
||||
}
|
||||
|
||||
class MockServerClient(private val server: MockServer): APIClient, WebSocketListener() {
|
||||
private var updateWebSocket: WebSocket? = null
|
||||
private var updateWatchJob: Job? = null
|
||||
private val gson: Gson = Gson()
|
||||
|
||||
override val isConfigured: Boolean = true
|
||||
|
||||
override fun getAPIInterface(): APIInterface {
|
||||
return MockServerInterface(server)
|
||||
}
|
||||
|
||||
override fun getWebSocketClient(
|
||||
serverPath: String,
|
||||
queryParams: Map<String, String>?,
|
||||
listener: WebSocketListener
|
||||
): WebSocket {
|
||||
val webServer = server.getServer()
|
||||
|
||||
val params = queryParams ?: mapOf()
|
||||
val baseHTTPURL: HttpUrl = webServer.url("/")
|
||||
val baseURL = baseHTTPURL.toUrl()
|
||||
val requestURL = baseURL.authenticatedWebSocketURL(serverPath, params)
|
||||
val request = Request.Builder()
|
||||
.url(requestURL)
|
||||
.build()
|
||||
|
||||
webServer.enqueue(MockResponse().withWebSocketUpgrade(this))
|
||||
|
||||
if (this.updateWatchJob == null) {
|
||||
CoroutineScope(Job()).launch {
|
||||
startWatchingForUpdates(this)
|
||||
}
|
||||
}
|
||||
|
||||
return OkHttpClient().newWebSocket(request, listener)
|
||||
}
|
||||
|
||||
private fun startWatchingForUpdates(scope: CoroutineScope) {
|
||||
this.updateWatchJob = scope.launch {
|
||||
server.updateFlow.collect {
|
||||
println("Mock WebSocket is sending a message")
|
||||
|
||||
// Encode to JSON and send to websocket
|
||||
val updateItems = listOf(it)
|
||||
val encodedUpdateItem = gson.toJson(updateItems)
|
||||
updateWebSocket?.send(encodedUpdateItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun stopWatchingForUpdates() = runBlocking {
|
||||
updateWatchJob?.cancelAndJoin()
|
||||
}
|
||||
|
||||
override fun onOpen(webSocket: WebSocket, response: okhttp3.Response) {
|
||||
super.onOpen(webSocket, response)
|
||||
|
||||
println("Mock WebSocket opened.")
|
||||
this.updateWebSocket = webSocket
|
||||
}
|
||||
|
||||
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
|
||||
super.onClosed(webSocket, code, reason)
|
||||
|
||||
println("Mock WebSocket closed.")
|
||||
this.updateWebSocket = null
|
||||
}
|
||||
}
|
||||
|
||||
class MockServerInterface(private val server: MockServer): APIInterface {
|
||||
override suspend fun getVersion(): ResponseBody {
|
||||
return server.version.toResponseBody("text/plain".toMediaType())
|
||||
}
|
||||
|
||||
override suspend fun getConversations(): Response<List<Conversation>> {
|
||||
return Response.success(server.conversations)
|
||||
}
|
||||
|
||||
override suspend fun getMessages(
|
||||
conversationGUID: String,
|
||||
limit: Int?,
|
||||
beforeMessageGUID: GUID?,
|
||||
afterMessageGUID: GUID?
|
||||
): Response<List<Message>> {
|
||||
val messages = server.getMessagesForConversationGUID(conversationGUID)
|
||||
|
||||
return if (messages != null) {
|
||||
Response.success(messages)
|
||||
} else {
|
||||
Response.error(500, "GUID not found".toResponseBody())
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun sendMessage(request: SendMessageRequest): Response<SendMessageResponse> {
|
||||
val message = server.sendMessage(request.body, request.conversationGUID)
|
||||
|
||||
val response = SendMessageResponse(message.guid)
|
||||
return Response.success(response)
|
||||
}
|
||||
|
||||
override suspend fun markConversation(conversationGUID: String): Response<Void> {
|
||||
server.markConversationAsRead(conversationGUID)
|
||||
return Response.success(null)
|
||||
}
|
||||
|
||||
override suspend fun fetchAttachment(guid: String, preview: Boolean): ResponseBody {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override suspend fun uploadAttachment(
|
||||
filename: String,
|
||||
body: RequestBody
|
||||
): Response<UploadAttachmentResponse> {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override suspend fun authenticate(request: AuthenticationRequest): Response<AuthenticationResponse> {
|
||||
// Anything goes!
|
||||
val response = AuthenticationResponse(
|
||||
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." +
|
||||
"eyJ1c2VybmFtZSI6InRlc3QiLCJleHAiOjE3MDk3OTQ5NjB9." +
|
||||
"82UcI1gB4eARmgrKwAY6JnbEdWLXou1GWp29scnUhi8"
|
||||
)
|
||||
|
||||
return Response.success(response)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package android.util
|
||||
|
||||
public class Log {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun d(tag: String, msg: String): Int {
|
||||
println("DEBUG: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun i(tag: String, msg: String): Int {
|
||||
println("INFO: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun w(tag: String, msg: String): Int {
|
||||
println("WARN: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun e(tag: String, msg: String): Int {
|
||||
println("ERROR: $tag: $msg")
|
||||
return 0
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user