Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
Gekko0114 committed May 1, 2023
1 parent 384b6e8 commit 6d1cf34
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 101 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ require (
k8s.io/code-generator v0.25.4
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
knative.dev/caching v0.0.0-20230428120151-d563a6163387
knative.dev/control-protocol v0.0.0-20230428014639-f335b1de0367
knative.dev/hack v0.0.0-20230428013635-6e4569cc679d
knative.dev/networking v0.0.0-20230428120551-68725bdd1056
knative.dev/pkg v0.0.0-20230428013435-aacec7fd7a8e
knative.dev/caching v0.0.0-20230501013956-5faf6d6f1255
knative.dev/control-protocol v0.0.0-20230501014357-da93cf1acdb9
knative.dev/hack v0.0.0-20230501013555-7d81248b4638
knative.dev/networking v0.0.0-20230501014756-02055c8804b3
knative.dev/pkg v0.0.0-20230501013355-904966742b58
sigs.k8s.io/yaml v1.3.0
)

Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1193,14 +1193,14 @@ k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+O
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 h1:GfD9OzL11kvZN5iArC6oTS7RTj7oJOIfnislxYlqTj8=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/caching v0.0.0-20230418150356-05a86ad5d7db h1:69NjUKOsucU5tmCrnkdlULjai+2bZLHF97ZrA/v8PZI=
knative.dev/caching v0.0.0-20230418150356-05a86ad5d7db/go.mod h1:d261UN7aE93sODDnqVuhlaRcEUPjSBOyrUlBxuBp+UU=
knative.dev/control-protocol v0.0.0-20230420145039-d9cda76c5b03 h1:lbpfzz0rP4HtQkpH5IfUFdsDjrxHbtYWkOWrkwXzTT0=
knative.dev/control-protocol v0.0.0-20230420145039-d9cda76c5b03/go.mod h1:6IjiiR5I2DjCWdOsj0jLvrIjzDaKYaq4jUhT1ImrWwg=
knative.dev/hack v0.0.0-20230417170854-f591fea109b3 h1:+W4WBOq83tfGXKhtv8OB/uJeYqze3zh69GKiz1ucuqk=
knative.dev/hack v0.0.0-20230417170854-f591fea109b3/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/networking v0.0.0-20230419144338-e5d04e805e50 h1:X9rPBYr7Vrm075q0iXTr7/0oklkYoyqvlnrUwNzcUhI=
knative.dev/networking v0.0.0-20230419144338-e5d04e805e50/go.mod h1:o2MyGpGfU5DoSAWCE2f/jnSC9GjGOplCslbA99yDkGo=
knative.dev/caching v0.0.0-20230501013956-5faf6d6f1255 h1:1lQUnpWbm29CBQejfpzZafj03MheP5uSMDP8Ly7p950=
knative.dev/caching v0.0.0-20230501013956-5faf6d6f1255/go.mod h1:T9o6a5CZohhIezyujdVOz4lOkuV8QgjQlUNqq9N1k+4=
knative.dev/control-protocol v0.0.0-20230501014357-da93cf1acdb9 h1:KXzbo/xiiSiw5rxVHOejbLhk+6Xsj9vpkRhg//EruYA=
knative.dev/control-protocol v0.0.0-20230501014357-da93cf1acdb9/go.mod h1:6IjiiR5I2DjCWdOsj0jLvrIjzDaKYaq4jUhT1ImrWwg=
knative.dev/hack v0.0.0-20230501013555-7d81248b4638 h1:9IuXHdwp5jNmIg+0LVTQr8o4u0FYD99uCfynM9tS0XY=
knative.dev/hack v0.0.0-20230501013555-7d81248b4638/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/networking v0.0.0-20230501014756-02055c8804b3 h1:G4a5y5YICk4N1rWXZTbOElfx408QJhbEKU4sOL03sUM=
knative.dev/networking v0.0.0-20230501014756-02055c8804b3/go.mod h1:RCR6mSg74zrog/ZYLI7/ZPJOWGQsADOZXqDOeXeOCQw=
pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
15 changes: 10 additions & 5 deletions pkg/autoscaler/statforwarder/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"time"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/websocket"
)

