From 7ff3aa4282ed4322114a291e0d4c3eb64d6398d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= <5459617+joanlopez@users.noreply.github.com> Date: Thu, 4 Jul 2024 11:19:50 +0200 Subject: [PATCH] Update to xk6-websockets@v0.6.0 (#3824) --- go.mod | 2 +- go.sum | 4 +- .../grafana/xk6-websockets/websockets/blob.go | 192 ++++++++++++++++++ .../xk6-websockets/websockets/helpers.go | 52 +++++ .../xk6-websockets/websockets/websockets.go | 90 +++++--- vendor/modules.txt | 2 +- 6 files changed, 305 insertions(+), 37 deletions(-) create mode 100644 vendor/github.com/grafana/xk6-websockets/websockets/blob.go diff --git a/go.mod b/go.mod index 5c9e1b11f12..0e72bab190a 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/grafana/xk6-output-prometheus-remote v0.4.0 github.com/grafana/xk6-redis v0.3.0 github.com/grafana/xk6-webcrypto v0.4.0 - github.com/grafana/xk6-websockets v0.5.1 + github.com/grafana/xk6-websockets v0.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc github.com/jhump/protoreflect v1.16.0 diff --git a/go.sum b/go.sum index aa3d4f0de04..c528d777933 100644 --- a/go.sum +++ b/go.sum @@ -94,8 +94,8 @@ github.com/grafana/xk6-redis v0.3.0 h1:eV1YO0miPqGFilN8sL/3OdO6Mm+hZH2nsvJm5dkE0 github.com/grafana/xk6-redis v0.3.0/go.mod h1:3e/U9i1Nm3WEaMy4nZSGMjVf8ZsFau+aXurYJhJ7MfQ= github.com/grafana/xk6-webcrypto v0.4.0 h1:CXRGkvVg8snYEyGCq3d5XGzDPxTPJ1m5CS68jPdtZZk= github.com/grafana/xk6-webcrypto v0.4.0/go.mod h1:+THllImZ8OWlsFc8llWqvzzjottlGdXq/7rIQ16zmFs= -github.com/grafana/xk6-websockets v0.5.1 h1:wymI6UWpwDorv3mEInytrQjC9cmXYxQFygBOCMY1q6k= -github.com/grafana/xk6-websockets v0.5.1/go.mod h1:yPadv8R00MPCnV+GGSlYV/vwVgxKRCiiJoIfWsNGoQg= +github.com/grafana/xk6-websockets v0.6.0 h1:k1z8Xy4sGOB2+VF0q621MoNo0mYY12s8Cqqo5MMeO4w= +github.com/grafana/xk6-websockets v0.6.0/go.mod h1:D76ALTjp3bMugqx7ulseJ9TZrmSvDogieWxWXr8S0+A= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= diff --git a/vendor/github.com/grafana/xk6-websockets/websockets/blob.go b/vendor/github.com/grafana/xk6-websockets/websockets/blob.go new file mode 100644 index 00000000000..9c36acc80b2 --- /dev/null +++ b/vendor/github.com/grafana/xk6-websockets/websockets/blob.go @@ -0,0 +1,192 @@ +package websockets + +import ( + "bytes" + "errors" + "fmt" + "strconv" + "unsafe" + + "github.com/grafana/sobek" + + "go.k6.io/k6/js/common" + "go.k6.io/k6/js/modules/k6/experimental/streams" +) + +type blob struct { + typ string + data bytes.Buffer +} + +func (b *blob) text() string { + return b.data.String() +} + +func (r *WebSocketsAPI) blob(call sobek.ConstructorCall) *sobek.Object { + rt := r.vu.Runtime() + + b := &blob{} + var blobParts []interface{} + if len(call.Arguments) > 0 { + if err := rt.ExportTo(call.Arguments[0], &blobParts); err != nil { + common.Throw(rt, fmt.Errorf("failed to process [blobParts]: %w", err)) + } + } + + if len(blobParts) > 0 { + r.fillData(b, blobParts, call) + } + + if len(call.Arguments) > 1 && !sobek.IsUndefined(call.Arguments[1]) { + opts := call.Arguments[1] + if !isObject(opts) { + common.Throw(rt, errors.New("[options] must be an object")) + } + + typeOpt := opts.ToObject(rt).Get("type") + if !sobek.IsUndefined(typeOpt) { + b.typ = typeOpt.String() + } + } + + obj := rt.NewObject() + must(rt, obj.DefineAccessorProperty("size", rt.ToValue(func() sobek.Value { + return rt.ToValue(b.data.Len()) + }), nil, sobek.FLAG_FALSE, sobek.FLAG_TRUE)) + must(rt, obj.DefineAccessorProperty("type", rt.ToValue(func() sobek.Value { + return rt.ToValue(b.typ) + }), nil, sobek.FLAG_FALSE, sobek.FLAG_TRUE)) + + must(rt, obj.Set("arrayBuffer", func(_ sobek.FunctionCall) sobek.Value { + promise, resolve, _ := rt.NewPromise() + resolve(rt.NewArrayBuffer(b.data.Bytes())) + return rt.ToValue(promise) + })) + must(rt, obj.Set("bytes", func(_ sobek.FunctionCall) sobek.Value { + promise, resolve, reject := rt.NewPromise() + data, err := rt.New(rt.Get("Uint8Array"), rt.ToValue(b.data.Bytes())) + if err == nil { + resolve(data) + } else { + reject(fmt.Errorf("failed to create Uint8Array: %w", err)) + } + return rt.ToValue(promise) + })) + must(rt, obj.Set("slice", func(call sobek.FunctionCall) sobek.Value { + return r.slice(call, b, rt) + })) + must(rt, obj.Set("text", func(_ sobek.FunctionCall) sobek.Value { + promise, resolve, _ := rt.NewPromise() + resolve(b.text()) + return rt.ToValue(promise) + })) + must(rt, obj.Set("stream", func(_ sobek.FunctionCall) sobek.Value { + return rt.ToValue(streams.NewReadableStreamFromReader(r.vu, &b.data)) + })) + + proto := call.This.Prototype() + must(rt, proto.Set("toString", func(_ sobek.FunctionCall) sobek.Value { + return rt.ToValue("[object Blob]") + })) + must(rt, obj.SetPrototype(proto)) + + return obj +} + +func (r *WebSocketsAPI) fillData(b *blob, blobParts []interface{}, call sobek.ConstructorCall) { + rt := r.vu.Runtime() + + if len(blobParts) > 0 { + for n, part := range blobParts { + var err error + switch v := part.(type) { + case []uint8: + _, err = b.data.Write(v) + case []int8, []int16, []int32, []int64, []uint16, []uint32, []uint64, []float32, []float64: + _, err = b.data.Write(toByteSlice(v)) + case sobek.ArrayBuffer: + _, err = b.data.Write(v.Bytes()) + case *sobek.ArrayBuffer: + _, err = b.data.Write(v.Bytes()) + case string: + _, err = b.data.WriteString(v) + case map[string]interface{}: + obj := call.Arguments[0].ToObject(rt).Get(strconv.FormatInt(int64(n), 10)).ToObject(rt) + switch { + case isDataView(obj, rt): + _, err = b.data.Write(obj.Get("buffer").Export().(sobek.ArrayBuffer).Bytes()) + case isBlob(obj, r.blobConstructor): + _, err = b.data.Write(extractBytes(obj, rt)) + default: + err = fmt.Errorf("unsupported type: %T", part) + } + default: + err = fmt.Errorf("unsupported type: %T", part) + } + if err != nil { + common.Throw(rt, fmt.Errorf("failed to process [blobParts]: %w", err)) + } + } + } +} + +func (r *WebSocketsAPI) slice(call sobek.FunctionCall, b *blob, rt *sobek.Runtime) sobek.Value { + var ( + from int + to = b.data.Len() + ct = "" + ) + + if len(call.Arguments) > 0 { + from = int(call.Arguments[0].ToInteger()) + } + + if len(call.Arguments) > 1 { + to = int(call.Arguments[1].ToInteger()) + if to < 0 { + to = b.data.Len() + to + } + } + + if len(call.Arguments) > 2 { + ct = call.Arguments[2].String() + } + + opts := rt.NewObject() + must(rt, opts.Set("type", ct)) + + sliced, err := rt.New(r.blobConstructor, rt.ToValue([]interface{}{b.data.Bytes()[from:to]}), opts) + must(rt, err) + + return sliced +} + +// toByteSlice converts a slice of numbers to a slice of bytes. +// +//nolint:gosec +func toByteSlice(data interface{}) []byte { + switch v := data.(type) { + case []int8: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)) + case []uint16: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*2) + case []int16: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*2) + case []uint32: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*4) + case []int32: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*4) + case []uint64: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*8) + case []int64: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*8) + case []float32: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*4) + case []float64: + return unsafe.Slice((*byte)(unsafe.Pointer(&v[0])), len(v)*8) + default: + // this should never happen + common.Throw(nil, fmt.Errorf("unsupported type: %T", data)) + return nil + } +} diff --git a/vendor/github.com/grafana/xk6-websockets/websockets/helpers.go b/vendor/github.com/grafana/xk6-websockets/websockets/helpers.go index 282b45f754e..a03889806b1 100644 --- a/vendor/github.com/grafana/xk6-websockets/websockets/helpers.go +++ b/vendor/github.com/grafana/xk6-websockets/websockets/helpers.go @@ -1,6 +1,10 @@ package websockets import ( + "errors" + "fmt" + "reflect" + "github.com/grafana/sobek" "go.k6.io/k6/js/common" ) @@ -11,3 +15,51 @@ func must(rt *sobek.Runtime, err error) { common.Throw(rt, err) } } + +func isString(o *sobek.Object, rt *sobek.Runtime) bool { + return o.Prototype().Get("constructor") == rt.GlobalObject().Get("String") +} + +func isArray(o *sobek.Object, rt *sobek.Runtime) bool { + return o.Prototype().Get("constructor") == rt.GlobalObject().Get("Array") +} + +func isUint8Array(o *sobek.Object, rt *sobek.Runtime) bool { + return o.Prototype().Get("constructor") == rt.GlobalObject().Get("Uint8Array") +} + +func isDataView(o *sobek.Object, rt *sobek.Runtime) bool { + return o.Prototype().Get("constructor") == rt.GlobalObject().Get("DataView") +} + +func isBlob(o *sobek.Object, blobConstructor sobek.Value) bool { + return o.Prototype().Get("constructor") == blobConstructor +} + +func isObject(val sobek.Value) bool { + return val != nil && val.ExportType() != nil && val.ExportType().Kind() == reflect.Map +} + +func extractBytes(o *sobek.Object, rt *sobek.Runtime) []byte { + arrayBuffer, ok := sobek.AssertFunction(o.Get("arrayBuffer")) + if !ok { + common.Throw(rt, errors.New("Blob.[arrayBuffer] is not a function")) + } + + buffer, err := arrayBuffer(sobek.Undefined()) + if err != nil { + common.Throw(rt, fmt.Errorf("call to Blob.[arrayBuffer] failed: %w", err)) + } + + p, ok := buffer.Export().(*sobek.Promise) + if !ok { + common.Throw(rt, errors.New("Blob.[arrayBuffer] return is not a Promise")) + } + + ab, ok := p.Result().Export().(sobek.ArrayBuffer) + if !ok { + common.Throw(rt, errors.New("Blob.[arrayBuffer] promise's return is not an ArrayBuffer")) + } + + return ab.Bytes() +} diff --git a/vendor/github.com/grafana/xk6-websockets/websockets/websockets.go b/vendor/github.com/grafana/xk6-websockets/websockets/websockets.go index 008d40fe100..0fd751acd13 100644 --- a/vendor/github.com/grafana/xk6-websockets/websockets/websockets.go +++ b/vendor/github.com/grafana/xk6-websockets/websockets/websockets.go @@ -28,7 +28,8 @@ type RootModule struct{} // WebSocketsAPI is the k6 extension implementing the websocket API as defined in https://websockets.spec.whatwg.org type WebSocketsAPI struct { //nolint:revive - vu modules.VU + vu modules.VU + blobConstructor sobek.Value } var _ modules.Module = &RootModule{} @@ -42,9 +43,11 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { // Exports implements the modules.Instance interface's Exports func (r *WebSocketsAPI) Exports() modules.Exports { + r.blobConstructor = r.vu.Runtime().ToValue(r.blob) return modules.Exports{ Named: map[string]interface{}{ "WebSocket": r.websocket, + "Blob": r.blobConstructor, }, } } @@ -64,7 +67,9 @@ const ( ) type webSocket struct { - vu modules.VU + vu modules.VU + blobConstructor sobek.Value + url *url.URL conn *websocket.Conn tagsAndMeta *metrics.TagsAndMeta @@ -93,14 +98,6 @@ type ping struct { timestamps map[string]time.Time } -func isString(o *sobek.Object, rt *sobek.Runtime) bool { - return o.Prototype().Get("constructor") == rt.GlobalObject().Get("String") -} - -func isArray(o *sobek.Object, rt *sobek.Runtime) bool { - return o.Prototype().Get("constructor") == rt.GlobalObject().Get("Array") -} - func (r *WebSocketsAPI) websocket(c sobek.ConstructorCall) *sobek.Object { rt := r.vu.Runtime() @@ -128,17 +125,18 @@ func (r *WebSocketsAPI) websocket(c sobek.ConstructorCall) *sobek.Object { } w := &webSocket{ - vu: r.vu, - url: url, - tq: taskqueue.New(r.vu.RegisterCallback), - readyState: CONNECTING, - builtinMetrics: r.vu.State().BuiltinMetrics, - done: make(chan struct{}), - writeQueueCh: make(chan message), - eventListeners: newEventListeners(), - obj: rt.NewObject(), - tagsAndMeta: params.tagsAndMeta, - sendPings: ping{timestamps: make(map[string]time.Time)}, + vu: r.vu, + blobConstructor: r.blobConstructor, + url: url, + tq: taskqueue.New(r.vu.RegisterCallback), + readyState: CONNECTING, + builtinMetrics: r.vu.State().BuiltinMetrics, + done: make(chan struct{}), + writeQueueCh: make(chan message), + eventListeners: newEventListeners(), + obj: rt.NewObject(), + tagsAndMeta: params.tagsAndMeta, + sendPings: ping{timestamps: make(map[string]time.Time)}, } // Maybe have this after the goroutine below ?!? @@ -203,13 +201,11 @@ func defineWebsocket(rt *sobek.Runtime, w *webSocket) { return rt.ToValue(w.binaryType) }), rt.ToValue(func(s string) error { switch s { - case blobBinaryType: - return errors.New("blob is currently not supported, only arraybuffer is") - case arraybufferBinaryType: + case blobBinaryType, arraybufferBinaryType: w.binaryType = s return nil default: - return fmt.Errorf("unknown binaryType %s, the supported one is arraybuffer", s) + return fmt.Errorf(`unknown binaryType %s, the supported ones are "blob" and "arraybuffer"`, s) } }), sobek.FLAG_FALSE, sobek.FLAG_TRUE)) @@ -425,8 +421,9 @@ func (w *webSocket) loop() { } } -const binarytypeWarning = `You have not set a Websocket binaryType to "arraybuffer", but you got a binary response. ` + - `This has been done automatically now, but in the future this will not work.` +const binarytypeError = `websocket's binaryType hasn't been set to either "blob" or "arraybuffer", ` + + `but a binary message has been received. ` + + `"blob" is still not the default so the websocket is erroring out` func (w *webSocket) queueMessage(msg *message) { w.tq.Queue(func() error { @@ -448,13 +445,25 @@ func (w *webSocket) queueMessage(msg *message) { ev := w.newEvent(events.MESSAGE, msg.t) if msg.mtype == websocket.BinaryMessage { - if w.binaryType == "" { - w.binaryType = arraybufferBinaryType - w.vu.State().Logger.Warn(binarytypeWarning) + var data any + // Lets error out for a k6 release, at least, when there's no binaryType set. + // In the future, we'll use "blob" as default, as per spec: + // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/binaryType + switch w.binaryType { + case "": + return errors.New(binarytypeError) + case blobBinaryType: + var err error + data, err = rt.New(w.blobConstructor, rt.ToValue([]interface{}{msg.data})) + if err != nil { + return fmt.Errorf("failed to create Blob: %w", err) + } + case arraybufferBinaryType: + data = rt.NewArrayBuffer(msg.data) + default: + return fmt.Errorf(`unknown binaryType %s, the supported ones are "blob" and "arraybuffer"`, w.binaryType) } - // TODO this technically could be BLOB , but we don't support that - ab := rt.NewArrayBuffer(msg.data) - must(rt, ev.DefineDataProperty("data", rt.ToValue(ab), sobek.FLAG_FALSE, sobek.FLAG_FALSE, sobek.FLAG_TRUE)) + must(rt, ev.DefineDataProperty("data", rt.ToValue(data), sobek.FLAG_FALSE, sobek.FLAG_FALSE, sobek.FLAG_TRUE)) } else { must( rt, @@ -615,6 +624,21 @@ func (w *webSocket) send(msg sobek.Value) { data: b, t: time.Now(), } + case map[string]interface{}: + rt := w.vu.Runtime() + obj := msg.ToObject(rt) + if !isBlob(obj, w.blobConstructor) { + common.Throw(rt, fmt.Errorf("unsupported send type %T", o)) + } + + b := extractBytes(obj, rt) + w.bufferedAmount += len(b) + w.writeQueueCh <- message{ + mtype: websocket.BinaryMessage, + data: b, + t: time.Now(), + } + default: common.Throw(w.vu.Runtime(), fmt.Errorf("unsupported send type %T", o)) } diff --git a/vendor/modules.txt b/vendor/modules.txt index e0c553b4cbf..765507dfa3f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -200,7 +200,7 @@ github.com/grafana/xk6-redis/redis # github.com/grafana/xk6-webcrypto v0.4.0 ## explicit; go 1.20 github.com/grafana/xk6-webcrypto/webcrypto -# github.com/grafana/xk6-websockets v0.5.1 +# github.com/grafana/xk6-websockets v0.6.0 ## explicit; go 1.20 github.com/grafana/xk6-websockets/websockets github.com/grafana/xk6-websockets/websockets/events