Private
Public Access
1
0

Updates: need to have one channel per observer

This commit is contained in:
2023-08-24 00:30:57 -07:00
parent 7895a3a2e0
commit a0e8e049f0
2 changed files with 30 additions and 19 deletions

View File

@@ -17,13 +17,13 @@ const (
) )
type Server struct { type Server struct {
version string version string
conversations []model.Conversation conversations []model.Conversation
authTokens []model.AuthToken authTokens []model.AuthToken
messageStore map[string][]model.Message messageStore map[string][]model.Message
updateItems map[int]model.UpdateItem updateItems map[int]model.UpdateItem
updateChannel chan []model.UpdateItem updateChannels []chan []model.UpdateItem
updateItemSeq int updateItemSeq int
} }
type MessagesQuery struct { type MessagesQuery struct {
@@ -52,13 +52,13 @@ func (e *DatabaseError) Error() string {
func NewServer() *Server { func NewServer() *Server {
return &Server{ return &Server{
version: VERSION, version: VERSION,
conversations: []model.Conversation{}, conversations: []model.Conversation{},
authTokens: []model.AuthToken{}, authTokens: []model.AuthToken{},
messageStore: make(map[string][]model.Message), messageStore: make(map[string][]model.Message),
updateItems: make(map[int]model.UpdateItem), updateItems: make(map[int]model.UpdateItem),
updateChannel: make(chan []model.UpdateItem), updateChannels: []chan []model.UpdateItem{},
updateItemSeq: 0, updateItemSeq: 0,
} }
} }
@@ -186,7 +186,7 @@ func (s *Server) PerformMessageQuery(query *MessagesQuery) []model.Message {
if query.Limit != nil { if query.Limit != nil {
limit := *query.Limit limit := *query.Limit
if len(messages) > limit { if len(messages) > limit {
messages = messages[:limit] messages = messages[len(messages)-limit:]
} }
} }
@@ -241,9 +241,8 @@ func (s *Server) EnqueueUpdateItem(item model.UpdateItem) {
s.updateItems[s.updateItemSeq] = item s.updateItems[s.updateItemSeq] = item
// Publish to channel // Publish to channel
select { for i := range s.updateChannels {
case s.updateChannel <- []model.UpdateItem{item}: s.updateChannels[i] <- []model.UpdateItem{item}
default: // Nobody listening
} }
} }
@@ -262,7 +261,18 @@ func (s *Server) FetchUpdatesBlocking(since int) []model.UpdateItem {
if since < 0 || since >= s.updateItemSeq { if since < 0 || since >= s.updateItemSeq {
// Wait for updates // Wait for updates
log.Info().Msgf("Waiting for updates since %d", since) log.Info().Msgf("Waiting for updates since %d", since)
items := <-s.updateChannel updateChannel := make(chan []model.UpdateItem)
s.updateChannels = append(s.updateChannels, updateChannel)
items := <-updateChannel
// Remove channel
for i := range s.updateChannels {
if s.updateChannels[i] == updateChannel {
s.updateChannels = append(s.updateChannels[:i], s.updateChannels[i+1:]...)
break
}
}
return items return items
} else { } else {
return s.FetchUpdates(since) return s.FetchUpdates(since)

View File

@@ -289,6 +289,7 @@ func (m *MockHTTPServer) handleMarkConversation(w http.ResponseWriter, r *http.R
func (m *MockHTTPServer) handleUpdatesWebsocket(c *websocket.Conn) { func (m *MockHTTPServer) handleUpdatesWebsocket(c *websocket.Conn) {
// Fetch updates continuously // Fetch updates continuously
defer c.Close()
for { for {
// Fetch updates (blocking) // Fetch updates (blocking)
updates := m.Server.FetchUpdatesBlocking(-1) updates := m.Server.FetchUpdatesBlocking(-1)