Private
Public Access
1
0

Implements pollUpdates

This commit is contained in:
2023-07-19 11:58:13 -06:00
parent 7611bedef7
commit 1fce2c7cb3
5 changed files with 182 additions and 6 deletions

View File

@@ -10,7 +10,7 @@ import (
type Conversation struct {
Date time.Time `json:"date"`
Participants []string `json:"participantDisplayNames"`
DisplayName *string `json:"displayName"` // Optional
DisplayName *string `json:"displayName,omitempty"` // Optional
UnreadCount int `json:"unreadCount"`
LastMessagePreview string `json:"lastMessagePreview"`
Guid string `json:"guid"`

26
model/update.go Normal file
View File

@@ -0,0 +1,26 @@
package model
import "github.com/rs/zerolog"
type UpdateItem struct {
MessageSequenceNumber int `json:"messageSequenceNumber"`
Conversation *Conversation `json:"conversation,omitempty"`
Message *Message `json:"message,omitempty"`
}
func New(conversation *Conversation, message *Message) *UpdateItem {
return &UpdateItem{
Conversation: conversation,
Message: message,
}
}
func (i *UpdateItem) MarshalZerologObject(e *zerolog.Event) {
e.Int("messageSequenceNumber", i.MessageSequenceNumber)
if i.Conversation != nil {
e.Object("conversation", i.Conversation)
}
if i.Message != nil {
e.Object("message", i.Message)
}
}

View File

@@ -20,6 +20,9 @@ type Server struct {
conversations []model.Conversation
authTokens []model.AuthToken
messageStore map[string][]model.Message
updateItems map[int]model.UpdateItem
updateChannel chan []model.UpdateItem
updateItemSeq int
}
type AuthError struct {
@@ -44,6 +47,9 @@ func NewServer() *Server {
conversations: []model.Conversation{},
authTokens: []model.AuthToken{},
messageStore: make(map[string][]model.Message),
updateItems: make(map[int]model.UpdateItem),
updateChannel: make(chan []model.UpdateItem),
updateItemSeq: 0,
}
}
@@ -157,9 +163,52 @@ func (s *Server) ReceiveMessage(conversation *model.Conversation, message model.
conversation.Date = message.Date
conversation.UnreadCount += 1
// Enqueue Update
s.EnqueueUpdateItem(model.UpdateItem{
Conversation: conversation,
Message: &message,
})
log.Info().EmbedObject(message).Msgf("Received message from conversation %s", conversation.Guid)
}
func (s *Server) EnqueueUpdateItem(item model.UpdateItem) {
log.Info().EmbedObject(&item).Msg("Enqueuing update item")
s.updateItemSeq += 1
item.MessageSequenceNumber = s.updateItemSeq
s.updateItems[s.updateItemSeq] = item
// Publish to channel
select {
case s.updateChannel <- []model.UpdateItem{item}:
default: // Nobody listening
}
}
func (s *Server) FetchUpdates(since int) []model.UpdateItem {
items := []model.UpdateItem{}
for i := since; i <= s.updateItemSeq; i++ {
if val, ok := s.updateItems[i]; ok {
items = append(items, val)
}
}
return items
}
func (s *Server) FetchUpdatesBlocking(since int) []model.UpdateItem {
if since < 0 || since >= s.updateItemSeq {
// Wait for updates
log.Info().Msgf("Waiting for updates since %d", since)
items := <-s.updateChannel
return items
} else {
return s.FetchUpdates(since)
}
}
// Private
func (s *Server) registerAuthToken(token *model.AuthToken) {

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
@@ -159,11 +160,6 @@ func (m *MockHTTPServer) handleNotFound(w http.ResponseWriter, r *http.Request)
http.NotFound(w, r)
}
func (m *MockHTTPServer) handlePollUpdates(w http.ResponseWriter, r *http.Request) {
// Stub: return 205 (Nothing to report)
w.WriteHeader(http.StatusResetContent)
}
func (m *MockHTTPServer) handleSendMessage(w http.ResponseWriter, r *http.Request) {
// Decode request body as SendMessageRequest
var sendMessageReq SendMessageRequest
@@ -196,6 +192,43 @@ func (m *MockHTTPServer) handleSendMessage(w http.ResponseWriter, r *http.Reques
w.WriteHeader(http.StatusOK)
}
func (m *MockHTTPServer) handlePollUpdates(w http.ResponseWriter, r *http.Request) {
// TODO: This should block if we don't have updates for that seq yet.
seq := -1
seqString := r.URL.Query().Get("seq")
if len(seqString) > 0 {
var err error
seq, err = strconv.Atoi(seqString)
if err != nil {
log.Error().Err(err).Msg("FetchUpdates: Error parsing seq")
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
// Fetch updates (blocking)
updates := m.Server.FetchUpdatesBlocking(seq)
if len(updates) == 0 {
// return 205 (Nothing to report)
w.WriteHeader(http.StatusResetContent)
log.Info().Msg("FetchUpdates: Nothing to report")
} else {
// Encode updates as JSON
jsonData, err := json.Marshal(updates)
if err != nil {
log.Error().Err(err).Msg("Error marshalling updates")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Write JSON to response
w.Header().Set("Content-Type", "application/json")
w.Write(jsonData)
}
}
func (m *MockHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.logRequest(r, r.URL.Query().Encode())
m.mux.ServeHTTP(w, r)

View File

@@ -3,6 +3,7 @@ package web_test
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
@@ -231,3 +232,70 @@ func TestAuthentication(t *testing.T) {
t.Fatalf("Unexpected body: %s (expected %s)", body, "OK")
}
}
func TestUpdates(t *testing.T) {
s := web.NewMockHTTPServer(web.MockHTTPServerConfiguration{AuthEnabled: true})
httpServer := httptest.NewServer(s)
messageSeq := 0
// Mock conversation
guid := "1234567890"
conversation := model.Conversation{
Date: time.Now(),
Participants: []string{"Alice"},
UnreadCount: 0,
Guid: guid,
}
s.Server.AddConversation(conversation)
// Receive a message
message := model.Message{
Text: "This is a test.",
Sender: &conversation.Participants[0],
Date: time.Now(),
}
// This should enqueue an update item
s.Server.ReceiveMessage(&conversation, message)
resp, err := http.Get(httpServer.URL + fmt.Sprintf("/pollUpdates?seq=%d", messageSeq))
if err != nil {
t.Fatalf("TestUpdates error: %s", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("Unexpected status code: %d (expected %d)", resp.StatusCode, http.StatusOK)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error decoding body: %s", body)
}
var updates []model.UpdateItem
err = json.Unmarshal(body, &updates)
if err != nil {
t.Fatalf("Error unmarshalling JSON: %s", err)
}
if len(updates) != 1 {
t.Fatalf("Unexpected num updates: %d (expected %d)", len(updates), 1)
}
update := updates[0]
// Message seq should be >= messageSeq
messageSeq = update.MessageSequenceNumber
if messageSeq != 1 {
t.Fatalf("Unexpected message seq: %d (expected >= 0)", messageSeq)
}
if update.Conversation.Guid != conversation.Guid {
t.Fatalf("Unexpected conversation guid: %s (expected %s)", update.Conversation.Guid, conversation.Guid)
}
if update.Message.Text != message.Text {
t.Fatalf("Unexpected message text: %s (expected %s)", update.Message.Text, message.Text)
}
}