From 62ea95099ada6a3d3d2f1abb485332f0735255d8 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 13 Aug 2023 18:41:49 -0700 Subject: [PATCH] Implements UpdateMonitor --- .../kordophone/backend/model/UpdateItem.kt | 14 ++ .../kordophone/backend/server/APIClient.kt | 42 ++++- .../backend/server/UpdateMonitor.kt | 91 +++++++++ .../kordophone/backend/BackendTests.kt | 50 ++++- .../buzzert/kordophone/backend/MockServer.kt | 176 +++++++++++++++--- 5 files changed, 337 insertions(+), 36 deletions(-) create mode 100644 backend/src/main/java/net/buzzert/kordophone/backend/model/UpdateItem.kt create mode 100644 backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/model/UpdateItem.kt b/backend/src/main/java/net/buzzert/kordophone/backend/model/UpdateItem.kt new file mode 100644 index 0000000..559627d --- /dev/null +++ b/backend/src/main/java/net/buzzert/kordophone/backend/model/UpdateItem.kt @@ -0,0 +1,14 @@ +package net.buzzert.kordophone.backend.model + +import com.google.gson.annotations.SerializedName + +data class UpdateItem( + @SerializedName("messageSequenceNumber") + val sequence: Int, + + @SerializedName("conversation") + val conversationChanged: Conversation? = null, + + @SerializedName("message") + val messageAdded: Message? = null, +) \ No newline at end of file diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/server/APIClient.kt b/backend/src/main/java/net/buzzert/kordophone/backend/server/APIClient.kt index f9e325d..e185792 100644 --- a/backend/src/main/java/net/buzzert/kordophone/backend/server/APIClient.kt +++ b/backend/src/main/java/net/buzzert/kordophone/backend/server/APIClient.kt @@ -1,16 +1,52 @@ package net.buzzert.kordophone.backend.server +import okhttp3.HttpUrl +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.WebSocket +import okhttp3.WebSocketListener import retrofit2.Retrofit import retrofit2.converter.gson.GsonConverterFactory import java.net.URL -class APIClient(baseURL: URL) { +interface APIClient { + fun getAPIInterface(): APIInterface + fun getWebSocketClient( + serverPath: String, + authToken: String? = null, + listener: WebSocketListener + ): WebSocket +} + +class RetrofitAPIClient(private val baseURL: URL): APIClient { private val retrofit: Retrofit = Retrofit.Builder() .baseUrl(baseURL) .addConverterFactory(GsonConverterFactory.create()) .build() - fun getClient(): Retrofit { - return retrofit + override fun getAPIInterface(): APIInterface { + return retrofit.create(APIInterface::class.java) } + + override fun getWebSocketClient(serverPath: String, authToken: String?, listener: WebSocketListener): WebSocket { + val requestURL = baseURL.authenticatedWebSocketURL(serverPath, authToken) + val request = Request.Builder() + .url(requestURL) + .build() + + return OkHttpClient().newWebSocket(request, listener) + } +} + +fun URL.authenticatedWebSocketURL(serverPath: String, authToken: String? = null): URL { + val baseURI = HttpUrl.parse(this.toString())!! + val requestURL = baseURI.newBuilder() + .host(baseURI.host()) + .addEncodedPathSegments(serverPath) + + if (authToken != null) { + requestURL.addQueryParameter("token", authToken) + } + + return URL(requestURL.build().toString()) } \ No newline at end of file diff --git a/backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt b/backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt new file mode 100644 index 0000000..aa4b1af --- /dev/null +++ b/backend/src/main/java/net/buzzert/kordophone/backend/server/UpdateMonitor.kt @@ -0,0 +1,91 @@ +package net.buzzert.kordophone.backend.server + +import android.util.Log +import com.google.gson.Gson +import com.google.gson.reflect.TypeToken +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.runBlocking +import net.buzzert.kordophone.backend.model.Conversation +import net.buzzert.kordophone.backend.model.Message +import net.buzzert.kordophone.backend.model.UpdateItem +import okhttp3.Response +import okhttp3.WebSocket +import okhttp3.WebSocketListener +import okio.ByteString +import retrofit2.converter.gson.GsonConverterFactory +import java.lang.reflect.Type + +const val UPMON_LOG: String = "ChatRepository" + +class UpdateMonitor(private val client: APIClient) : WebSocketListener() { + // Flow for getting conversation changed notifications + val conversationChanged: Flow + get() = _conversationChanged + + // Flow for messages added notifications + val messageAdded: Flow + get() = _messageAdded + + private val gson: Gson = Gson() + private val updateItemsType: Type = object : TypeToken>() {}.type + private var webSocket: WebSocket? = null + + private val _conversationChanged: MutableSharedFlow = MutableSharedFlow() + private val _messageAdded: MutableSharedFlow = MutableSharedFlow() + + + fun beginMonitoringUpdates() { + Log.d(UPMON_LOG, "Opening websocket connection") + this.webSocket = client.getWebSocketClient("/updates", null, this) + } + + fun stopMonitoringForUpdates() { + this.webSocket?.close(1000, "Closing on program request.") + } + + private fun processEncodedSocketMessage(message: String) = runBlocking { + val reader = message.reader() + val jsonReader = gson.newJsonReader(reader) + + val updateItems: List = gson.fromJson(message, updateItemsType) + for (updateItem: UpdateItem in updateItems) { + if (updateItem.conversationChanged != null) { + _conversationChanged.emit(updateItem.conversationChanged) + } + + if (updateItem.messageAdded != null) { + _messageAdded.emit(updateItem.messageAdded) + } + } + } + + // + + override fun onOpen(webSocket: WebSocket, response: Response) { + super.onOpen(webSocket, response) + Log.d(UPMON_LOG, "Update monitor websocket open") + } + + override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { + super.onClosed(webSocket, code, reason) + Log.d(UPMON_LOG, "Update monitor socket closed") + } + + override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + super.onFailure(webSocket, t, response) + Log.d(UPMON_LOG, "Update monitor socket failure: ${t.message} :: Response: ${response?.body()}") + } + + override fun onMessage(webSocket: WebSocket, text: String) { + super.onMessage(webSocket, text) + Log.d(UPMON_LOG, "Update monitor websocket received text message") + processEncodedSocketMessage(text) + } + + override fun onMessage(webSocket: WebSocket, bytes: ByteString) { + super.onMessage(webSocket, bytes) + Log.d(UPMON_LOG, "Update monitor websocket received bytes message") + processEncodedSocketMessage(bytes.utf8()) + } +} \ No newline at end of file diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt b/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt index e72df0f..15c739d 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/BackendTests.kt @@ -9,6 +9,8 @@ import net.buzzert.kordophone.backend.db.CachedChatDatabase import net.buzzert.kordophone.backend.server.APIClient import net.buzzert.kordophone.backend.server.APIInterface import net.buzzert.kordophone.backend.server.ChatRepository +import net.buzzert.kordophone.backend.server.RetrofitAPIClient +import net.buzzert.kordophone.backend.server.UpdateMonitor import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue import org.junit.Test @@ -17,17 +19,18 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit class BackendTests { - private fun liveRepository(host: String): ChatRepository { - val client = APIClient(URL(host)) - val apiInterface = client.getClient().create(APIInterface::class.java) + private fun liveRepository(host: String): Pair { + val client = RetrofitAPIClient(URL(host)) + val apiInterface = client.getAPIInterface() val database = CachedChatDatabase.testDatabase() - return ChatRepository(apiInterface, database) + val repository = ChatRepository(apiInterface, database) + return Pair(repository, client) } private fun mockRepository(): Pair { val mockServer = MockServer() val database = CachedChatDatabase.testDatabase() - val repository = ChatRepository(mockServer, database) + val repository = ChatRepository(mockServer.getAPIInterface(), database) return Pair(repository, mockServer) } @@ -161,4 +164,41 @@ class BackendTests { assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) } } + + @Test + fun testUpdateMonitor() = runBlocking { + val mockServer = MockServer() + val mockAPIClient = mockServer.getClient() + val updateMonitor = UpdateMonitor(mockAPIClient) + + // Set up flow watcher, asynchronously + val updateLatch = CountDownLatch(1) + val job = launch { + updateMonitor.beginMonitoringUpdates() + updateMonitor.conversationChanged.collect { + println("Got conversation changed: $it") + updateLatch.countDown() + + updateMonitor.stopMonitoringForUpdates() + mockAPIClient.stopWatchingForUpdates() + cancel() + } + } + + withContext(Dispatchers.IO) { + mockAPIClient.startWatchingForUpdates(this) + + Thread.sleep(500) + + // Add a conversation + println("Adding conversation") + mockServer.addTestConversations(1) + + // Wait for the coroutine that is collecting the flow to finish + job.join() + + // Ensure the updates have been processed before proceeding + assertTrue(updateLatch.await(1, TimeUnit.SECONDS)) + } + } } \ No newline at end of file diff --git a/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt b/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt index a85a6e0..60a8cdc 100644 --- a/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt +++ b/backend/src/test/java/net/buzzert/kordophone/backend/MockServer.kt @@ -1,20 +1,47 @@ package net.buzzert.kordophone.backend +import com.google.gson.Gson +import com.google.gson.reflect.TypeToken +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import net.buzzert.kordophone.backend.model.Conversation +import net.buzzert.kordophone.backend.model.GUID import net.buzzert.kordophone.backend.model.Message +import net.buzzert.kordophone.backend.model.UpdateItem +import net.buzzert.kordophone.backend.server.APIClient import net.buzzert.kordophone.backend.server.APIInterface import net.buzzert.kordophone.backend.server.SendMessageRequest +import net.buzzert.kordophone.backend.server.authenticatedWebSocketURL +import okhttp3.HttpUrl import okhttp3.MediaType import okhttp3.MediaType.Companion.toMediaType +import okhttp3.OkHttpClient +import okhttp3.Request import okhttp3.ResponseBody +import okhttp3.ResponseBody.Companion.toResponseBody +import okhttp3.WebSocket +import okhttp3.WebSocketListener +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer import retrofit2.Response +import java.net.URL import java.util.Date import java.util.UUID -class MockServer: APIInterface { +class MockServer { val version = "Kordophone-2.0" val conversations: MutableList = mutableListOf() - val messages: MutableMap> = mutableMapOf() + val updateFlow: Flow get() = _updateFlow + var updateMessageSequence: Int = 0 + + private val messages: MutableMap> = mutableMapOf() + private val _updateFlow: MutableSharedFlow = MutableSharedFlow() companion object { fun generateMessage(): Message { @@ -40,35 +67,19 @@ class MockServer: APIInterface { } } - override suspend fun getVersion(): ResponseBody { - return ResponseBody.create("text/plain".toMediaType(), version) - } - - override suspend fun getConversations(): Response> { - return Response.success(conversations) - } - - override suspend fun getMessages(conversationGUID: String): Response> { - return Response.success(messages[conversationGUID]) - } - - override suspend fun sendMessage(request: SendMessageRequest): Response { - val message = Message( - text = request.body, - date = Date(), - guid = UUID.randomUUID().toString(), - sender = null, // me - ) - - messages[request.conversationGUID]?.add(message) ?: run { - messages[request.conversationGUID] = mutableListOf(message) - } - - return Response.success(null) - } + fun getServer(): MockWebServer = MockWebServer() + fun getClient(): MockServerClient = MockServerClient(this) + fun getAPIInterface(): APIInterface = MockServerClient(this).getAPIInterface() fun addConversation(conversation: Conversation) { conversations.add(conversation) + + runBlocking { + _updateFlow.emit(UpdateItem( + sequence = updateMessageSequence++, + conversationChanged = conversation + )) + } } fun addMessagesToConversation(conversation: Conversation, messages: List) { @@ -76,6 +87,14 @@ class MockServer: APIInterface { this.messages[guid]?.addAll(messages) ?: run { this.messages[guid] = messages.toMutableList() } + + runBlocking { + _updateFlow.emit(UpdateItem( + sequence = updateMessageSequence++, + conversationChanged = conversation, + messageAdded = messages.first() + )) + } } fun addTestConversations(count: Int): List { @@ -99,4 +118,105 @@ class MockServer: APIInterface { addMessagesToConversation(conversation, testMessages) return testMessages } + + internal fun getMessagesForConversationGUID(guid: GUID): List? { + return messages[guid]?.toList() + } + + internal fun sendMessage(body: String, toConversationGUID: GUID) { + val message = Message( + text = body, + date = Date(), + guid = UUID.randomUUID().toString(), + sender = null, // me + ) + + val conversation = conversations.first { it.guid == toConversationGUID } + addMessagesToConversation(conversation, listOf(message)) + } +} + +class MockServerClient(private val server: MockServer): APIClient, WebSocketListener() { + private var updateWebSocket: WebSocket? = null + private var updateWatchJob: Job? = null + private val gson: Gson = Gson() + + override fun getAPIInterface(): APIInterface { + return MockServerInterface(server) + } + + override fun getWebSocketClient( + serverPath: String, + authToken: String?, + listener: WebSocketListener + ): WebSocket { + val webServer = server.getServer() + + val baseHTTPURL: HttpUrl = webServer.url("/") + val baseURL = baseHTTPURL.toUrl() + val requestURL = baseURL.authenticatedWebSocketURL(serverPath, authToken) + val request = Request.Builder() + .url(requestURL) + .build() + + webServer.enqueue(MockResponse().withWebSocketUpgrade(this)) + + return OkHttpClient().newWebSocket(request, listener) + } + + fun startWatchingForUpdates(scope: CoroutineScope) { + this.updateWatchJob = scope.launch { + server.updateFlow.collect { + println("Mock WebSocket is sending a message") + + // Encode to JSON and send to websocket + val updateItems = listOf(it) + val encodedUpdateItem = gson.toJson(updateItems) + updateWebSocket?.send(encodedUpdateItem) + } + } + } + + fun stopWatchingForUpdates() = runBlocking { + updateWatchJob?.cancelAndJoin() + } + + override fun onOpen(webSocket: WebSocket, response: okhttp3.Response) { + super.onOpen(webSocket, response) + + println("Mock WebSocket opened.") + this.updateWebSocket = webSocket + } + + override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { + super.onClosed(webSocket, code, reason) + + println("Mock WebSocket closed.") + this.updateWebSocket = null + } +} + +class MockServerInterface(private val server: MockServer): APIInterface { + override suspend fun getVersion(): ResponseBody { + return server.version.toResponseBody("text/plain".toMediaType()) + } + + override suspend fun getConversations(): Response> { + return Response.success(server.conversations) + } + + override suspend fun getMessages(conversationGUID: String): Response> { + val messages = server.getMessagesForConversationGUID(conversationGUID) + + return if (messages != null) { + Response.success(messages) + } else { + Response.error(500, "GUID not found".toResponseBody()) + } + } + + override suspend fun sendMessage(request: SendMessageRequest): Response { + server.sendMessage(request.body, request.conversationGUID) + return Response.success(null) + } } \ No newline at end of file