Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
fix several issues (partially close #42)
Browse files Browse the repository at this point in the history
* RouterCallImpl: fix decoding responce from storage_ref
* RouterCallImpl: fix decoding responce from storage_map
* BucketDiscovery: check res for nil
* BucketStat: decode bsInfo by ptr
* Add tnt tests for disvoery logic
* Add tnt tests for RouterCallImpl
* Add tnt tests for RouterMapCallRWImpl
  • Loading branch information
nurzhan-saktaganov committed Sep 7, 2024
1 parent 4e0cbe0 commit a287b91
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 44 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
## Unreleased

BUG FIXES:

* RouterCallImpl: fix decoding responce from storage_ref (partially #42)
* RouterCallImpl: fix decoding responce from storage_map (partially #42)
* BucketDiscovery: check res for nil
* BucketStat: decode bsInfo by ptr

FEATURES:

* Support new Sprintf-like logging interface (#48)
Expand All @@ -10,6 +17,8 @@ REFACTOR:
* resolve issue #46: drastically simplify RouterMapCallRWImpl and added tests with real tnt
* Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs
* New test for RouterCallImpl (and fix the old one)
* New tnt tests for discovery logic
* New tnt tests for RouterMapCallRWImpl

## 0.0.12

Expand Down
61 changes: 36 additions & 25 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

if bucketID > r.cfg.TotalBucketCount {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

Expand Down Expand Up @@ -254,7 +254,7 @@ func (r *Router) RouterMapCallRWImpl(

for _, rs := range idToReplicasetRef {
future := rs.conn.Do(storageUnrefReq, pool.RW)
future.SetError(nil)
future.SetError(nil) // TODO: does it cancel the request above or not?
}
}()

Expand Down Expand Up @@ -284,21 +284,26 @@ func (r *Router) RouterMapCallRWImpl(

// ref stage: get their responses
var totalBucketCount uint64
// proto for 'storage_ref' method:
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3137
for _, rsFuture := range rsFutures {
respData, err := rsFuture.future.Get()
if err != nil {
return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.uuid, err)
}

if respData[0] == nil {
vshardErr := &StorageCallAssertError{}
if len(respData) < 1 {
return nil, fmt.Errorf("protocol violation: storage_ref: expected len(respData) 1 or 2, got: %d", len(respData))
}

err = mapstructure.Decode(respData[1], vshardErr)
if err != nil {
return nil, err
if respData[0] == nil {
if len(respData) != 2 {
return nil, fmt.Errorf("protocol vioaltion: storage_ref: expected len(respData) = 2 when respData[0] == nil, got %d", len((respData)))
}

return nil, vshardErr
// The possible variations of error in respData[1] are fully unknown yet for us, this question requires research.
// So we do not convert respData[1] to some known error format, because we don't use it anyway.
return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.uuid, respData[1])
}

var bucketCount uint64
Expand Down Expand Up @@ -333,45 +338,51 @@ func (r *Router) RouterMapCallRWImpl(

// map stage: get their responses
idToResult := make(map[uuid.UUID]interface{})
// proto for 'storage_map' method:
// https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158
for _, rsFuture := range rsFutures {
respData, err := rsFuture.future.Get()
if err != nil {
return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.uuid, err)
}

if len(respData) != 2 {
return nil, fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
if len(respData) < 1 {
return nil, fmt.Errorf("protocol violation: invalid respData length: must be >= 1, current: %d", len(respData))
}

if respData[0] == nil {
vshardErr := &StorageCallAssertError{}
if len(respData) != 2 {
return nil, fmt.Errorf("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d", len(respData))
}

err = mapstructure.Decode(respData[1], vshardErr)
assertError := &StorageCallAssertError{}
err = mapstructure.Decode(respData[1], assertError)
if err != nil {
return nil, err
// TODO: not only StorageCallAssertError is possible here?
return nil, fmt.Errorf("storage_map failed on %v: %+v (decoding to assertError failed %v)", rsFuture.uuid, respData[1], err)
}

return nil, vshardErr
return nil, fmt.Errorf("storage_map failed on %v: %w", rsFuture.uuid, assertError)
}

var isVShardRespOk bool
err = rsFuture.future.GetTyped(&[]interface{}{&isVShardRespOk})
if err != nil {
return nil, err
return nil, fmt.Errorf("can't decode isVShardRespOk for storage_map response: %v", err)
}

if !isVShardRespOk { // error
errorResp := &StorageCallAssertError{}

err = rsFuture.future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
if err != nil {
return nil, fmt.Errorf("cant get typed vshard err with err: %v", err)
}

return nil, errorResp
if !isVShardRespOk {
return nil, fmt.Errorf("protocol violation: isVShardRespOk = false from storage_map: replicaset %v", rsFuture.uuid)
}

idToResult[rsFuture.uuid] = respData[1]
switch l := len(respData); l {
case 1:
idToResult[rsFuture.uuid] = nil
case 2:
idToResult[rsFuture.uuid] = respData[1]
default:
return nil, fmt.Errorf("protocol vioaltion: invalid respData when respData[0] == true, expected 1 or 2, got %d", l)
}
}

r.metrics().RequestDuration(time.Since(timeStart), true, true)
Expand Down
9 changes: 4 additions & 5 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
wg.Wait()

res := resultAtomic.Load()
resultRs, err := res.rs, res.err

if err != nil || resultRs == nil {
if res == nil || res.err != nil || res.rs == nil {
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}

/*
-- All replicasets were scanned, but a bucket was not
-- found anywhere, so most likely it does not exist. It
Expand All @@ -84,12 +83,12 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
-- discovery).
*/

return resultRs, nil
return res.rs, nil
}

// BucketResolve resolve bucket id to replicaset
func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error) {
if bucketID > r.cfg.TotalBucketCount {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

Expand Down
2 changes: 1 addition & 1 deletion replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketSt

// A problem with key-code 1
// todo: fix after https://github.com/tarantool/go-tarantool/issues/368
err = mapstructure.Decode(respData[0], bsInfo)
err = mapstructure.Decode(respData[0], &bsInfo)
if err != nil {
return bsInfo, fmt.Errorf("can't decode bsInfo: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion tests/tnt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,6 @@ cluster-down:
gotest:
@echo "${GREEN}STAGE: RUN GOTESTS${NC}"
-NREPLICASETS=${NREPLICASETS} START_PORT=${START_PORT} go test -race -parallel=20 -coverpkg="../../" -coverprofile cover.out -timeout=90s
# go tool cover -html=cover.out

open-coverage:
go tool cover -html=cover.out
3 changes: 1 addition & 2 deletions tests/tnt/concurrent_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ func TestConncurrentTopologyChange(t *testing.T) {
default:
}

//nolint:gosec
bucketID := uint64((rand.Int() % totalBucketCount) + 1)
bucketID := randBucketID(totalBucketCount)
args := []interface{}{"arg1"}

callOpts := vshardrouter.CallOpts{
Expand Down
65 changes: 65 additions & 0 deletions tests/tnt/discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package tnt_test

import (
"context"
"log"
"testing"
"time"

vshardrouter "github.com/KaymeKaydex/go-vshard-router"
"github.com/KaymeKaydex/go-vshard-router/providers/static"
"github.com/stretchr/testify/require"
)

func TestBucketDiscovery(t *testing.T) {
if !isCorrectRun() {
log.Printf("Incorrect run of tnt-test framework")
return
}

t.Parallel()

ctx := context.Background()

cfg := getCfg()

router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
TopologyProvider: static.NewProvider(cfg),
DiscoveryTimeout: 5 * time.Second,
DiscoveryMode: vshardrouter.DiscoveryModeOn,
TotalBucketCount: totalBucketCount,
User: defaultTntUser,
Password: defaultTntPassword,
})

if err != nil {
panic(err)
}

// pick some random bucket
bucketID := randBucketID(totalBucketCount)

// clean everything
router.RouteMapClean()

// resolve it
rs, err := router.BucketResolve(ctx, bucketID)
require.Nil(t, err, "BucketResolve ok")

// reset it again
router.BucketReset(bucketID)

// call RouterRouteAll, because:
// 1. increase coverage
// 2. we cannot get replicaset uuid by rs instance (lack of interface)
rsMap := router.RouterRouteAll()
for k, v := range rsMap {
if v == rs {
// set it again
res, err := router.BucketSet(bucketID, k)
require.Nil(t, err, nil, "BucketSet ok")
require.Equal(t, res, rs, "BucketSet res ok")
break
}
}
}
82 changes: 74 additions & 8 deletions tests/tnt/router_call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tnt_test
import (
"context"
"log"
"math/rand"
"testing"
"time"

Expand All @@ -13,7 +12,7 @@ import (
"github.com/tarantool/go-tarantool/v2/pool"
)

func TestRouterCall(t *testing.T) {
func TestRouterCallProto(t *testing.T) {
if !isCorrectRun() {
log.Printf("Incorrect run of tnt-test framework")
return
Expand All @@ -38,15 +37,82 @@ func TestRouterCall(t *testing.T) {
panic(err)
}

//nolint:gosec
bucketID := uint64((rand.Int() % totalBucketCount) + 1)
args := []interface{}{"arg1"}

resp, _, err := router.RouterCallImpl(ctx, bucketID, vshardrouter.CallOpts{
bucketID := randBucketID(totalBucketCount)
arg1 := "arg1"
args := []interface{}{arg1}
callOpts := vshardrouter.CallOpts{
VshardMode: vshardrouter.ReadMode,
PoolMode: pool.PreferRO,
}, "echo", args)
}

resp, getTyped, err := router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args)
require.Nil(t, err, "RouterCallImpl echo finished with no err")
require.EqualValues(t, args, resp, "RouterCallImpl echo resp correct")
var arg1Got string
err = getTyped(&arg1Got)
require.Nil(t, err, "RouterCallImpl getTyped call ok")
require.Equal(t, arg1, arg1Got, "RouterCallImpl getTyped res ok")

_, _, err = router.RouterCallImpl(ctx, totalBucketCount+1, callOpts, "echo", args)
require.Error(t, err, "RouterCallImpl echo finished with err when bucketID is out of range")

_, _, err = router.RouterCallImpl(ctx, 0, callOpts, "echo", args)
require.Error(t, err, "RouterCallImpl echo finished with err when bucketID is 0")

_, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", nil)
require.NotNil(t, err, "RouterCallImpl echo finised with nil args")

_, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_luajit_error", args)
require.IsType(t, &vshardrouter.StorageCallAssertError{}, err, "RouterCallImpl raise_luajit_error finished with StorageCallAssertError")

_, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_client_error", args)
require.IsType(t, &vshardrouter.StorageCallAssertError{}, err, "RouterCallImpl raise_client_error finished with StorageCallAssertError")

// maxRespLen is due to:
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130
const maxRespLen = 3
for argLen := 0; argLen <= maxRespLen+1; argLen++ {
args := []interface{}{}
for i := 0; i < argLen; i++ {
args = append(args, "arg")
}

resp, getTyped, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args)
require.Nilf(t, err, "RouterCallImpl no err for arglen %d", argLen)

expect := args
if argLen > maxRespLen {
expect = expect[:maxRespLen]
}

require.Equal(t, expect, resp, "RouterCallImpl resp ok for arglen %d", argLen)
var typed interface{}
err = getTyped(&typed)
require.Nil(t, err, "RouterCallImpl getTyped no err for arglen %d", argLen)

if argLen > 0 {
// TODO: Should we handle multiple return values in getTyped?
require.Equal(t, expect[0], typed, "RouterCallImpl getTyped resp ok for arglen %d", argLen)
}
}

// simulate vshard error

// 1. Replace replicaset for bucketID
rs, err := router.BucketResolve(ctx, bucketID)
require.Nil(t, err, "BucketResolve finished with no err")
rsMap := router.RouterRouteAll()

for k, v := range rsMap {
if rs != v {
res, err := router.BucketSet(bucketID, k)
require.Nil(t, err, "BucketSet finished with no err")
require.Equal(t, res, v)
break
}
}

// 2. Try to call something
_, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args)
require.Nil(t, err, "RouterCallImpl echo finished with no err even on dirty bucket map")
}
Loading

0 comments on commit a287b91

Please sign in to comment.