296 lines
6.8 KiB
Go
296 lines
6.8 KiB
Go
package server
|
|
|
|
import (
|
|
"sort"
|
|
"time"
|
|
|
|
"code.severnaya.net/kordophone-mock/v2/data"
|
|
"code.severnaya.net/kordophone-mock/v2/model"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
const VERSION = "Kordophone-2.6"
|
|
|
|
const (
|
|
AUTH_USERNAME = "test"
|
|
AUTH_PASSWORD = "test"
|
|
)
|
|
|
|
type Server struct {
|
|
version string
|
|
conversations []model.Conversation
|
|
authTokens []model.AuthToken
|
|
messageStore map[string][]model.Message
|
|
updateItems map[int]model.UpdateItem
|
|
updateChannel chan []model.UpdateItem
|
|
updateItemSeq int
|
|
}
|
|
|
|
type MessagesQuery struct {
|
|
ConversationGUID string
|
|
BeforeDate *time.Time
|
|
AfterGUID *string
|
|
BeforeGUID *string
|
|
Limit *int
|
|
}
|
|
|
|
type AuthError struct {
|
|
message string
|
|
}
|
|
|
|
func (e *AuthError) Error() string {
|
|
return e.message
|
|
}
|
|
|
|
type DatabaseError struct {
|
|
message string
|
|
}
|
|
|
|
func (e *DatabaseError) Error() string {
|
|
return e.message
|
|
}
|
|
|
|
func NewServer() *Server {
|
|
return &Server{
|
|
version: VERSION,
|
|
conversations: []model.Conversation{},
|
|
authTokens: []model.AuthToken{},
|
|
messageStore: make(map[string][]model.Message),
|
|
updateItems: make(map[int]model.UpdateItem),
|
|
updateChannel: make(chan []model.UpdateItem),
|
|
updateItemSeq: 0,
|
|
}
|
|
}
|
|
|
|
func (s *Server) Version() string {
|
|
return s.version
|
|
}
|
|
|
|
func (s *Server) Conversations() []model.Conversation {
|
|
return s.conversations
|
|
}
|
|
|
|
func (s *Server) SortedConversations() []model.Conversation {
|
|
conversations := s.Conversations()
|
|
sort.Slice(conversations, func(i, j int) bool {
|
|
return conversations[i].Date.After(conversations[j].Date)
|
|
})
|
|
|
|
return conversations
|
|
}
|
|
|
|
func (s *Server) ConversationForGUID(guid string) (*model.Conversation, error) {
|
|
var conversation *model.Conversation = nil
|
|
for i := range s.conversations {
|
|
c := &s.conversations[i]
|
|
if c.Guid == guid {
|
|
conversation = c
|
|
break
|
|
}
|
|
}
|
|
|
|
if conversation != nil {
|
|
return conversation, nil
|
|
}
|
|
|
|
return nil, &DatabaseError{message: "Conversation not found"}
|
|
}
|
|
|
|
func (s *Server) AddConversation(c model.Conversation) {
|
|
s.conversations = append(s.conversations, c)
|
|
}
|
|
|
|
func (s *Server) PopulateWithTestData() {
|
|
numConversations := 100
|
|
cs := make([]model.Conversation, numConversations)
|
|
for i := 0; i < numConversations; i++ {
|
|
cs[i] = data.GenerateRandomConversation()
|
|
|
|
// Generate messages
|
|
convo := &cs[i]
|
|
var lastMessage model.Message
|
|
for i := 0; i < 100; i++ {
|
|
message := data.GenerateRandomMessage(convo.Participants)
|
|
s.AppendMessageToConversation(convo, message)
|
|
|
|
if lastMessage.Date.Before(message.Date) {
|
|
lastMessage = message
|
|
}
|
|
}
|
|
|
|
// Update last message
|
|
convo.LastMessage = lastMessage
|
|
convo.LastMessagePreview = lastMessage.Text
|
|
}
|
|
|
|
s.conversations = cs
|
|
}
|
|
|
|
func (s *Server) Authenticate(username string, password string) (*model.AuthToken, error) {
|
|
if username != AUTH_USERNAME || password != AUTH_PASSWORD {
|
|
return nil, &AuthError{"Invalid username or password"}
|
|
}
|
|
|
|
token, err := model.NewAuthToken(username)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Register for future auth
|
|
s.registerAuthToken(token)
|
|
|
|
return token, nil
|
|
}
|
|
|
|
func (s *Server) CheckBearerToken(token string) bool {
|
|
return s.authenticateToken(token)
|
|
}
|
|
|
|
func (s *Server) PerformMessageQuery(query *MessagesQuery) []model.Message {
|
|
messages := s.messageStore[query.ConversationGUID]
|
|
|
|
// Sort
|
|
sort.Slice(messages, func(i int, j int) bool {
|
|
return messages[i].Date.Before(messages[j].Date)
|
|
})
|
|
|
|
// Apply before/after filters
|
|
// The following code assumes the messages are sorted by date ascending
|
|
if query.BeforeGUID != nil {
|
|
beforeGUID := *query.BeforeGUID
|
|
for i := range messages {
|
|
if messages[i].Guid == beforeGUID {
|
|
messages = messages[:i]
|
|
break
|
|
}
|
|
}
|
|
} else if query.AfterGUID != nil {
|
|
afterGUID := *query.AfterGUID
|
|
for i := range messages {
|
|
if messages[i].Guid == afterGUID {
|
|
messages = messages[i+1:]
|
|
break
|
|
}
|
|
}
|
|
} else if query.BeforeDate != nil {
|
|
beforeDate := *query.BeforeDate
|
|
for i := range messages {
|
|
if messages[i].Date.Before(beforeDate) {
|
|
messages = messages[:i]
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Limit
|
|
if query.Limit != nil {
|
|
limit := *query.Limit
|
|
if len(messages) > limit {
|
|
messages = messages[:limit]
|
|
}
|
|
}
|
|
|
|
return messages
|
|
}
|
|
|
|
func (s *Server) MessagesForConversation(conversation *model.Conversation) []model.Message {
|
|
return s.PerformMessageQuery(&MessagesQuery{
|
|
ConversationGUID: conversation.Guid,
|
|
})
|
|
}
|
|
|
|
func (s *Server) AppendMessageToConversation(conversation *model.Conversation, message model.Message) {
|
|
s.messageStore[conversation.Guid] = append(s.messageStore[conversation.Guid], message)
|
|
}
|
|
|
|
func (s *Server) SendMessage(conversation *model.Conversation, message model.Message) {
|
|
s.AppendMessageToConversation(conversation, message)
|
|
|
|
// Update Conversation
|
|
ourConversation, _ := s.ConversationForGUID(conversation.Guid)
|
|
ourConversation.LastMessagePreview = message.Text
|
|
ourConversation.Date = message.Date
|
|
|
|
log.Info().EmbedObject(message).Msgf("Sent message to conversation %s", conversation.Guid)
|
|
}
|
|
|
|
func (s *Server) ReceiveMessage(conversation *model.Conversation, message model.Message) {
|
|
s.AppendMessageToConversation(conversation, message)
|
|
|
|
// Update conversation
|
|
ourConversation, _ := s.ConversationForGUID(conversation.Guid)
|
|
ourConversation.LastMessagePreview = message.Text
|
|
ourConversation.Date = message.Date
|
|
ourConversation.UnreadCount += 1
|
|
|
|
// Enqueue Update
|
|
s.EnqueueUpdateItem(model.UpdateItem{
|
|
Conversation: ourConversation,
|
|
Message: &message,
|
|
})
|
|
|
|
log.Info().EmbedObject(message).Msgf("Received message from conversation %s", conversation.Guid)
|
|
}
|
|
|
|
func (s *Server) EnqueueUpdateItem(item model.UpdateItem) {
|
|
log.Info().EmbedObject(&item).Msg("Enqueuing update item")
|
|
|
|
s.updateItemSeq += 1
|
|
item.MessageSequenceNumber = s.updateItemSeq
|
|
|
|
s.updateItems[s.updateItemSeq] = item
|
|
|
|
// Publish to channel
|
|
select {
|
|
case s.updateChannel <- []model.UpdateItem{item}:
|
|
default: // Nobody listening
|
|
}
|
|
}
|
|
|
|
func (s *Server) FetchUpdates(since int) []model.UpdateItem {
|
|
items := []model.UpdateItem{}
|
|
for i := since; i <= s.updateItemSeq; i++ {
|
|
if val, ok := s.updateItems[i]; ok {
|
|
items = append(items, val)
|
|
}
|
|
}
|
|
|
|
return items
|
|
}
|
|
|
|
func (s *Server) FetchUpdatesBlocking(since int) []model.UpdateItem {
|
|
if since < 0 || since >= s.updateItemSeq {
|
|
// Wait for updates
|
|
log.Info().Msgf("Waiting for updates since %d", since)
|
|
items := <-s.updateChannel
|
|
return items
|
|
} else {
|
|
return s.FetchUpdates(since)
|
|
}
|
|
}
|
|
|
|
func (s *Server) MarkConversationAsRead(conversation *model.Conversation) {
|
|
conversation.UnreadCount = 0
|
|
|
|
// enqueue update
|
|
s.EnqueueUpdateItem(model.UpdateItem{
|
|
Conversation: conversation,
|
|
})
|
|
}
|
|
|
|
// Private
|
|
|
|
func (s *Server) registerAuthToken(token *model.AuthToken) {
|
|
s.authTokens = append(s.authTokens, *token)
|
|
}
|
|
|
|
func (s *Server) authenticateToken(token string) bool {
|
|
for _, t := range s.authTokens {
|
|
if t.SignedToken == token {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|