From 0035bcc7d4b1bd1242e2dcba7b5acdb9d18aae58 Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Sat, 7 Sep 2024 00:16:29 +0300 Subject: [PATCH] fix several issues (partially close #42) * RouterCallImpl: fix decoding response from storage_ref * RouterCallImpl: fix decoding response from storage_map * BucketDiscovery: check res for nil * BucketStat: decode bsInfo by ptr * Add tnt tests for discovery logic * Add tnt tests for RouterCallImpl * Add tnt tests for RouterMapCallRWImpl --- CHANGELOG.md | 9 +++ api.go | 61 ++++++++++++-------- discovery.go | 9 ++- replicaset.go | 2 +- tests/tnt/Makefile | 4 +- tests/tnt/concurrent_topology_test.go | 3 +- tests/tnt/discovery_test.go | 65 +++++++++++++++++++++ tests/tnt/router_call_test.go | 82 ++++++++++++++++++++++++--- tests/tnt/routermap_call_test.go | 32 ++++++++++- tests/tnt/tnt_test.go | 6 ++ 10 files changed, 229 insertions(+), 44 deletions(-) create mode 100644 tests/tnt/discovery_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 723ce9b..e8164d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## Unreleased +BUG FIXES: + +* RouterCallImpl: fix decoding response from storage_ref (partially #42) +* RouterCallImpl: fix decoding response from storage_map (partially #42) +* BucketDiscovery: check res for nil +* BucketStat: decode bsInfo by ptr + FEATURES: * Support new Sprintf-like logging interface (#48) @@ -10,6 +17,8 @@ REFACTOR: * resolve issue #46: drastically simplify RouterMapCallRWImpl and added tests with real tnt * Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs * New test for RouterCallImpl (and fix the old one) +* New tnt tests for discovery logic +* New tnt tests for RouterMapCallRWImpl EXAMPLES: * customer go mod fixed diff --git a/api.go b/api.go index 05a8128..f41ccf7 100644 --- a/api.go +++ b/api.go @@ -76,7 +76,7 @@ func (r *Router) RouterCallImpl(ctx context.Context, fnc string, args interface{}) (interface{}, StorageResultTypedFunc, 
error) { - if bucketID > r.cfg.TotalBucketCount { + if bucketID < 1 || r.cfg.TotalBucketCount < bucketID { return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) } @@ -254,7 +254,7 @@ func (r *Router) RouterMapCallRWImpl( for _, rs := range idToReplicasetRef { future := rs.conn.Do(storageUnrefReq, pool.RW) - future.SetError(nil) + future.SetError(nil) // TODO: does it cancel the request above or not? } }() @@ -284,21 +284,26 @@ func (r *Router) RouterMapCallRWImpl( // ref stage: get their responses var totalBucketCount uint64 + // proto for 'storage_ref' method: + // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3137 for _, rsFuture := range rsFutures { respData, err := rsFuture.future.Get() if err != nil { return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.uuid, err) } - if respData[0] == nil { - vshardErr := &StorageCallAssertError{} + if len(respData) < 1 { + return nil, fmt.Errorf("protocol violation: storage_ref: expected len(respData) 1 or 2, got: %d", len(respData)) + } - err = mapstructure.Decode(respData[1], vshardErr) - if err != nil { - return nil, err + if respData[0] == nil { + if len(respData) != 2 { + return nil, fmt.Errorf("protocol vioaltion: storage_ref: expected len(respData) = 2 when respData[0] == nil, got %d", len((respData))) } - return nil, vshardErr + // The possible variations of error in respData[1] are fully unknown yet for us, this question requires research. + // So we do not convert respData[1] to some known error format, because we don't use it anyway. 
+ return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.uuid, respData[1]) } var bucketCount uint64 @@ -333,45 +338,51 @@ func (r *Router) RouterMapCallRWImpl( // map stage: get their responses idToResult := make(map[uuid.UUID]interface{}) + // proto for 'storage_map' method: + // https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158 for _, rsFuture := range rsFutures { respData, err := rsFuture.future.Get() if err != nil { return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.uuid, err) } - if len(respData) != 2 { - return nil, fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData)) + if len(respData) < 1 { + return nil, fmt.Errorf("protocol violation: invalid respData length: must be >= 1, current: %d", len(respData)) } if respData[0] == nil { - vshardErr := &StorageCallAssertError{} + if len(respData) != 2 { + return nil, fmt.Errorf("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d", len(respData)) + } - err = mapstructure.Decode(respData[1], vshardErr) + assertError := &StorageCallAssertError{} + err = mapstructure.Decode(respData[1], assertError) if err != nil { - return nil, err + // TODO: not only StorageCallAssertError is possible here? 
+ return nil, fmt.Errorf("storage_map failed on %v: %+v (decoding to assertError failed %v)", rsFuture.uuid, respData[1], err) } - return nil, vshardErr + return nil, fmt.Errorf("storage_map failed on %v: %w", rsFuture.uuid, assertError) } var isVShardRespOk bool err = rsFuture.future.GetTyped(&[]interface{}{&isVShardRespOk}) if err != nil { - return nil, err + return nil, fmt.Errorf("can't decode isVShardRespOk for storage_map response: %v", err) } - if !isVShardRespOk { // error - errorResp := &StorageCallAssertError{} - - err = rsFuture.future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp}) - if err != nil { - return nil, fmt.Errorf("cant get typed vshard err with err: %v", err) - } - - return nil, errorResp + if !isVShardRespOk { + return nil, fmt.Errorf("protocol violation: isVShardRespOk = false from storage_map: replicaset %v", rsFuture.uuid) } - idToResult[rsFuture.uuid] = respData[1] + switch l := len(respData); l { + case 1: + idToResult[rsFuture.uuid] = nil + case 2: + idToResult[rsFuture.uuid] = respData[1] + default: + return nil, fmt.Errorf("protocol vioaltion: invalid respData when respData[0] == true, expected 1 or 2, got %d", l) + } } r.metrics().RequestDuration(time.Since(timeStart), true, true) diff --git a/discovery.go b/discovery.go index 6aedbb7..60d4b64 100644 --- a/discovery.go +++ b/discovery.go @@ -70,11 +70,10 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica wg.Wait() res := resultAtomic.Load() - resultRs, err := res.rs, res.err - - if err != nil || resultRs == nil { + if res == nil || res.err != nil || res.rs == nil { return nil, Errors[9] // NO_ROUTE_TO_BUCKET } + /* -- All replicasets were scanned, but a bucket was not -- found anywhere, so most likely it does not exist. It @@ -84,12 +83,12 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica -- discovery). 
*/ - return resultRs, nil + return res.rs, nil } // BucketResolve resolve bucket id to replicaset func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error) { - if bucketID > r.cfg.TotalBucketCount { + if bucketID < 1 || r.cfg.TotalBucketCount < bucketID { return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) } diff --git a/replicaset.go b/replicaset.go index e7f4a00..f073693 100644 --- a/replicaset.go +++ b/replicaset.go @@ -75,7 +75,7 @@ func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketSt // A problem with key-code 1 // todo: fix after https://github.com/tarantool/go-tarantool/issues/368 - err = mapstructure.Decode(respData[0], bsInfo) + err = mapstructure.Decode(respData[0], &bsInfo) if err != nil { return bsInfo, fmt.Errorf("can't decode bsInfo: %w", err) } diff --git a/tests/tnt/Makefile b/tests/tnt/Makefile index b6b32d1..d758771 100644 --- a/tests/tnt/Makefile +++ b/tests/tnt/Makefile @@ -55,4 +55,6 @@ cluster-down: gotest: @echo "${GREEN}STAGE: RUN GOTESTS${NC}" -go test -race -parallel=20 -coverpkg="../../" -coverprofile cover.out -timeout=90s -# go tool cover -html=cover.out + +open-coverage: + go tool cover -html=cover.out diff --git a/tests/tnt/concurrent_topology_test.go b/tests/tnt/concurrent_topology_test.go index 972f03f..597f962 100644 --- a/tests/tnt/concurrent_topology_test.go +++ b/tests/tnt/concurrent_topology_test.go @@ -149,8 +149,7 @@ func TestConncurrentTopologyChange(t *testing.T) { default: } - //nolint:gosec - bucketID := uint64((rand.Int() % totalBucketCount) + 1) + bucketID := randBucketID(totalBucketCount) args := []interface{}{"arg1"} callOpts := vshardrouter.CallOpts{ diff --git a/tests/tnt/discovery_test.go b/tests/tnt/discovery_test.go new file mode 100644 index 0000000..d852eb9 --- /dev/null +++ b/tests/tnt/discovery_test.go @@ -0,0 +1,65 @@ +package tnt_test + +import ( + "context" + "log" + "testing" + "time" + + 
vshardrouter "github.com/KaymeKaydex/go-vshard-router" + "github.com/KaymeKaydex/go-vshard-router/providers/static" + "github.com/stretchr/testify/require" +) + +func TestBucketDiscovery(t *testing.T) { + if !isCorrectRun() { + log.Printf("Incorrect run of tnt-test framework") + return + } + + t.Parallel() + + ctx := context.Background() + + cfg := getCfg() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(cfg), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: defaultTntUser, + Password: defaultTntPassword, + }) + + if err != nil { + panic(err) + } + + // pick some random bucket + bucketID := randBucketID(totalBucketCount) + + // clean everything + router.RouteMapClean() + + // resolve it + rs, err := router.BucketResolve(ctx, bucketID) + require.Nil(t, err, "BucketResolve ok") + + // reset it again + router.BucketReset(bucketID) + + // call RouterRouteAll, because: + // 1. increase coverage + // 2. 
we cannot get replicaset uuid by rs instance (lack of interface) + rsMap := router.RouterRouteAll() + for k, v := range rsMap { + if v == rs { + // set it again + res, err := router.BucketSet(bucketID, k) + require.Nil(t, err, nil, "BucketSet ok") + require.Equal(t, res, rs, "BucketSet res ok") + break + } + } +} diff --git a/tests/tnt/router_call_test.go b/tests/tnt/router_call_test.go index 5eccd2d..d11f835 100644 --- a/tests/tnt/router_call_test.go +++ b/tests/tnt/router_call_test.go @@ -3,7 +3,6 @@ package tnt_test import ( "context" "log" - "math/rand" "testing" "time" @@ -13,7 +12,7 @@ import ( "github.com/tarantool/go-tarantool/v2/pool" ) -func TestRouterCall(t *testing.T) { +func TestRouterCallProto(t *testing.T) { if !isCorrectRun() { log.Printf("Incorrect run of tnt-test framework") return @@ -36,15 +35,82 @@ func TestRouterCall(t *testing.T) { require.Nil(t, err, "NewRouter finished successfully") - //nolint:gosec - bucketID := uint64((rand.Int() % totalBucketCount) + 1) - args := []interface{}{"arg1"} - - resp, _, err := router.RouterCallImpl(ctx, bucketID, vshardrouter.CallOpts{ + bucketID := randBucketID(totalBucketCount) + arg1 := "arg1" + args := []interface{}{arg1} + callOpts := vshardrouter.CallOpts{ VshardMode: vshardrouter.ReadMode, PoolMode: pool.PreferRO, - }, "echo", args) + } + resp, getTyped, err := router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) require.Nil(t, err, "RouterCallImpl echo finished with no err") require.EqualValues(t, args, resp, "RouterCallImpl echo resp correct") + var arg1Got string + err = getTyped(&arg1Got) + require.Nil(t, err, "RouterCallImpl getTyped call ok") + require.Equal(t, arg1, arg1Got, "RouterCallImpl getTyped res ok") + + _, _, err = router.RouterCallImpl(ctx, totalBucketCount+1, callOpts, "echo", args) + require.Error(t, err, "RouterCallImpl echo finished with err when bucketID is out of range") + + _, _, err = router.RouterCallImpl(ctx, 0, callOpts, "echo", args) + require.Error(t, err, 
"RouterCallImpl echo finished with err when bucketID is 0") + + _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", nil) + require.NotNil(t, err, "RouterCallImpl echo finised with nil args") + + _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_luajit_error", args) + require.IsType(t, &vshardrouter.StorageCallAssertError{}, err, "RouterCallImpl raise_luajit_error finished with StorageCallAssertError") + + _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_client_error", args) + require.IsType(t, &vshardrouter.StorageCallAssertError{}, err, "RouterCallImpl raise_client_error finished with StorageCallAssertError") + + // maxRespLen is due to: + // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130 + const maxRespLen = 3 + for argLen := 0; argLen <= maxRespLen+1; argLen++ { + args := []interface{}{} + for i := 0; i < argLen; i++ { + args = append(args, "arg") + } + + resp, getTyped, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) + require.Nilf(t, err, "RouterCallImpl no err for arglen %d", argLen) + + expect := args + if argLen > maxRespLen { + expect = expect[:maxRespLen] + } + + require.Equal(t, expect, resp, "RouterCallImpl resp ok for arglen %d", argLen) + var typed interface{} + err = getTyped(&typed) + require.Nil(t, err, "RouterCallImpl getTyped no err for arglen %d", argLen) + + if argLen > 0 { + // TODO: Should we handle multiple return values in getTyped? + require.Equal(t, expect[0], typed, "RouterCallImpl getTyped resp ok for arglen %d", argLen) + } + } + + // simulate vshard error + + // 1. 
Replace replicaset for bucketID + rs, err := router.BucketResolve(ctx, bucketID) + require.Nil(t, err, "BucketResolve finished with no err") + rsMap := router.RouterRouteAll() + + for k, v := range rsMap { + if rs != v { + res, err := router.BucketSet(bucketID, k) + require.Nil(t, err, "BucketSet finished with no err") + require.Equal(t, res, v) + break + } + } + + // 2. Try to call something + _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) + require.Nil(t, err, "RouterCallImpl echo finished with no err even on dirty bucket map") } diff --git a/tests/tnt/routermap_call_test.go b/tests/tnt/routermap_call_test.go index d0bdf3e..60fed01 100644 --- a/tests/tnt/routermap_call_test.go +++ b/tests/tnt/routermap_call_test.go @@ -34,17 +34,37 @@ func TestRouterMapCall(t *testing.T) { require.Nil(t, err, "NewRouter finished successfully") + callOpts := vshardrouter.CallOpts{} + const arg = "arg1" // Enusre that RouterMapCallRWImpl works at all echoArgs := []interface{}{arg} - resp, err := router.RouterMapCallRWImpl(ctx, "echo", echoArgs, vshardrouter.CallOpts{}) + resp, err := router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) + require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") + + for k, v := range resp { + require.Equalf(t, arg, v, "RouterMapCallRWImpl value ok for %v", k) + } + + // RouterMapCallRWImpl returns only one value + echoArgs = []interface{}{arg, "arg2"} + resp, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") for k, v := range resp { require.Equalf(t, arg, v, "RouterMapCallRWImpl value ok for %v", k) } + // RouterMapCallRWImpl returns nil when no return value + noArgs := []interface{}{} + resp, err = router.RouterMapCallRWImpl(ctx, "echo", noArgs, callOpts) + require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") + + for k, v := range resp { + require.Equalf(t, nil, v, "RouterMapCallRWImpl value ok 
for %v", k) + } + // Ensure that RouterMapCallRWImpl sends requests concurrently const sleepToSec int = 1 sleepArgs := []interface{}{sleepToSec} @@ -59,6 +79,14 @@ func TestRouterMapCall(t *testing.T) { require.Greater(t, len(cfg), 1, "There are more than one replicasets") require.Less(t, duration, 1200*time.Millisecond, "Requests were send concurrently") + // RouterMapCallRWImpl returns err on raise_luajit_error + _, err = router.RouterMapCallRWImpl(ctx, "raise_luajit_error", noArgs, callOpts) + require.NotNil(t, err, "RouterMapCallRWImpl raise_luajit_error finished with error") + + // RouterMapCallRWImpl invalid usage + _, err = router.RouterMapCallRWImpl(ctx, "echo", nil, callOpts) + require.NotNil(t, err, "RouterMapCallRWImpl with nil args finished with error") + // Ensure that RouterMapCallRWImpl doesn't work when it mean't to for k := range cfg { errs := router.RemoveReplicaset(ctx, k.UUID) @@ -66,6 +94,6 @@ func TestRouterMapCall(t *testing.T) { break } - _, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, vshardrouter.CallOpts{}) + _, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) require.NotNilf(t, err, "RouterMapCallRWImpl failed on not full cluster") } diff --git a/tests/tnt/tnt_test.go b/tests/tnt/tnt_test.go index 06c8c73..5747941 100644 --- a/tests/tnt/tnt_test.go +++ b/tests/tnt/tnt_test.go @@ -2,6 +2,7 @@ package tnt_test import ( "fmt" + "math/rand" "os" "strconv" "testing" @@ -51,6 +52,11 @@ func getCfg() map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo { return c.clusterCfg() } +func randBucketID(totalBucketCount uint64) uint64 { + //nolint:gosec + return (rand.Uint64() % totalBucketCount) + 1 +} + func TestConcurrentRouterCall(t *testing.T) { /* TODO 1) Invalidate some random bucket id