diff --git a/server/server.go b/server/server.go index 8da05c9..6dfddf6 100644 --- a/server/server.go +++ b/server/server.go @@ -17,13 +17,13 @@ const ( ) type Server struct { - version string - conversations []model.Conversation - authTokens []model.AuthToken - messageStore map[string][]model.Message - updateItems map[int]model.UpdateItem - updateChannel chan []model.UpdateItem - updateItemSeq int + version string + conversations []model.Conversation + authTokens []model.AuthToken + messageStore map[string][]model.Message + updateItems map[int]model.UpdateItem + updateChannels []chan []model.UpdateItem + updateItemSeq int } type MessagesQuery struct { @@ -52,13 +52,13 @@ func (e *DatabaseError) Error() string { func NewServer() *Server { return &Server{ - version: VERSION, - conversations: []model.Conversation{}, - authTokens: []model.AuthToken{}, - messageStore: make(map[string][]model.Message), - updateItems: make(map[int]model.UpdateItem), - updateChannel: make(chan []model.UpdateItem), - updateItemSeq: 0, + version: VERSION, + conversations: []model.Conversation{}, + authTokens: []model.AuthToken{}, + messageStore: make(map[string][]model.Message), + updateItems: make(map[int]model.UpdateItem), + updateChannels: []chan []model.UpdateItem{}, + updateItemSeq: 0, } } @@ -186,7 +186,7 @@ func (s *Server) PerformMessageQuery(query *MessagesQuery) []model.Message { if query.Limit != nil { limit := *query.Limit if len(messages) > limit { - messages = messages[:limit] + messages = messages[len(messages)-limit:] } } @@ -241,9 +241,8 @@ func (s *Server) EnqueueUpdateItem(item model.UpdateItem) { s.updateItems[s.updateItemSeq] = item // Publish to channel - select { - case s.updateChannel <- []model.UpdateItem{item}: - default: // Nobody listening + for i := range s.updateChannels { + s.updateChannels[i] <- []model.UpdateItem{item} } } @@ -262,7 +261,18 @@ func (s *Server) FetchUpdatesBlocking(since int) []model.UpdateItem { if since < 0 || since >= s.updateItemSeq { // Wait for updates log.Info().Msgf("Waiting for updates since %d", since) - items := <-s.updateChannel + updateChannel := make(chan []model.UpdateItem) + s.updateChannels = append(s.updateChannels, updateChannel) + items := <-updateChannel + + // Remove channel + for i := range s.updateChannels { + if s.updateChannels[i] == updateChannel { + s.updateChannels = append(s.updateChannels[:i], s.updateChannels[i+1:]...) + break + } + } + return items } else { return s.FetchUpdates(since) diff --git a/web/server.go b/web/server.go index 9d1b40f..a3f6b25 100644 --- a/web/server.go +++ b/web/server.go @@ -289,6 +289,7 @@ func (m *MockHTTPServer) handleMarkConversation(w http.ResponseWriter, r *http.R func (m *MockHTTPServer) handleUpdatesWebsocket(c *websocket.Conn) { // Fetch updates continuously + defer c.Close() for { // Fetch updates (blocking) updates := m.Server.FetchUpdatesBlocking(-1)