server: implements /updates websocket
This commit is contained in:
3
go.mod
3
go.mod
@@ -9,5 +9,6 @@ require (
|
|||||||
github.com/mattn/go-colorable v0.1.12 // indirect
|
github.com/mattn/go-colorable v0.1.12 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.14 // indirect
|
github.com/mattn/go-isatty v0.0.14 // indirect
|
||||||
github.com/rs/zerolog v1.29.1 // indirect
|
github.com/rs/zerolog v1.29.1 // indirect
|
||||||
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 // indirect
|
golang.org/x/net v0.14.0 // indirect
|
||||||
|
golang.org/x/sys v0.11.0 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -16,8 +16,12 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
|
|||||||
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||||
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
|
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
|
||||||
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
|
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
|
||||||
|
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
|
||||||
|
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
|
||||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 h1:foEbQz/B0Oz6YIqu/69kfXPYeFQAuuMYFkjaqXzl5Wo=
|
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 h1:foEbQz/B0Oz6YIqu/69kfXPYeFQAuuMYFkjaqXzl5Wo=
|
||||||
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 h1:y/woIyUBFbpQGKS0u1aHF/40WUDnek3fPOyD08H5Vng=
|
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 h1:y/woIyUBFbpQGKS0u1aHF/40WUDnek3fPOyD08H5Vng=
|
||||||
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
|
||||||
|
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"code.severnaya.net/kordophone-mock/v2/server"
|
"code.severnaya.net/kordophone-mock/v2/server"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MockHTTPServerConfiguration struct {
|
type MockHTTPServerConfiguration struct {
|
||||||
@@ -286,6 +287,21 @@ func (m *MockHTTPServer) handleMarkConversation(w http.ResponseWriter, r *http.R
|
|||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockHTTPServer) handleUpdatesWebsocket(c *websocket.Conn) {
|
||||||
|
// Fetch updates continuously
|
||||||
|
for {
|
||||||
|
// Fetch updates (blocking)
|
||||||
|
updates := m.Server.FetchUpdatesBlocking(-1)
|
||||||
|
|
||||||
|
// Send updates to client
|
||||||
|
err := websocket.JSON.Send(c, updates)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("handleUpdatesWebsocket: Error sending updates to client (probably disconnected)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MockHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (m *MockHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
m.logRequest(r, r.URL.Query().Encode())
|
m.logRequest(r, r.URL.Query().Encode())
|
||||||
m.mux.ServeHTTP(w, r)
|
m.mux.ServeHTTP(w, r)
|
||||||
@@ -307,6 +323,12 @@ func NewMockHTTPServer(config MockHTTPServerConfiguration) *MockHTTPServer {
|
|||||||
this.mux.Handle("/sendMessage", http.HandlerFunc(this.handleSendMessage))
|
this.mux.Handle("/sendMessage", http.HandlerFunc(this.handleSendMessage))
|
||||||
this.mux.Handle("/markConversation", http.HandlerFunc(this.handleMarkConversation))
|
this.mux.Handle("/markConversation", http.HandlerFunc(this.handleMarkConversation))
|
||||||
|
|
||||||
|
// /updates websocket
|
||||||
|
this.mux.Handle("/updates", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s := websocket.Server{Handler: websocket.Handler(this.handleUpdatesWebsocket)}
|
||||||
|
s.ServeHTTP(w, r)
|
||||||
|
}))
|
||||||
|
|
||||||
this.mux.Handle("/", http.HandlerFunc(this.handleNotFound))
|
this.mux.Handle("/", http.HandlerFunc(this.handleNotFound))
|
||||||
|
|
||||||
return &this
|
return &this
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -14,6 +15,7 @@ import (
|
|||||||
"code.severnaya.net/kordophone-mock/v2/model"
|
"code.severnaya.net/kordophone-mock/v2/model"
|
||||||
"code.severnaya.net/kordophone-mock/v2/server"
|
"code.severnaya.net/kordophone-mock/v2/server"
|
||||||
"code.severnaya.net/kordophone-mock/v2/web"
|
"code.severnaya.net/kordophone-mock/v2/web"
|
||||||
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestVersion(t *testing.T) {
|
func TestVersion(t *testing.T) {
|
||||||
@@ -301,6 +303,98 @@ func TestUpdates(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MessageUpdateError struct {
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e MessageUpdateError) Error() string {
|
||||||
|
return e.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdatesWebsocket(t *testing.T) {
|
||||||
|
s := web.NewMockHTTPServer(web.MockHTTPServerConfiguration{AuthEnabled: true})
|
||||||
|
httpServer := httptest.NewServer(s)
|
||||||
|
|
||||||
|
// Mock conversation
|
||||||
|
guid := "1234567890"
|
||||||
|
conversation := model.Conversation{
|
||||||
|
Date: time.Now(),
|
||||||
|
Participants: []string{"Alice"},
|
||||||
|
UnreadCount: 0,
|
||||||
|
Guid: guid,
|
||||||
|
}
|
||||||
|
s.Server.AddConversation(conversation)
|
||||||
|
|
||||||
|
// Receive a message
|
||||||
|
message := model.Message{
|
||||||
|
Text: "This is a test.",
|
||||||
|
Sender: &conversation.Participants[0],
|
||||||
|
Date: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open websocket connection
|
||||||
|
wsURL := "ws" + strings.TrimPrefix(httpServer.URL, "http") + "/updates"
|
||||||
|
ws, err := websocket.Dial(wsURL, "ws", httpServer.URL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error opening websocket: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Await messages on the websocket
|
||||||
|
messageReceived := make(chan bool)
|
||||||
|
errorEncountered := make(chan error)
|
||||||
|
go func() {
|
||||||
|
// Read from websocket
|
||||||
|
var updates []model.UpdateItem
|
||||||
|
err := websocket.JSON.Receive(ws, &updates)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
errorEncountered <- err
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
if len(updates) != 1 {
|
||||||
|
errorEncountered <- MessageUpdateError{
|
||||||
|
fmt.Sprintf("Unexpected num updates: %d (expected %d)", len(updates), 1),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
update := updates[0]
|
||||||
|
if update.Conversation.Guid != conversation.Guid {
|
||||||
|
errorEncountered <- MessageUpdateError{
|
||||||
|
fmt.Sprintf("Unexpected conversation guid: %s (expected %s)", update.Conversation.Guid, conversation.Guid),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if update.Message.Text != message.Text {
|
||||||
|
errorEncountered <- MessageUpdateError{
|
||||||
|
fmt.Sprintf("Unexpected message text: %s (expected %s)", update.Message.Text, message.Text),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
messageReceived <- true
|
||||||
|
}
|
||||||
|
|
||||||
|
}()
|
||||||
|
|
||||||
|
// sleep for a bit to allow the websocket to connect
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// This should enqueue an update item
|
||||||
|
s.Server.ReceiveMessage(&conversation, message)
|
||||||
|
|
||||||
|
// Await expectation
|
||||||
|
select {
|
||||||
|
case <-messageReceived:
|
||||||
|
// COOL
|
||||||
|
case err := <-errorEncountered:
|
||||||
|
t.Fatalf("Error encountered reading from websocket: %s", err)
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("Timed out waiting for websocket message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMarkConversation(t *testing.T) {
|
func TestMarkConversation(t *testing.T) {
|
||||||
s := web.NewMockHTTPServer(web.MockHTTPServerConfiguration{AuthEnabled: true})
|
s := web.NewMockHTTPServer(web.MockHTTPServerConfiguration{AuthEnabled: true})
|
||||||
httpServer := httptest.NewServer(s)
|
httpServer := httptest.NewServer(s)
|
||||||
|
|||||||
Reference in New Issue
Block a user