From 389cfdfacb0dbffa46d692ebbc2d394db0cef5ad Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Thu, 19 Dec 2024 00:46:55 +0300 Subject: [PATCH] resolve issue #110 * new methods 'Router.Call[XYZ]' to replace the deprecated one 'RouterCallImpl' * new backward-compatible signature for StorageResultTypedFunc to fix interface for RouterCallImpl --- api.go | 240 +++++++++++++++++++++++++++------- api_test.go | 2 +- replicaset.go | 13 +- tests/tnt/call_bench_test.go | 119 ++++++++++++++++- tests/tnt/router_call_test.go | 11 +- 5 files changed, 318 insertions(+), 67 deletions(-) diff --git a/api.go b/api.go index 4424438..4430ccc 100644 --- a/api.go +++ b/api.go @@ -22,6 +22,9 @@ type VshardMode string const ( ReadMode VshardMode = "read" WriteMode VshardMode = "write" + + // callTimeoutDefault is a default timeout when no timeout is provided + callTimeoutDefault = 500 * time.Millisecond ) func (c VshardMode) String() string { @@ -31,7 +34,7 @@ func (c VshardMode) String() string { type vshardStorageCallResponseProto struct { AssertError *assertError // not nil if there is assert error VshardError *StorageCallVShardError // not nil if there is vshard response - Data []interface{} // raw response data + CallResp VshardRouterCallResp } func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error { @@ -115,14 +118,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error } // isVShardRespOk is true - r.Data = make([]interface{}, 0, respArrayLen-1) - + r.CallResp.rawMessages = make([]msgpack.RawMessage, 0, respArrayLen-1) for i := 1; i < respArrayLen; i++ { - elem, err := d.DecodeInterface() + elem, err := d.DecodeRaw() if err != nil { - return fmt.Errorf("failed to decode into interface element #%d of response array", i+1) + return fmt.Errorf("failed to decode into msgpack.RawMessage element #%d of response array", i-1) } - r.Data = append(r.Data, elem) + r.CallResp.rawMessages = append(r.CallResp.rawMessages, elem) } return nil @@ -167,7 +169,7 @@ func (s StorageCallVShardError) Error() string { return fmt.Sprintf("%+v", alias(s)) } -type StorageResultTypedFunc = func(result interface{}) error +type StorageResultTypedFunc = func(result ...interface{}) error type CallOpts struct { VshardMode VshardMode // vshard mode in call @@ -175,54 +177,172 @@ type CallOpts struct { Timeout time.Duration } -// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min". -// But the original lua vshard implementation uses this naming, so we use it too. -// -//nolint:revive -const CallTimeoutMin = time.Second / 2 +// VshardRouterCallMode is a type to represent call mode for Router.Call method. +type VshardRouterCallMode int + +const ( + // VshardRouterCallModeRO sets a read-only mode for Router.Call. + VshardRouterCallModeRO VshardRouterCallMode = iota + // VshardRouterCallModeRW sets a read-write mode for Router.Call. + VshardRouterCallModeRW + // VshardRouterCallModeRE acts like VshardRouterCallModeRO + // with preference for a replica rather than a master. + // This mode is not supported yet. + VshardRouterCallModeRE + // VshardRouterCallModeBRO acts like VshardRouterCallModeRO with balancing. + VshardRouterCallModeBRO + // VshardRouterCallModeBRE acts like VshardRouterCallModeRO with balancing + // and preference for a replica rather than a master. + VshardRouterCallModeBRE +) + +// VshardRouterCallOptions represents options to Router.Call[XXX] methods. +type VshardRouterCallOptions struct { + Timeout time.Duration +} + +// VshardRouterCallResp represents a response from Router.Call[XXX] methods. +type VshardRouterCallResp struct { + rawMessages []msgpack.RawMessage +} + +// Get returns a response from user defined function as []interface{}. +func (r VshardRouterCallResp) Get() ([]interface{}, error) { + resp := make([]interface{}, len(r.rawMessages)) + return resp, r.GetTyped(resp) +} + +// GetTyped decodes a reponse from user defined fuction into custom values. +func (r VshardRouterCallResp) GetTyped(result []interface{}) error { + minLen := len(result) + if dataLen := len(r.rawMessages); dataLen < minLen { + minLen = dataLen + } + + for i := 0; i < minLen; i++ { + if err := msgpack.Unmarshal(r.rawMessages[i], &result[i]); err != nil { + return fmt.Errorf("failed to decode into result[%d] element #%d of response array: %w", i, i, err) + } + } + + return nil +} // RouterCallImpl Perform shard operation function will restart operation // after wrong bucket response until timeout is reached +// Deprecated: RouterCallImpl is deprecated. +// See https://github.com/KaymeKaydex/go-vshard-router/issues/110. +// Use Call, Call[RO, RW, RE, BRO, BRE] methods instead. func (r *Router) RouterCallImpl(ctx context.Context, bucketID uint64, opts CallOpts, fnc string, args interface{}) (interface{}, StorageResultTypedFunc, error) { + var vshardCallOpts = VshardRouterCallOptions{ + Timeout: opts.Timeout, + } + + var vshardCallMode VshardRouterCallMode + + switch opts.VshardMode { + case WriteMode: + vshardCallMode = VshardRouterCallModeRW + case ReadMode: + switch opts.PoolMode { + case pool.ANY: + vshardCallMode = VshardRouterCallModeBRO + case pool.RO: + vshardCallMode = VshardRouterCallModeRO + case pool.RW: + return nil, nil, fmt.Errorf("unexpected opts %+v", opts) + case pool.PreferRO: + vshardCallMode = VshardRouterCallModeBRE + case pool.PreferRW: + return nil, nil, fmt.Errorf("unexpected opts %+v", opts) + default: + return nil, nil, fmt.Errorf("unexpected opts.PoolMode %v", opts.PoolMode) + } + default: + return nil, nil, fmt.Errorf("unexpected opts.VshardMode %v", opts.VshardMode) + } + + vshardCallResp, err := r.Call(ctx, bucketID, vshardCallMode, fnc, args, vshardCallOpts) + if err != nil { + return nil, nil, err + } + + data, err := vshardCallResp.Get() + if err != nil { + return nil, nil, err + } + + return data, func(result ...interface{}) error { + return vshardCallResp.GetTyped(result) + }, nil +} + +// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'. +func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCallMode, + fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { const vshardStorageClientCall = "vshard.storage.call" if bucketID < 1 || r.cfg.TotalBucketCount < bucketID { - return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) + return VshardRouterCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) } - if opts.Timeout == 0 { - opts.Timeout = CallTimeoutMin + var poolMode pool.Mode + var vshardMode VshardMode + + switch mode { + case VshardRouterCallModeRO: + poolMode, vshardMode = pool.RO, ReadMode + case VshardRouterCallModeRW: + poolMode, vshardMode = pool.RW, WriteMode + case VshardRouterCallModeRE: + // poolMode, vshardMode = pool.PreferRO, ReadMode + // since go-tarantool always use balance=true politic, + // we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400 + return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet") + case VshardRouterCallModeBRO: + poolMode, vshardMode = pool.ANY, ReadMode + case VshardRouterCallModeBRE: + poolMode, vshardMode = pool.PreferRO, ReadMode + default: + return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode) } - timeout := opts.Timeout - timeStart := time.Now() + timeout := callTimeoutDefault + if opts.Timeout > 0 { + timeout = opts.Timeout + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + tntReq := tarantool.NewCallRequest(vshardStorageClientCall). + Context(ctx). + Args([]interface{}{ + bucketID, + vshardMode, + fnc, + args, + }) - req := tarantool.NewCallRequest(vshardStorageClientCall) - req = req.Context(ctx) - req = req.Args([]interface{}{ - bucketID, - opts.VshardMode.String(), - fnc, - args, - }) + requestStartTime := time.Now() var err error for { - if since := time.Since(timeStart); since > timeout { - r.metrics().RequestDuration(since, false, false) + if spent := time.Since(requestStartTime); spent > timeout { + r.metrics().RequestDuration(spent, false, false) - r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout) + r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout) if err == nil { err = fmt.Errorf("cant get call cause call impl timeout") } - return nil, nil, err + return VshardRouterCallResp{}, err } var rs *Replicaset @@ -242,18 +362,16 @@ func (r *Router) RouterCallImpl(ctx context.Context, r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID) - future := rs.conn.Do(req, opts.PoolMode) - var storageCallResponse vshardStorageCallResponseProto - err = future.GetTyped(&storageCallResponse) + err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse) if err != nil { - return nil, nil, fmt.Errorf("got error on future.Get(): %w", err) + return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err) } r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse) if storageCallResponse.AssertError != nil { - return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError) + return VshardRouterCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError) } if storageCallResponse.VshardError != nil { @@ -267,7 +385,7 @@ func (r *Router) RouterCallImpl(ctx context.Context, if vshardError.Destination != "" { destinationUUID, err := uuid.Parse(vshardError.Destination) if err != nil { - return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w", + return VshardRouterCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w", vshardStorageClientCall, vshardError, err) } @@ -291,8 +409,8 @@ func (r *Router) RouterCallImpl(ctx context.Context, const defaultPoolingPause = 50 * time.Millisecond time.Sleep(defaultPoolingPause) - if time.Since(timeStart) > timeout { - return nil, nil, vshardError + if spent := time.Since(requestStartTime); spent > timeout { + return VshardRouterCallResp{}, vshardError } } } @@ -311,30 +429,52 @@ func (r *Router) RouterCallImpl(ctx context.Context, // There is a comment why lua vshard router doesn't retry: // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697 r.BucketReset(bucketID) - return nil, nil, vshardError + return VshardRouterCallResp{}, vshardError case VShardErrNameNonMaster: // vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case: // See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704. // Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master, // we just return this error as is. - return nil, nil, vshardError + return VshardRouterCallResp{}, vshardError default: - return nil, nil, vshardError + return VshardRouterCallResp{}, vshardError } } - r.metrics().RequestDuration(time.Since(timeStart), true, false) + r.metrics().RequestDuration(time.Since(requestStartTime), true, false) - return storageCallResponse.Data, func(result interface{}) error { - if len(storageCallResponse.Data) == 0 { - return nil - } + return storageCallResponse.CallResp, nil + } +} + +// CallRO is an alias for Call with VshardRouterCallModeRO. +func (r *Router) CallRO(ctx context.Context, bucketID uint64, + fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, VshardRouterCallModeRO, fnc, args, opts) +} - var stub bool +// CallRW is an alias for Call with VshardRouterCallModeRW. +func (r *Router) CallRW(ctx context.Context, bucketID uint64, + fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, VshardRouterCallModeRW, fnc, args, opts) +} - return future.GetTyped(&[]interface{}{&stub, result}) - }, nil - } +// CallRE is an alias for Call with VshardRouterCallModeRE. +func (r *Router) CallRE(ctx context.Context, bucketID uint64, + fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, VshardRouterCallModeRE, fnc, args, opts) +} + +// CallBRO is an alias for Call with VshardRouterCallModeBRO. +func (r *Router) CallBRO(ctx context.Context, bucketID uint64, + fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, VshardRouterCallModeBRO, fnc, args, opts) +} + +// CallBRE is an alias for Call with VshardRouterCallModeBRE. +func (r *Router) CallBRE(ctx context.Context, bucketID uint64, + fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, VshardRouterCallModeBRE, fnc, args, opts) } // RouterMapCallRWOptions sets options for RouterMapCallRW. @@ -487,7 +627,7 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context, ) (map[uuid.UUID]T, error) { const vshardStorageServiceCall = "vshard.storage._call" - timeout := CallTimeoutMin + timeout := callTimeoutDefault if opts.Timeout > 0 { timeout = opts.Timeout } diff --git a/api_test.go b/api_test.go index 898099d..19f7f44 100644 --- a/api_test.go +++ b/api_test.go @@ -67,7 +67,7 @@ func TestRouter_RouterCallImpl(t *testing.T) { conn: mPool, }) - _, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second}, "test", []byte("test")) + _, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second, VshardMode: ReadMode}, "test", []byte("test")) require.ErrorIs(t, err, futureError) }) } diff --git a/replicaset.go b/replicaset.go index fe60a73..714b9f4 100644 --- a/replicaset.go +++ b/replicaset.go @@ -120,8 +120,11 @@ func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) { } // ReplicaCall perform function on remote storage -// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661 -// This method is deprecated, because looks like it has a little bit broken interface +// link https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L661 +// Deprecated: RouterCallImpl is deprecated, +// because looks like it has a little bit broken interface. +// See https://github.com/KaymeKaydex/go-vshard-router/issues/42. +// Use CallAsync instead. func (rs *Replicaset) ReplicaCall( ctx context.Context, opts ReplicasetCallOpts, @@ -129,7 +132,7 @@ func (rs *Replicaset) ReplicaCall( args interface{}, ) (interface{}, StorageResultTypedFunc, error) { if opts.Timeout == 0 { - opts.Timeout = CallTimeoutMin + opts.Timeout = callTimeoutDefault } future := rs.CallAsync(ctx, opts, fnc, args) @@ -144,8 +147,8 @@ func (rs *Replicaset) ReplicaCall( return nil, nil, fmt.Errorf("%s response data is empty", fnc) } - return respData[0], func(result interface{}) error { - return future.GetTyped(&[]interface{}{&result}) + return respData[0], func(result ...interface{}) error { + return future.GetTyped(&result) }, nil } diff --git a/tests/tnt/call_bench_test.go b/tests/tnt/call_bench_test.go index 46c2dbd..7b6f5d9 100644 --- a/tests/tnt/call_bench_test.go +++ b/tests/tnt/call_bench_test.go @@ -20,7 +20,7 @@ type Product struct { Count uint64 `msgpack:"count"` } -func BenchmarkCallSimpleInsert_GO(b *testing.B) { +func BenchmarkCallSimpleInsert_GO_RouterCall(b *testing.B) { b.StopTimer() skipOnInvalidRun(b) @@ -56,6 +56,43 @@ func BenchmarkCallSimpleInsert_GO(b *testing.B) { b.ReportAllocs() } +func BenchmarkCallSimpleInsert_GO_Call(b *testing.B) { + b.StopTimer() + skipOnInvalidRun(b) + + ctx := context.Background() + + cfg := getCfg() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(cfg), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: defaultTntUser, + Password: defaultTntPassword, + RequestTimeout: time.Minute, + }) + require.NoError(b, err) + + b.StartTimer() + for i := 0; i < b.N; i++ { + id := uuid.New() + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + _, err := router.Call( + ctx, + bucketID, + vshardrouter.VshardRouterCallModeRW, + "product_add", + []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}, + vshardrouter.VshardRouterCallOptions{Timeout: 10 * time.Second}) + require.NoError(b, err) + } + + b.ReportAllocs() +} + func BenchmarkCallSimpleInsert_Lua(b *testing.B) { b.StopTimer() @@ -92,7 +129,7 @@ func BenchmarkCallSimpleInsert_Lua(b *testing.B) { b.ReportAllocs() } -func BenchmarkCallSimpleSelect_GO(b *testing.B) { +func BenchmarkCallSimpleSelect_GO_RouterCall(b *testing.B) { b.StopTimer() skipOnInvalidRun(b) @@ -135,15 +172,85 @@ func BenchmarkCallSimpleSelect_GO(b *testing.B) { id := ids[i] bucketID := router.RouterBucketIDStrCRC32(id.String()) - faces, _, err := router.RouterCallImpl( + _, getTyped, err1 := router.RouterCallImpl( ctx, bucketID, vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.ANY, Timeout: time.Second}, "product_get", []interface{}{&Request{ID: id.String()}}) + + var product Product + err2 := getTyped(&product) + b.StopTimer() + require.NoError(b, err1) + require.NoError(b, err2) + b.StartTimer() + } + + b.ReportAllocs() +} + +func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) { + b.StopTimer() + skipOnInvalidRun(b) + + ctx := context.Background() + + cfg := getCfg() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(cfg), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: defaultTntUser, + Password: defaultTntPassword, + }) + require.NoError(b, err) + + ids := make([]uuid.UUID, b.N) + + for i := 0; i < b.N; i++ { + id := uuid.New() + ids[i] = id + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + _, err := router.Call( + ctx, + bucketID, + vshardrouter.VshardRouterCallModeRW, + "product_add", + []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}, + vshardrouter.VshardRouterCallOptions{}, + ) require.NoError(b, err) - require.NotEmpty(b, faces) + } + + type Request struct { + ID string `msgpack:"id"` + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + id := ids[i] + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + resp, err1 := router.Call( + ctx, + bucketID, + vshardrouter.VshardRouterCallModeBRO, + "product_get", + []interface{}{&Request{ID: id.String()}}, + vshardrouter.VshardRouterCallOptions{Timeout: time.Second}, + ) + + var product Product + err2 := resp.GetTyped([]interface{}{&product}) + + b.StopTimer() + require.NoError(b, err1) + require.NoError(b, err2) b.StartTimer() } @@ -210,11 +317,11 @@ func BenchmarkCallSimpleSelect_Lua(b *testing.B) { Args([]interface{}{&Request{ID: id.String()}}) feature := p.Do(req, pool.ANY) - faces, err := feature.Get() + var product Product + err := feature.GetTyped(&[]interface{}{&product}) b.StopTimer() require.NoError(b, err) - require.NotNil(b, faces) b.StartTimer() } diff --git a/tests/tnt/router_call_test.go b/tests/tnt/router_call_test.go index d5b1182..c68c57d 100644 --- a/tests/tnt/router_call_test.go +++ b/tests/tnt/router_call_test.go @@ -32,8 +32,8 @@ func TestRouterCallProto(t *testing.T) { require.Nil(t, err, "NewRouter finished successfully") bucketID := randBucketID(totalBucketCount) - arg1 := "arg1" - args := []interface{}{arg1} + arg1, arg2 := "arg1", "arg2" + args := []interface{}{arg1, arg2} callOpts := vshardrouter.CallOpts{ VshardMode: vshardrouter.ReadMode, PoolMode: pool.PreferRO, @@ -42,10 +42,11 @@ func TestRouterCallProto(t *testing.T) { resp, getTyped, err := router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) require.Nil(t, err, "RouterCallImpl echo finished with no err") require.EqualValues(t, args, resp, "RouterCallImpl echo resp correct") - var arg1Got string - err = getTyped(&arg1Got) + var arg1Got, arg2Got string + err = getTyped(&arg1Got, &arg2Got) require.Nil(t, err, "RouterCallImpl getTyped call ok") - require.Equal(t, arg1, arg1Got, "RouterCallImpl getTyped res ok") + require.Equal(t, arg1, arg1Got, "RouterCallImpl getTyped arg1 res ok") + require.Equal(t, arg2, arg2Got, "RouterCallImpl getTyped arg2 res ok") _, _, err = router.RouterCallImpl(ctx, totalBucketCount+1, callOpts, "echo", args) require.Error(t, err, "RouterCallImpl echo finished with err when bucketID is out of range")