func TestProcessorForwardingViaPodIP(t *testing.T) {
Expand Down Expand Up @@ -104,10 +104,10 @@ func testService(t *testing.T, received chan struct{}) *httptest.Server {
if err != nil {
t.Fatal("error upgrading websocket:", err)
}
nc := websocket.NewNetConnExtension(conn)
defer nc.Close()
defer conn.Close()
for {
t, b, err := nc.ReadMessage()
var messages []wsutil.Message
messages, err = wsutil.ReadMessage(conn, ws.StateServerSide, messages)
if err != nil {
// This is probably caused by connection closed by client side.
return
Expand All @@ -116,7 +116,12 @@ func testService(t *testing.T, received chan struct{}) *httptest.Server {

// Answer messages to keep the connection's keepalive function moving and the
// connection closed quicker than 10s.
nc.WriteMessage(t, b)
for _, m := range messages {
op := m.OpCode
p := m.Payload
wsutil.WriteMessage(conn, ws.StateServerSide, op, p)
}

}
}

Expand Down
51 changes: 28 additions & 23 deletions pkg/autoscaler/statserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"time"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"go.uber.org/zap"
netheader "knative.dev/networking/pkg/http/header"
"knative.dev/pkg/websocket"
"knative.dev/serving/pkg/autoscaler/bucket"
"knative.dev/serving/pkg/autoscaler/metrics"
)
Expand Down Expand Up @@ -141,7 +141,6 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
s.logger.Errorw("error upgrading websocket", zap.Error(err))
return
}
nc := websocket.NewNetConnExtension(conn)

handlerCh := make(chan struct{})

Expand All @@ -152,11 +151,11 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
case <-s.stopCh:
// Send a close message to tell the client to immediately reconnect
s.logger.Debug("Sending close message to client")
err := nc.WriteMessage(ws.OpClose, ws.NewCloseFrameBody(closeCodeServiceRestart, "Restarting"))
err := wsutil.WriteMessage(conn, ws.StateServerSide, ws.OpClose, ws.NewCloseFrameBody(closeCodeServiceRestart, "Restarting"))
if err != nil {
s.logger.Warnw("Failed to send close message to client", zap.Error(err))
}
nc.Close()
conn.Close()
case <-handlerCh:
s.logger.Debug("Handler exit complete")
}
Expand All @@ -165,11 +164,12 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
s.logger.Debug("Connection upgraded to WebSocket. Entering receive loop.")

