diff --git a/model/conversation.go b/model/conversation.go index d376581..b10ae5a 100644 --- a/model/conversation.go +++ b/model/conversation.go @@ -10,7 +10,7 @@ import ( type Conversation struct { Date time.Time `json:"date"` Participants []string `json:"participantDisplayNames"` - DisplayName *string `json:"displayName"` // Optional + DisplayName *string `json:"displayName,omitempty"` // Optional UnreadCount int `json:"unreadCount"` LastMessagePreview string `json:"lastMessagePreview"` Guid string `json:"guid"` diff --git a/model/update.go b/model/update.go new file mode 100644 index 0000000..07b0547 --- /dev/null +++ b/model/update.go @@ -0,0 +1,26 @@ +package model + +import "github.com/rs/zerolog" + +type UpdateItem struct { + MessageSequenceNumber int `json:"messageSequenceNumber"` + Conversation *Conversation `json:"conversation,omitempty"` + Message *Message `json:"message,omitempty"` +} + +func New(conversation *Conversation, message *Message) *UpdateItem { + return &UpdateItem{ + Conversation: conversation, + Message: message, + } +} + +func (i *UpdateItem) MarshalZerologObject(e *zerolog.Event) { + e.Int("messageSequenceNumber", i.MessageSequenceNumber) + if i.Conversation != nil { + e.Object("conversation", i.Conversation) + } + if i.Message != nil { + e.Object("message", i.Message) + } +} diff --git a/server/server.go b/server/server.go index f8ee62b..726bf26 100644 --- a/server/server.go +++ b/server/server.go @@ -20,6 +20,9 @@ type Server struct { conversations []model.Conversation authTokens []model.AuthToken messageStore map[string][]model.Message + updateItems map[int]model.UpdateItem + updateChannel chan []model.UpdateItem + updateItemSeq int } type AuthError struct { @@ -44,6 +47,9 @@ func NewServer() *Server { conversations: []model.Conversation{}, authTokens: []model.AuthToken{}, messageStore: make(map[string][]model.Message), + updateItems: make(map[int]model.UpdateItem), + updateChannel: make(chan []model.UpdateItem), + updateItemSeq: 0, } } @@ -157,9 +163,52 @@ func (s *Server) ReceiveMessage(conversation *model.Conversation, message model. conversation.Date = message.Date conversation.UnreadCount += 1 + // Enqueue Update + s.EnqueueUpdateItem(model.UpdateItem{ + Conversation: conversation, + Message: &message, + }) + log.Info().EmbedObject(message).Msgf("Received message from conversation %s", conversation.Guid) } +func (s *Server) EnqueueUpdateItem(item model.UpdateItem) { + log.Info().EmbedObject(&item).Msg("Enqueuing update item") + + s.updateItemSeq += 1 + item.MessageSequenceNumber = s.updateItemSeq + + s.updateItems[s.updateItemSeq] = item + + // Publish to channel + select { + case s.updateChannel <- []model.UpdateItem{item}: + default: // Nobody listening + } +} + +func (s *Server) FetchUpdates(since int) []model.UpdateItem { + items := []model.UpdateItem{} + for i := since; i <= s.updateItemSeq; i++ { + if val, ok := s.updateItems[i]; ok { + items = append(items, val) + } + } + + return items +} + +func (s *Server) FetchUpdatesBlocking(since int) []model.UpdateItem { + if since < 0 || since >= s.updateItemSeq { + // Wait for updates + log.Info().Msgf("Waiting for updates since %d", since) + items := <-s.updateChannel + return items + } else { + return s.FetchUpdates(since) + } +} + // Private func (s *Server) registerAuthToken(token *model.AuthToken) { diff --git a/web/server.go b/web/server.go index 10e43f6..ac42d62 100644 --- a/web/server.go +++ b/web/server.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "strings" "time" @@ -159,11 +160,6 @@ func (m *MockHTTPServer) handleNotFound(w http.ResponseWriter, r *http.Request) http.NotFound(w, r) } -func (m *MockHTTPServer) handlePollUpdates(w http.ResponseWriter, r *http.Request) { - // Stub: return 205 (Nothing to report) - w.WriteHeader(http.StatusResetContent) -} - func (m *MockHTTPServer) handleSendMessage(w http.ResponseWriter, r *http.Request) { // Decode request body as SendMessageRequest var sendMessageReq SendMessageRequest @@ -196,6 +192,43 @@ func (m *MockHTTPServer) handleSendMessage(w http.ResponseWriter, r *http.Reques w.WriteHeader(http.StatusOK) } +func (m *MockHTTPServer) handlePollUpdates(w http.ResponseWriter, r *http.Request) { + // TODO: This should block if we don't have updates for that seq yet. + + seq := -1 + seqString := r.URL.Query().Get("seq") + if len(seqString) > 0 { + var err error + seq, err = strconv.Atoi(seqString) + if err != nil { + log.Error().Err(err).Msg("FetchUpdates: Error parsing seq") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + + // Fetch updates (blocking) + updates := m.Server.FetchUpdatesBlocking(seq) + + if len(updates) == 0 { + // return 205 (Nothing to report) + w.WriteHeader(http.StatusResetContent) + log.Info().Msg("FetchUpdates: Nothing to report") + } else { + // Encode updates as JSON + jsonData, err := json.Marshal(updates) + if err != nil { + log.Error().Err(err).Msg("Error marshalling updates") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Write JSON to response + w.Header().Set("Content-Type", "application/json") + w.Write(jsonData) + } +} + func (m *MockHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.logRequest(r, r.URL.Query().Encode()) m.mux.ServeHTTP(w, r) diff --git a/web/server_test.go b/web/server_test.go index 173ad0e..2052383 100644 --- a/web/server_test.go +++ b/web/server_test.go @@ -3,6 +3,7 @@ package web_test import ( "bytes" "encoding/json" + "fmt" "io" "net/http" "net/http/httptest" @@ -231,3 +232,70 @@ func TestAuthentication(t *testing.T) { t.Fatalf("Unexpected body: %s (expected %s)", body, "OK") } } + +func TestUpdates(t *testing.T) { + s := web.NewMockHTTPServer(web.MockHTTPServerConfiguration{AuthEnabled: true}) + httpServer := httptest.NewServer(s) + + messageSeq := 0 + + // Mock conversation + guid := "1234567890" + conversation := model.Conversation{ + Date: time.Now(), + Participants: []string{"Alice"}, + UnreadCount: 0, + Guid: guid, + } + s.Server.AddConversation(conversation) + + // Receive a message + message := model.Message{ + Text: "This is a test.", + Sender: &conversation.Participants[0], + Date: time.Now(), + } + + // This should enqueue an update item + s.Server.ReceiveMessage(&conversation, message) + + resp, err := http.Get(httpServer.URL + fmt.Sprintf("/pollUpdates?seq=%d", messageSeq)) + if err != nil { + t.Fatalf("TestUpdates error: %s", err) + } + + if resp.StatusCode != http.StatusOK { + t.Fatalf("Unexpected status code: %d (expected %d)", resp.StatusCode, http.StatusOK) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Error decoding body: %s", body) + } + + var updates []model.UpdateItem + err = json.Unmarshal(body, &updates) + if err != nil { + t.Fatalf("Error unmarshalling JSON: %s", err) + } + + if len(updates) != 1 { + t.Fatalf("Unexpected num updates: %d (expected %d)", len(updates), 1) + } + + update := updates[0] + + // Message seq should be >= messageSeq + messageSeq = update.MessageSequenceNumber + if messageSeq != 1 { + t.Fatalf("Unexpected message seq: %d (expected >= 0)", messageSeq) + } + + if update.Conversation.Guid != conversation.Guid { + t.Fatalf("Unexpected conversation guid: %s (expected %s)", update.Conversation.Guid, conversation.Guid) + } + + if update.Message.Text != message.Text { + t.Fatalf("Unexpected message text: %s (expected %s)", update.Message.Text, message.Text) + } +}