Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
partially resolve #100 (router_map_callrw) (#114)
Browse files Browse the repository at this point in the history
* custom msgpackv5 decoders for router_map_callrw
* new method 'RouterMapCallRW[T]'
  • Loading branch information
nurzhan-saktaganov authored Dec 18, 2024
1 parent 0a8677f commit be1cfc4
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 62 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ CHANGES:
* Decode 'vshard.storage.call' response manually into struct vshardStorageCallResponseProto using DecodeMsgpack interface to reduce allocations (partially #61, #100).
* Remove `mapstructure` tag from StorageCallVShardError.
* Update benchmarks in README files.
* Package `mapstructure` is completely removed from direct dependencies list.

FEATURES:

* Add pause between requests in buckets discovering. Configured by config DiscoveryWorkStep, default is 10ms.
* Add ReplicaUUID to the StorageCallVShardError struct.
* New method 'RouterMapCallRW[T]' to replace the deprecated one 'RouterMapCallRWImpl'.

REFACTOR:

Expand All @@ -26,9 +28,11 @@ REFACTOR:
* Remove bucketStatError type, use StorageCallVShardError type instead.
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).
* Add custom msgpackv5 decoder for 'RouterMapCallRW'.

TESTS:
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.
* Test for new 'RouterMapCallRW[T]'.

## v1.2.0

Expand Down
213 changes: 152 additions & 61 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/google/uuid"
"github.com/mitchellh/mapstructure"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
Expand Down Expand Up @@ -338,15 +337,154 @@ func (r *Router) RouterCallImpl(ctx context.Context,
}
}

// RouterMapCallRWOptions sets options for RouterMapCallRW.
type RouterMapCallRWOptions struct {
// Timeout defines timeout for RouterMapCallRW.
Timeout time.Duration
}

type storageMapResponseProto[T any] struct {
ok bool
value T
err StorageCallVShardError
}

func (r *storageMapResponseProto[T]) DecodeMsgpack(d *msgpack.Decoder) error {
// proto for 'storage_map' method
// https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158
respArrayLen, err := d.DecodeArrayLen()
if err != nil {
return err
}

if respArrayLen == 0 {
return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
}

code, err := d.PeekCode()
if err != nil {
return err
}

if code == msgpcode.Nil {
err = d.DecodeNil()
if err != nil {
return err
}

if respArrayLen != 2 {
return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen)
}

err = d.Decode(&r.err)
if err != nil {
return fmt.Errorf("failed to decode storage vshard error: %w", err)
}

return nil
}

isOk, err := d.DecodeBool()
if err != nil {
return err
}

if !isOk {
return fmt.Errorf("protocol violation: isOk=false")
}

switch respArrayLen {
case 1:
break
case 2:
err = d.Decode(&r.value)
if err != nil {
return fmt.Errorf("can't decode value %T: %w", r.value, err)
}
default:
return fmt.Errorf("protocol violation: invalid array length when no vshard error: %d", respArrayLen)
}

r.ok = true

return nil
}

type storageRefResponseProto struct {
err error
bucketCount uint64
}

func (r *storageRefResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
respArrayLen, err := d.DecodeArrayLen()
if err != nil {
return err
}

if respArrayLen == 0 {
return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
}

code, err := d.PeekCode()
if err != nil {
return err
}

if code == msgpcode.Nil {
err = d.DecodeNil()
if err != nil {
return err
}

if respArrayLen != 2 {
return fmt.Errorf("protocol violation: length is %d on error case", respArrayLen)
}

// The possible variations of error here are fully unknown yet for us, e.g:
// vshard error, assert error or some other type of error. So this question requires research.
// So we do not decode it to some known error format, because we don't use it anyway.
decodedError, err := d.DecodeInterface()
if err != nil {
return err
}

// convert empty interface into error
r.err = fmt.Errorf("%v", decodedError)

return nil
}

r.bucketCount, err = d.DecodeUint64()
if err != nil {
return err
}

return nil
}

// RouterMapCallRWImpl perform call function on all masters in the cluster
// with a guarantee that in case of success it was executed with all
// buckets being accessible for reads and writes.
// Deprecated: RouterMapCallRWImpl is deprecated.
// Use more general RouterMapCallRW instead.
func (r *Router) RouterMapCallRWImpl(
ctx context.Context,
fnc string,
args interface{},
opts CallOpts,
) (map[uuid.UUID]interface{}, error) {
return RouterMapCallRW[interface{}](r, ctx, fnc, args, RouterMapCallRWOptions{Timeout: opts.Timeout})
}

// RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the
// cluster with a guarantee that in case of success it was executed with all
// buckets being accessible for reads and writes.
// T is a return type of user defined function 'fnc'.
// We define it as a distinct function, not a Router method, because golang limitations,
// see: https://github.com/golang/go/issues/49085.
func RouterMapCallRW[T any](r *Router, ctx context.Context,
fnc string, args interface{}, opts RouterMapCallRWOptions,
) (map[uuid.UUID]T, error) {
const vshardStorageServiceCall = "vshard.storage._call"

timeout := CallTimeoutMin
Expand Down Expand Up @@ -399,32 +537,17 @@ func (r *Router) RouterMapCallRWImpl(
// proto for 'storage_ref' method:
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3137
for _, rsFuture := range rsFutures {
respData, err := rsFuture.future.Get()
if err != nil {
return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.uuid, err)
}

if len(respData) < 1 {
return nil, fmt.Errorf("protocol violation: storage_ref: expected len(respData) 1 or 2, got: %d", len(respData))
}
var storageRefResponse storageRefResponseProto

if respData[0] == nil {
if len(respData) != 2 {
return nil, fmt.Errorf("protocol vioaltion: storage_ref: expected len(respData) = 2 when respData[0] == nil, got %d", len((respData)))
}

// The possible variations of error in respData[1] are fully unknown yet for us, this question requires research.
// So we do not convert respData[1] to some known error format, because we don't use it anyway.
return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.uuid, respData[1])
if err := rsFuture.future.GetTyped(&storageRefResponse); err != nil {
return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.uuid, err)
}

var bucketCount uint64
err = rsFuture.future.GetTyped(&[]interface{}{&bucketCount})
if err != nil {
return nil, err
if storageRefResponse.err != nil {
return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.uuid, storageRefResponse.err)
}

totalBucketCount += bucketCount
totalBucketCount += storageRefResponse.bucketCount
}

if totalBucketCount != r.cfg.TotalBucketCount {
Expand All @@ -449,52 +572,20 @@ func (r *Router) RouterMapCallRWImpl(
}

// map stage: get their responses
idToResult := make(map[uuid.UUID]interface{})
// proto for 'storage_map' method:
// https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158
idToResult := make(map[uuid.UUID]T)
for _, rsFuture := range rsFutures {
respData, err := rsFuture.future.Get()
if err != nil {
return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.uuid, err)
}

if len(respData) < 1 {
return nil, fmt.Errorf("protocol violation: invalid respData length: must be >= 1, current: %d", len(respData))
}

if respData[0] == nil {
if len(respData) != 2 {
return nil, fmt.Errorf("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d", len(respData))
}

var assertError assertError
err = mapstructure.Decode(respData[1], &assertError)
if err != nil {
// We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
return nil, fmt.Errorf("storage_map failed on %v: %+v (decoding to assertError failed %v)", rsFuture.uuid, respData[1], err)
}
var storageMapResponse storageMapResponseProto[T]

return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.uuid, assertError)
}

var isVShardRespOk bool
err = rsFuture.future.GetTyped(&[]interface{}{&isVShardRespOk})
err := rsFuture.future.GetTyped(&storageMapResponse)
if err != nil {
return nil, fmt.Errorf("can't decode isVShardRespOk for storage_map response: %v", err)
return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.uuid, err)
}

if !isVShardRespOk {
return nil, fmt.Errorf("protocol violation: isVShardRespOk = false from storage_map: replicaset %v", rsFuture.uuid)
if !storageMapResponse.ok {
return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.uuid, storageMapResponse.err)
}

switch l := len(respData); l {
case 1:
idToResult[rsFuture.uuid] = nil
case 2:
idToResult[rsFuture.uuid] = respData[1]
default:
return nil, fmt.Errorf("protocol vioaltion: invalid respData when respData[0] == true, expected 1 or 2, got %d", l)
}
idToResult[rsFuture.uuid] = storageMapResponse.value
}

r.metrics().RequestDuration(time.Since(timeStart), true, true)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.20

require (
github.com/google/uuid v1.6.0
github.com/mitchellh/mapstructure v1.5.0
github.com/snksoft/crc v1.1.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
Expand All @@ -21,6 +20,7 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
Expand Down
7 changes: 7 additions & 0 deletions tests/tnt/routermap_call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func TestRouterMapCall(t *testing.T) {
require.Equalf(t, arg, v, "RouterMapCallRWImpl value ok for %v", k)
}

echoArgs = []interface{}{1}
respInt, err := vshardrouter.RouterMapCallRW[int](router, ctx, "echo", echoArgs, vshardrouter.RouterMapCallRWOptions{})
require.NoError(t, err, "RouterMapCallRW[int] echo finished with no err")
for k, v := range respInt {
require.Equalf(t, 1, v, "RouterMapCallRW[int] value ok for %v", k)
}

// RouterMapCallRWImpl returns only one value
echoArgs = []interface{}{arg, "arg2"}
resp, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts)
Expand Down

0 comments on commit be1cfc4

Please sign in to comment.