for {
messageType, msg, err := nc.ReadMessage()
var messages []wsutil.Message
messages, err = wsutil.ReadMessage(conn, ws.StateServerSide, messages)
if err != nil {
// We close abnormally, because we're just closing the connection in the client,
// which is okay. There's no value delaying closure of the connection unnecessarily.
if err == io.ErrUnexpectedEOF {
if errors.Is(err, io.ErrUnexpectedEOF) {
s.logger.Debug("Handler disconnected")
} else {
s.logger.Errorf("Handler exiting on error: %#v", err)
Expand All @@ -178,28 +178,33 @@ func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
return
}

switch messageType {
case ws.OpBinary:
var wsms metrics.WireStatMessages
if err := wsms.Unmarshal(msg); err != nil {
s.logger.Errorw("Failed to unmarshal the object", zap.Error(err))
continue
}

for _, wsm := range wsms.Messages {
if wsm.Stat == nil {
// To allow for future protobuf schema changes.
for _, m := range messages {
messageType := m.OpCode
msg := m.Payload
switch messageType {
case ws.OpBinary:
var wsms metrics.WireStatMessages
if err := wsms.Unmarshal(msg); err != nil {
s.logger.Errorw("Failed to unmarshal the object", zap.Error(err))
continue
}

sm := wsm.ToStatMessage()
s.logger.Debugf("Received stat message: %+v", sm)
s.statsCh <- sm
for _, wsm := range wsms.Messages {
if wsm.Stat == nil {
// To allow for future protobuf schema changes.
continue
}

sm := wsm.ToStatMessage()
s.logger.Debugf("Received stat message: %+v", sm)
s.statsCh <- sm
}
default:
s.logger.Error("Dropping unknown message type.")
continue
}
default:
s.logger.Error("Dropping unknown message type.")
continue
}

}
}

Expand Down
19 changes: 9 additions & 10 deletions pkg/autoscaler/statserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
"time"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
nethttp "knative.dev/networking/pkg/http"
netheader "knative.dev/networking/pkg/http/header"
"knative.dev/pkg/websocket"
"knative.dev/serving/pkg/autoscaler/metrics"

"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestServerShutdown(t *testing.T) {
}

// Check connection has been closed with a close control message with a "service restart" close code
if header, r, _ := statSink.NextReader(); header.OpCode != ws.OpClose {
if header, r, _ := wsutil.NextReader(statSink, ws.StateClientSide); header.OpCode != ws.OpClose {
t.Fatal("Connection not closed")
} else {
payload, err := io.ReadAll(r)
Expand Down Expand Up @@ -235,7 +235,7 @@ func BenchmarkStatServer(b *testing.B) {
}
}

func assertReceivedProto(t *testing.T, sms []metrics.StatMessage, statSink *websocket.NetConnExtension, statsCh <-chan metrics.StatMessage) {
func assertReceivedProto(t *testing.T, sms []metrics.StatMessage, statSink net.Conn, statsCh <-chan metrics.StatMessage) {
t.Helper()

if err := sendProto(statSink, sms); err != nil {
Expand All @@ -251,7 +251,7 @@ func assertReceivedProto(t *testing.T, sms []metrics.StatMessage, statSink *webs
}
}

func dialOK(t *testing.T, serverURL string) *websocket.NetConnExtension {
func dialOK(t *testing.T, serverURL string) net.Conn {
t.Helper()

statSink, err := dial(serverURL)
Expand All @@ -261,7 +261,7 @@ func dialOK(t *testing.T, serverURL string) *websocket.NetConnExtension {
return statSink
}

func dial(serverURL string) (*websocket.NetConnExtension, error) {
func dial(serverURL string) (net.Conn, error) {
u, err := url.Parse(serverURL)
if err != nil {
return nil, err
Expand All @@ -272,26 +272,25 @@ func dial(serverURL string) (*websocket.NetConnExtension, error) {
Timeout: time.Second,
}
ctx := context.TODO()
conn, _, _, err := dialer.Dial(ctx, u.String())
statSink := websocket.NewNetConnExtension(conn)
statSink, _, _, err := dialer.Dial(ctx, u.String())
return statSink, err
}

func sendProto(statSink *websocket.NetConnExtension, sms []metrics.StatMessage) error {
func sendProto(statSink net.Conn, sms []metrics.StatMessage) error {
wsms := metrics.ToWireStatMessages(sms)
msg, err := wsms.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal StatMessage: %w", err)
}

if err := statSink.WriteMessage(ws.OpBinary, msg); err != nil {
if err := wsutil.WriteMessage(statSink, ws.StateClientSide, ws.OpBinary, msg); err != nil {
return fmt.Errorf("failed to write to stat sink: %w", err)
}

return nil
}

func closeSink(t *testing.T, statSink *websocket.NetConnExtension) {
func closeSink(t *testing.T, statSink net.Conn) {
t.Helper()

if err := statSink.Close(); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions test/e2e/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"time"

"github.com/gobwas/ws"
"knative.dev/pkg/websocket"
"github.com/gobwas/ws/wsutil"

"k8s.io/apimachinery/pkg/util/wait"
pkgTest "knative.dev/pkg/test"
Expand All @@ -44,7 +44,7 @@ const message = "Hello, websocket"

// connect attempts to establish WebSocket connection with the Service.
// It will retry until reaching `connectTimeout` duration.
func connect(t *testing.T, clients *test.Clients, domain, timeout string) (*websocket.NetConnExtension, error) {
func connect(t *testing.T, clients *test.Clients, domain, timeout string) (net.Conn, error) {
var (
err error
address string
Expand All @@ -65,7 +65,7 @@ func connect(t *testing.T, clients *test.Clients, domain, timeout string) (*webs
u = url.URL{Scheme: "wss", Host: net.JoinHostPort(address, mapper("443")), Path: "/", RawQuery: rawQuery}
}

var conn *websocket.NetConnExtension
var conn net.Conn
waitErr := wait.PollImmediate(connectRetryInterval, connectTimeout, func() (bool, error) {
t.Logf("Connecting using websocket: url=%s, host=%s", u.String(), domain)
dialer := &ws.Dialer{
Expand All @@ -86,7 +86,7 @@ func connect(t *testing.T, clients *test.Clients, domain, timeout string) (*webs
}
if c != nil {
t.Log("WebSocket connection established.")
conn = websocket.NewNetConnExtension(c)
conn = c
return true, nil
}
return false, nil
Expand All @@ -105,13 +105,15 @@ func ValidateWebSocketConnection(t *testing.T, clients *test.Clients, names test

// Send a message.
t.Logf("Sending message %q to server.", message)
if err = conn.WriteMessage(ws.OpText, []byte(message)); err != nil {
if err = wsutil.WriteMessage(conn, ws.StateClientSide, ws.OpText, []byte(message)); err != nil {
return err
}
t.Log("Message sent.")

// Read back the echoed message and compared with sent.
_, recv, err := conn.ReadMessage()
var messages []wsutil.Message
messages, err = wsutil.ReadMessage(conn, ws.StateClientSide, messages)
recv := messages[0].Payload
if err != nil {
return err
} else if strings.HasPrefix(string(recv), message) {
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ package e2e

import (
"context"
"testing"

"github.com/gobwas/ws"
"golang.org/x/sync/errgroup"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down
39 changes: 22 additions & 17 deletions test/test_images/wsserver/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"errors"
"flag"
"io"
"log"
Expand All @@ -27,8 +28,8 @@ import (

"github.com/gobwas/httphead"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
netheader "knative.dev/networking/pkg/http/header"
"knative.dev/pkg/websocket"
"knative.dev/serving/test"
)

Expand Down Expand Up @@ -69,36 +70,40 @@ func handler(w http.ResponseWriter, r *http.Request) {
log.Println("Error upgrading websocket:", err)
return
}
nc := websocket.NewNetConnExtension(conn)
defer nc.Close()
defer conn.Close()
log.Println("Connection upgraded to WebSocket. Entering receive loop.")
for {
messageType, message, err := nc.ReadMessage()
var messages []wsutil.Message
messages, err = wsutil.ReadMessage(conn, ws.StateServerSide, messages)
if err != nil {
// We close abnormally, because we're just closing the connection in the client,
// which is okay. There's no value delaying closure of the connection unnecessarily.
if err == io.ErrUnexpectedEOF {
if errors.Is(err, io.ErrUnexpectedEOF) {
log.Println("Client disconnected.")
} else {
log.Println("Handler exiting on error:", err)
}
return
}
if suffix := messageSuffix(); suffix != "" {
respMes := string(message) + " " + suffix
message = []byte(respMes)
}
for _, m := range messages {
messageType := m.OpCode
message := m.Payload
if suffix := messageSuffix(); suffix != "" {
respMes := string(message) + " " + suffix
message = []byte(respMes)
}

log.Printf("Successfully received: %q", message)
if delay > 0 {
time.Sleep(delay)
}
log.Printf("Successfully received: %q", message)
if delay > 0 {
time.Sleep(delay)
}

if err = nc.WriteMessage(messageType, message); err != nil {
log.Println("Failed to write message:", err)
return
if err = wsutil.WriteMessage(conn, ws.StateServerSide, messageType, message); err != nil {
log.Println("Failed to write message:", err)
return
}
log.Printf("Successfully wrote: %q", message)
}
log.Printf("Successfully wrote: %q", message)
}
}

Expand Down
Empty file modified vendor/k8s.io/code-generator/generate-groups.sh
100644 → 100755
Empty file.
Empty file modified vendor/knative.dev/pkg/hack/generate-knative.sh
100644 → 100755
Empty file.
Loading

0 comments on commit 6d1cf34

Please sign in to comment.