Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: use gobwas/ws instead of gorilla/websocket #13932

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ go 1.18
require (
github.com/ahmetb/gen-crd-api-reference-docs v0.3.1-0.20210609063737-0067dc6dcea2
github.com/davecgh/go-spew v1.1.1
github.com/gobwas/ws v1.2.0
github.com/gogo/protobuf v1.3.2
github.com/google/go-cmp v0.5.9
github.com/google/go-containerregistry v0.13.0
github.com/google/go-containerregistry/pkg/authn/k8schain v0.0.0-20230209165335-3624968304fd
github.com/google/gofuzz v1.2.0
github.com/google/mako v0.0.0-20190821191249-122f8dcef9e3
github.com/gorilla/websocket v1.4.2
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.4
github.com/influxdata/influxdb-client-go/v2 v2.9.0
github.com/kelseyhightower/envconfig v1.4.0
Expand Down Expand Up @@ -93,6 +94,8 @@ require (
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down Expand Up @@ -137,7 +140,7 @@ require (
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/oauth2 v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.1.12 // indirect
Expand All @@ -154,3 +157,5 @@ require (
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

replace knative.dev/pkg => ../pkg
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gobuffalo/flect v0.2.4 h1:BSYA8+T60cdyq+vynaSUjqSVI9mDEg9ZfQUXKmfjo4I=
github.com/gobuffalo/flect v0.2.4/go.mod h1:1ZyCLIbg0YD7sDkzvFdPoOydPtD8y9JQnrOROolUcM8=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.0 h1:u0p9s3xLYpZCA1z5JgCkMeB34CKCMMQbM+G8Ii7YD0I=
github.com/gobwas/ws v1.2.0/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down Expand Up @@ -388,8 +394,8 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4GkDEdKCRJduHpTxT3/jcw=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
Expand Down Expand Up @@ -851,8 +857,8 @@ golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down Expand Up @@ -1197,8 +1203,6 @@ knative.dev/hack v0.0.0-20230501013555-7d81248b4638 h1:9IuXHdwp5jNmIg+0LVTQr8o4u
knative.dev/hack v0.0.0-20230501013555-7d81248b4638/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/networking v0.0.0-20230504184058-77975a12b2ee h1:d2dytSnwikNVtttk/lTjn7t6A9447DkUXADHR+zLOdU=
knative.dev/networking v0.0.0-20230504184058-77975a12b2ee/go.mod h1:OG9AEepHd3dofzrkzb0IelqN5uzu10RjbSdhl5UruSE=
knative.dev/pkg v0.0.0-20230502134655-db8a35330281 h1:9mN8O5XO68DKlkzEhFAShUx+O/I+TQR71vmTvYt8oF4=
knative.dev/pkg v0.0.0-20230502134655-db8a35330281/go.mod h1:2qWPP9Gjh9Q7ETti+WRHnBnGCSCq+6q7m3p/nmUQviE=
pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
8 changes: 4 additions & 4 deletions pkg/activator/stat_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ limitations under the License.
package activator

import (
"github.com/gorilla/websocket"
"github.com/gobwas/ws"
"go.uber.org/zap"
"knative.dev/serving/pkg/autoscaler/metrics"
)

// RawSender sends raw byte array messages with a message type
// (implemented by gorilla/websocket.Socket).
// (implemented by gobwas/ws).
type RawSender interface {
SendRaw(msgType int, msg []byte) error
SendRaw(msgType ws.OpCode, msg []byte) error
}

// ReportStats sends any messages received on the source channel to the sink.
Expand All @@ -41,7 +41,7 @@ func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metr
return
}

if err := sink.SendRaw(websocket.BinaryMessage, b); err != nil {
if err := sink.SendRaw(ws.OpBinary, b); err != nil {
logger.Errorw("Error while sending stats", zap.Error(err))
}
}(sms)
Expand Down
12 changes: 6 additions & 6 deletions pkg/activator/stat_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/gobwas/ws"
"github.com/google/go-cmp/cmp"
gorillawebsocket "github.com/gorilla/websocket"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
logtesting "knative.dev/pkg/logging/testing"
Expand All @@ -33,9 +33,9 @@ func TestReportStats(t *testing.T) {
ch := make(chan []metrics.StatMessage)

results := make(chan []byte)
sink := sendRawFunc(func(msgType int, msg []byte) error {
if msgType != gorillawebsocket.BinaryMessage {
t.Errorf("Expected metrics to be sent as Binary (%d), was %d", gorillawebsocket.BinaryMessage, msgType)
sink := sendRawFunc(func(msgType ws.OpCode, msg []byte) error {
if msgType != ws.OpBinary {
t.Errorf("Expected metrics to be sent as Binary (%d), was %d", ws.OpBinary, msgType)
}

results <- msg
Expand Down Expand Up @@ -90,8 +90,8 @@ func TestReportStats(t *testing.T) {
}
}

type sendRawFunc func(msgType int, msg []byte) error
type sendRawFunc func(msgType ws.OpCode, msg []byte) error

func (fn sendRawFunc) SendRaw(msgType int, msg []byte) error {
func (fn sendRawFunc) SendRaw(msgType ws.OpCode, msg []byte) error {
return fn(msgType, msg)
}
4 changes: 2 additions & 2 deletions pkg/autoscaler/statforwarder/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"sync"
"time"

gorillawebsocket "github.com/gorilla/websocket"
"github.com/gobwas/ws"
"go.uber.org/zap"

"knative.dev/pkg/logging/logkey"
Expand Down Expand Up @@ -138,7 +138,7 @@ func (p *remoteProcessor) process(sm asmetrics.StatMessage) error {
}
}

return c.SendRaw(gorillawebsocket.BinaryMessage, b)
return c.SendRaw(ws.OpBinary, b)
}

func (p *remoteProcessor) shutdown() {
Expand Down
19 changes: 12 additions & 7 deletions pkg/autoscaler/statforwarder/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"testing"
"time"

gorillawebsocket "github.com/gorilla/websocket"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
. "knative.dev/pkg/logging/testing"
)

Expand Down Expand Up @@ -98,16 +98,16 @@ func TestProcessorForwardingViaSvcRetry(t *testing.T) {

func testService(t *testing.T, received chan struct{}) *httptest.Server {
var httpHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
var upgrader gorillawebsocket.Upgrader
upgrader := ws.HTTPUpgrader{}

conn, err := upgrader.Upgrade(w, r, nil)
conn, _, _, err := upgrader.Upgrade(r, w)
if err != nil {
t.Fatal("error upgrading websocket:", err)
}

defer conn.Close()
for {
t, b, err := conn.ReadMessage()
var messages []wsutil.Message
messages, err = wsutil.ReadMessage(conn, ws.StateServerSide, messages)
if err != nil {
// This is probably caused by connection closed by client side.
return
Expand All @@ -116,7 +116,12 @@ func testService(t *testing.T, received chan struct{}) *httptest.Server {

// Answer messages to keep the connection's keepalive function moving and the
// connection closed quicker than 10s.
conn.WriteMessage(t, b)
for _, m := range messages {
op := m.OpCode
p := m.Payload
wsutil.WriteMessage(conn, ws.StateServerSide, op, p)
}

}
}

Expand Down
54 changes: 31 additions & 23 deletions pkg/autoscaler/statserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package statserver
import (
"context"
"errors"
"io"
"net"
"net/http"
"strings"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"go.uber.org/zap"
netheader "knative.dev/networking/pkg/http/header"
"knative.dev/serving/pkg/autoscaler/bucket"
Expand Down Expand Up @@ -133,8 +135,8 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
}
}

var upgrader websocket.Upgrader
conn, err := upgrader.Upgrade(w, r, nil)
upgrader := ws.HTTPUpgrader{}
conn, _, _, err := upgrader.Upgrade(r, w)
if err != nil {
s.logger.Errorw("error upgrading websocket", zap.Error(err))
return
Expand All @@ -149,7 +151,7 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
case <-s.stopCh:
// Send a close message to tell the client to immediately reconnect
s.logger.Debug("Sending close message to client")
err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(closeCodeServiceRestart, "Restarting"))
err := wsutil.WriteMessage(conn, ws.StateServerSide, ws.OpClose, ws.NewCloseFrameBody(closeCodeServiceRestart, "Restarting"))
if err != nil {
s.logger.Warnw("Failed to send close message to client", zap.Error(err))
}
Expand All @@ -162,11 +164,12 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
s.logger.Debug("Connection upgraded to WebSocket. Entering receive loop.")

for {
messageType, msg, err := conn.ReadMessage()
var messages []wsutil.Message
messages, err = wsutil.ReadMessage(conn, ws.StateServerSide, messages)
if err != nil {
// We close abnormally, because we're just closing the connection in the client,
// which is okay. There's no value delaying closure of the connection unnecessarily.
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
if errors.Is(err, io.EOF) {
s.logger.Debug("Handler disconnected")
} else {
s.logger.Errorf("Handler exiting on error: %#v", err)
Expand All @@ -175,28 +178,33 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
return
}

switch messageType {
case websocket.BinaryMessage:
var wsms metrics.WireStatMessages
if err := wsms.Unmarshal(msg); err != nil {
s.logger.Errorw("Failed to unmarshal the object", zap.Error(err))
continue
}

for _, wsm := range wsms.Messages {
if wsm.Stat == nil {
// To allow for future protobuf schema changes.
for _, m := range messages {
messageType := m.OpCode
msg := m.Payload
switch messageType {
case ws.OpBinary:
var wsms metrics.WireStatMessages
if err := wsms.Unmarshal(msg); err != nil {
s.logger.Errorw("Failed to unmarshal the object", zap.Error(err))
continue
}

sm := wsm.ToStatMessage()
s.logger.Debugf("Received stat message: %+v", sm)
s.statsCh <- sm
for _, wsm := range wsms.Messages {
if wsm.Stat == nil {
// To allow for future protobuf schema changes.
continue
}

sm := wsm.ToStatMessage()
s.logger.Debugf("Received stat message: %+v", sm)
s.statsCh <- sm
}
default:
s.logger.Error("Dropping unknown message type.")
continue
}
default:
s.logger.Error("Dropping unknown message type.")
continue
}

}
}

Expand Down
Loading