Implements UpdateMonitor
This commit is contained in:
@@ -0,0 +1,14 @@
|
||||
package net.buzzert.kordophone.backend.model
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
|
||||
data class UpdateItem(
|
||||
@SerializedName("messageSequenceNumber")
|
||||
val sequence: Int,
|
||||
|
||||
@SerializedName("conversation")
|
||||
val conversationChanged: Conversation? = null,
|
||||
|
||||
@SerializedName("message")
|
||||
val messageAdded: Message? = null,
|
||||
)
|
||||
@@ -1,16 +1,52 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.WebSocket
|
||||
import okhttp3.WebSocketListener
|
||||
import retrofit2.Retrofit
|
||||
import retrofit2.converter.gson.GsonConverterFactory
|
||||
import java.net.URL
|
||||
|
||||
class APIClient(baseURL: URL) {
|
||||
interface APIClient {
|
||||
fun getAPIInterface(): APIInterface
|
||||
fun getWebSocketClient(
|
||||
serverPath: String,
|
||||
authToken: String? = null,
|
||||
listener: WebSocketListener
|
||||
): WebSocket
|
||||
}
|
||||
|
||||
class RetrofitAPIClient(private val baseURL: URL): APIClient {
|
||||
private val retrofit: Retrofit = Retrofit.Builder()
|
||||
.baseUrl(baseURL)
|
||||
.addConverterFactory(GsonConverterFactory.create())
|
||||
.build()
|
||||
|
||||
fun getClient(): Retrofit {
|
||||
return retrofit
|
||||
override fun getAPIInterface(): APIInterface {
|
||||
return retrofit.create(APIInterface::class.java)
|
||||
}
|
||||
|
||||
override fun getWebSocketClient(serverPath: String, authToken: String?, listener: WebSocketListener): WebSocket {
|
||||
val requestURL = baseURL.authenticatedWebSocketURL(serverPath, authToken)
|
||||
val request = Request.Builder()
|
||||
.url(requestURL)
|
||||
.build()
|
||||
|
||||
return OkHttpClient().newWebSocket(request, listener)
|
||||
}
|
||||
}
|
||||
|
||||
fun URL.authenticatedWebSocketURL(serverPath: String, authToken: String? = null): URL {
|
||||
val baseURI = HttpUrl.parse(this.toString())!!
|
||||
val requestURL = baseURI.newBuilder()
|
||||
.host(baseURI.host())
|
||||
.addEncodedPathSegments(serverPath)
|
||||
|
||||
if (authToken != null) {
|
||||
requestURL.addQueryParameter("token", authToken)
|
||||
}
|
||||
|
||||
return URL(requestURL.build().toString())
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
package net.buzzert.kordophone.backend.server
|
||||
|
||||
import android.util.Log
|
||||
import com.google.gson.Gson
|
||||
import com.google.gson.reflect.TypeToken
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.model.UpdateItem
|
||||
import okhttp3.Response
|
||||
import okhttp3.WebSocket
|
||||
import okhttp3.WebSocketListener
|
||||
import okio.ByteString
|
||||
import retrofit2.converter.gson.GsonConverterFactory
|
||||
import java.lang.reflect.Type
|
||||
|
||||
const val UPMON_LOG: String = "ChatRepository"
|
||||
|
||||
class UpdateMonitor(private val client: APIClient) : WebSocketListener() {
|
||||
// Flow for getting conversation changed notifications
|
||||
val conversationChanged: Flow<Conversation>
|
||||
get() = _conversationChanged
|
||||
|
||||
// Flow for messages added notifications
|
||||
val messageAdded: Flow<Message>
|
||||
get() = _messageAdded
|
||||
|
||||
private val gson: Gson = Gson()
|
||||
private val updateItemsType: Type = object : TypeToken<ArrayList<UpdateItem>>() {}.type
|
||||
private var webSocket: WebSocket? = null
|
||||
|
||||
private val _conversationChanged: MutableSharedFlow<Conversation> = MutableSharedFlow()
|
||||
private val _messageAdded: MutableSharedFlow<Message> = MutableSharedFlow()
|
||||
|
||||
|
||||
fun beginMonitoringUpdates() {
|
||||
Log.d(UPMON_LOG, "Opening websocket connection")
|
||||
this.webSocket = client.getWebSocketClient("/updates", null, this)
|
||||
}
|
||||
|
||||
fun stopMonitoringForUpdates() {
|
||||
this.webSocket?.close(1000, "Closing on program request.")
|
||||
}
|
||||
|
||||
private fun processEncodedSocketMessage(message: String) = runBlocking {
|
||||
val reader = message.reader()
|
||||
val jsonReader = gson.newJsonReader(reader)
|
||||
|
||||
val updateItems: List<UpdateItem> = gson.fromJson(message, updateItemsType)
|
||||
for (updateItem: UpdateItem in updateItems) {
|
||||
if (updateItem.conversationChanged != null) {
|
||||
_conversationChanged.emit(updateItem.conversationChanged)
|
||||
}
|
||||
|
||||
if (updateItem.messageAdded != null) {
|
||||
_messageAdded.emit(updateItem.messageAdded)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// <WebSocketListener>
|
||||
|
||||
override fun onOpen(webSocket: WebSocket, response: Response) {
|
||||
super.onOpen(webSocket, response)
|
||||
Log.d(UPMON_LOG, "Update monitor websocket open")
|
||||
}
|
||||
|
||||
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
|
||||
super.onClosed(webSocket, code, reason)
|
||||
Log.d(UPMON_LOG, "Update monitor socket closed")
|
||||
}
|
||||
|
||||
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
|
||||
super.onFailure(webSocket, t, response)
|
||||
Log.d(UPMON_LOG, "Update monitor socket failure: ${t.message} :: Response: ${response?.body()}")
|
||||
}
|
||||
|
||||
override fun onMessage(webSocket: WebSocket, text: String) {
|
||||
super.onMessage(webSocket, text)
|
||||
Log.d(UPMON_LOG, "Update monitor websocket received text message")
|
||||
processEncodedSocketMessage(text)
|
||||
}
|
||||
|
||||
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
|
||||
super.onMessage(webSocket, bytes)
|
||||
Log.d(UPMON_LOG, "Update monitor websocket received bytes message")
|
||||
processEncodedSocketMessage(bytes.utf8())
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,8 @@ import net.buzzert.kordophone.backend.db.CachedChatDatabase
|
||||
import net.buzzert.kordophone.backend.server.APIClient
|
||||
import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.ChatRepository
|
||||
import net.buzzert.kordophone.backend.server.RetrofitAPIClient
|
||||
import net.buzzert.kordophone.backend.server.UpdateMonitor
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Test
|
||||
@@ -17,17 +19,18 @@ import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class BackendTests {
|
||||
private fun liveRepository(host: String): ChatRepository {
|
||||
val client = APIClient(URL(host))
|
||||
val apiInterface = client.getClient().create(APIInterface::class.java)
|
||||
private fun liveRepository(host: String): Pair<ChatRepository, RetrofitAPIClient> {
|
||||
val client = RetrofitAPIClient(URL(host))
|
||||
val apiInterface = client.getAPIInterface()
|
||||
val database = CachedChatDatabase.testDatabase()
|
||||
return ChatRepository(apiInterface, database)
|
||||
val repository = ChatRepository(apiInterface, database)
|
||||
return Pair(repository, client)
|
||||
}
|
||||
|
||||
private fun mockRepository(): Pair<ChatRepository, MockServer> {
|
||||
val mockServer = MockServer()
|
||||
val database = CachedChatDatabase.testDatabase()
|
||||
val repository = ChatRepository(mockServer, database)
|
||||
val repository = ChatRepository(mockServer.getAPIInterface(), database)
|
||||
return Pair(repository, mockServer)
|
||||
}
|
||||
|
||||
@@ -161,4 +164,41 @@ class BackendTests {
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testUpdateMonitor() = runBlocking {
|
||||
val mockServer = MockServer()
|
||||
val mockAPIClient = mockServer.getClient()
|
||||
val updateMonitor = UpdateMonitor(mockAPIClient)
|
||||
|
||||
// Set up flow watcher, asynchronously
|
||||
val updateLatch = CountDownLatch(1)
|
||||
val job = launch {
|
||||
updateMonitor.beginMonitoringUpdates()
|
||||
updateMonitor.conversationChanged.collect {
|
||||
println("Got conversation changed: $it")
|
||||
updateLatch.countDown()
|
||||
|
||||
updateMonitor.stopMonitoringForUpdates()
|
||||
mockAPIClient.stopWatchingForUpdates()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
withContext(Dispatchers.IO) {
|
||||
mockAPIClient.startWatchingForUpdates(this)
|
||||
|
||||
Thread.sleep(500)
|
||||
|
||||
// Add a conversation
|
||||
println("Adding conversation")
|
||||
mockServer.addTestConversations(1)
|
||||
|
||||
// Wait for the coroutine that is collecting the flow to finish
|
||||
job.join()
|
||||
|
||||
// Ensure the updates have been processed before proceeding
|
||||
assertTrue(updateLatch.await(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,20 +1,47 @@
|
||||
package net.buzzert.kordophone.backend
|
||||
|
||||
import com.google.gson.Gson
|
||||
import com.google.gson.reflect.TypeToken
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancelAndJoin
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import net.buzzert.kordophone.backend.model.Conversation
|
||||
import net.buzzert.kordophone.backend.model.GUID
|
||||
import net.buzzert.kordophone.backend.model.Message
|
||||
import net.buzzert.kordophone.backend.model.UpdateItem
|
||||
import net.buzzert.kordophone.backend.server.APIClient
|
||||
import net.buzzert.kordophone.backend.server.APIInterface
|
||||
import net.buzzert.kordophone.backend.server.SendMessageRequest
|
||||
import net.buzzert.kordophone.backend.server.authenticatedWebSocketURL
|
||||
import okhttp3.HttpUrl
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.MediaType.Companion.toMediaType
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.ResponseBody
|
||||
import okhttp3.ResponseBody.Companion.toResponseBody
|
||||
import okhttp3.WebSocket
|
||||
import okhttp3.WebSocketListener
|
||||
import okhttp3.mockwebserver.MockResponse
|
||||
import okhttp3.mockwebserver.MockWebServer
|
||||
import retrofit2.Response
|
||||
import java.net.URL
|
||||
import java.util.Date
|
||||
import java.util.UUID
|
||||
|
||||
class MockServer: APIInterface {
|
||||
class MockServer {
|
||||
val version = "Kordophone-2.0"
|
||||
val conversations: MutableList<Conversation> = mutableListOf()
|
||||
val messages: MutableMap<String, MutableList<Message>> = mutableMapOf()
|
||||
val updateFlow: Flow<UpdateItem> get() = _updateFlow
|
||||
var updateMessageSequence: Int = 0
|
||||
|
||||
private val messages: MutableMap<String, MutableList<Message>> = mutableMapOf()
|
||||
private val _updateFlow: MutableSharedFlow<UpdateItem> = MutableSharedFlow()
|
||||
|
||||
companion object {
|
||||
fun generateMessage(): Message {
|
||||
@@ -40,35 +67,19 @@ class MockServer: APIInterface {
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun getVersion(): ResponseBody {
|
||||
return ResponseBody.create("text/plain".toMediaType(), version)
|
||||
}
|
||||
|
||||
override suspend fun getConversations(): Response<List<Conversation>> {
|
||||
return Response.success(conversations)
|
||||
}
|
||||
|
||||
override suspend fun getMessages(conversationGUID: String): Response<List<Message>> {
|
||||
return Response.success(messages[conversationGUID])
|
||||
}
|
||||
|
||||
override suspend fun sendMessage(request: SendMessageRequest): Response<Void> {
|
||||
val message = Message(
|
||||
text = request.body,
|
||||
date = Date(),
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null, // me
|
||||
)
|
||||
|
||||
messages[request.conversationGUID]?.add(message) ?: run {
|
||||
messages[request.conversationGUID] = mutableListOf(message)
|
||||
}
|
||||
|
||||
return Response.success(null)
|
||||
}
|
||||
fun getServer(): MockWebServer = MockWebServer()
|
||||
fun getClient(): MockServerClient = MockServerClient(this)
|
||||
fun getAPIInterface(): APIInterface = MockServerClient(this).getAPIInterface()
|
||||
|
||||
fun addConversation(conversation: Conversation) {
|
||||
conversations.add(conversation)
|
||||
|
||||
runBlocking {
|
||||
_updateFlow.emit(UpdateItem(
|
||||
sequence = updateMessageSequence++,
|
||||
conversationChanged = conversation
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fun addMessagesToConversation(conversation: Conversation, messages: List<Message>) {
|
||||
@@ -76,6 +87,14 @@ class MockServer: APIInterface {
|
||||
this.messages[guid]?.addAll(messages) ?: run {
|
||||
this.messages[guid] = messages.toMutableList()
|
||||
}
|
||||
|
||||
runBlocking {
|
||||
_updateFlow.emit(UpdateItem(
|
||||
sequence = updateMessageSequence++,
|
||||
conversationChanged = conversation,
|
||||
messageAdded = messages.first()
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fun addTestConversations(count: Int): List<Conversation> {
|
||||
@@ -99,4 +118,105 @@ class MockServer: APIInterface {
|
||||
addMessagesToConversation(conversation, testMessages)
|
||||
return testMessages
|
||||
}
|
||||
|
||||
internal fun getMessagesForConversationGUID(guid: GUID): List<Message>? {
|
||||
return messages[guid]?.toList()
|
||||
}
|
||||
|
||||
internal fun sendMessage(body: String, toConversationGUID: GUID) {
|
||||
val message = Message(
|
||||
text = body,
|
||||
date = Date(),
|
||||
guid = UUID.randomUUID().toString(),
|
||||
sender = null, // me
|
||||
)
|
||||
|
||||
val conversation = conversations.first { it.guid == toConversationGUID }
|
||||
addMessagesToConversation(conversation, listOf(message))
|
||||
}
|
||||
}
|
||||
|
||||
class MockServerClient(private val server: MockServer): APIClient, WebSocketListener() {
|
||||
private var updateWebSocket: WebSocket? = null
|
||||
private var updateWatchJob: Job? = null
|
||||
private val gson: Gson = Gson()
|
||||
|
||||
override fun getAPIInterface(): APIInterface {
|
||||
return MockServerInterface(server)
|
||||
}
|
||||
|
||||
override fun getWebSocketClient(
|
||||
serverPath: String,
|
||||
authToken: String?,
|
||||
listener: WebSocketListener
|
||||
): WebSocket {
|
||||
val webServer = server.getServer()
|
||||
|
||||
val baseHTTPURL: HttpUrl = webServer.url("/")
|
||||
val baseURL = baseHTTPURL.toUrl()
|
||||
val requestURL = baseURL.authenticatedWebSocketURL(serverPath, authToken)
|
||||
val request = Request.Builder()
|
||||
.url(requestURL)
|
||||
.build()
|
||||
|
||||
webServer.enqueue(MockResponse().withWebSocketUpgrade(this))
|
||||
|
||||
return OkHttpClient().newWebSocket(request, listener)
|
||||
}
|
||||
|
||||
fun startWatchingForUpdates(scope: CoroutineScope) {
|
||||
this.updateWatchJob = scope.launch {
|
||||
server.updateFlow.collect {
|
||||
println("Mock WebSocket is sending a message")
|
||||
|
||||
// Encode to JSON and send to websocket
|
||||
val updateItems = listOf(it)
|
||||
val encodedUpdateItem = gson.toJson(updateItems)
|
||||
updateWebSocket?.send(encodedUpdateItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun stopWatchingForUpdates() = runBlocking {
|
||||
updateWatchJob?.cancelAndJoin()
|
||||
}
|
||||
|
||||
override fun onOpen(webSocket: WebSocket, response: okhttp3.Response) {
|
||||
super.onOpen(webSocket, response)
|
||||
|
||||
println("Mock WebSocket opened.")
|
||||
this.updateWebSocket = webSocket
|
||||
}
|
||||
|
||||
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
|
||||
super.onClosed(webSocket, code, reason)
|
||||
|
||||
println("Mock WebSocket closed.")
|
||||
this.updateWebSocket = null
|
||||
}
|
||||
}
|
||||
|
||||
class MockServerInterface(private val server: MockServer): APIInterface {
|
||||
override suspend fun getVersion(): ResponseBody {
|
||||
return server.version.toResponseBody("text/plain".toMediaType())
|
||||
}
|
||||
|
||||
override suspend fun getConversations(): Response<List<Conversation>> {
|
||||
return Response.success(server.conversations)
|
||||
}
|
||||
|
||||
override suspend fun getMessages(conversationGUID: String): Response<List<Message>> {
|
||||
val messages = server.getMessagesForConversationGUID(conversationGUID)
|
||||
|
||||
return if (messages != null) {
|
||||
Response.success(messages)
|
||||
} else {
|
||||
Response.error(500, "GUID not found".toResponseBody())
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun sendMessage(request: SendMessageRequest): Response<Void> {
|
||||
server.sendMessage(request.body, request.conversationGUID)
|
||||
return Response.success(null)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user