Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
WIP: add RouterCallImplOld and bench for it
Browse files Browse the repository at this point in the history
  • Loading branch information
nurzhan-saktaganov committed Dec 13, 2024
1 parent 388f2d0 commit 9bc564e
Show file tree
Hide file tree
Showing 2 changed files with 385 additions and 0 deletions.
274 changes: 274 additions & 0 deletions router_call_old_tmp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package vshard_router

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool/v2"
"github.com/vmihailenco/msgpack/v5"
"github.com/vmihailenco/msgpack/v5/msgpcode"
)

type vshardStorageCallResponseProtoOld struct {
AssertError *assertError // not nil if there is assert error
VshardError *StorageCallVShardError // not nil if there is vshard response
Data []interface{} // raw response data
}

func (r *vshardStorageCallResponseProtoOld) DecodeMsgpack(d *msgpack.Decoder) error {
/* vshard.storage.call(func) response has the next 4 possbile formats:
See: https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130
1. vshard error has occurred:
array[nil, vshard_error]
2. User method has finished with some error:
array[false, assert error]
3. User mehod has finished successfully
a) but has not returned anything
array[true]
b) has returned 1 element
array[true, elem1]
c) has returned 2 element
array[true, elem1, elem2]
d) has returned 3 element
array[true, elem1, elem2, elem3]
*/

// Ensure it is an array and get array len for protocol violation check
respArrayLen, err := d.DecodeArrayLen()
if err != nil {
return err
}

if respArrayLen == 0 {
return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
}

// we need peek code to make our check faster than decode interface
// later we will check if code nil or bool
code, err := d.PeekCode()
if err != nil {
return err
}

// this is storage error
if code == msgpcode.Nil {
err = d.DecodeNil()
if err != nil {
return err
}

if respArrayLen != 2 {
return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen)
}

var vshardError StorageCallVShardError

err = d.Decode(&vshardError)
if err != nil {
return fmt.Errorf("failed to decode storage vshard error: %w", err)
}

r.VshardError = &vshardError

return nil
}

isVShardRespOk, err := d.DecodeBool()
if err != nil {
return err
}

if !isVShardRespOk {
// that means we have an assert errors and response is not ok
if respArrayLen != 2 {
return fmt.Errorf("protocol violation: length is %d on assert error case", respArrayLen)
}

var assertError assertError
err = d.Decode(&assertError)
if err != nil {
return fmt.Errorf("failed to decode storage assert error: %w", err)
}

r.AssertError = &assertError

return nil
}

// isVShardRespOk is true
r.Data = make([]interface{}, 0, respArrayLen-1)

for i := 1; i < respArrayLen; i++ {
elem, err := d.DecodeInterface()
if err != nil {
return fmt.Errorf("failed to decode into interface element #%d of response array", i+1)
}
r.Data = append(r.Data, elem)
}

return nil
}

// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min".
// But the original lua vshard implementation uses this naming, so we use it too.
//
//nolint:revive
const CallTimeoutMin = time.Second / 2

// RouterCallImpl Perform shard operation function will restart operation
// after wrong bucket response until timeout is reached
func (r *Router) RouterCallImplOld(ctx context.Context,
bucketID uint64,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

const vshardStorageClientCall = "vshard.storage.call"

if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
}

timeout := opts.Timeout
timeStart := time.Now()

req := tarantool.NewCallRequest(vshardStorageClientCall)
req = req.Context(ctx)
req = req.Args([]interface{}{
bucketID,
opts.VshardMode.String(),
fnc,
args,
})

var err error

for {
if since := time.Since(timeStart); since > timeout {
r.metrics().RequestDuration(since, false, false)

r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout)
if err == nil {
err = fmt.Errorf("cant get call cause call impl timeout")
}

return nil, nil, err
}

var rs *Replicaset

rs, err = r.BucketResolve(ctx, bucketID)
if err != nil {
r.metrics().RetryOnCall("bucket_resolve_error")

// this error will be returned to a caller in case of timeout
err = fmt.Errorf("cant resolve bucket %d: %w", bucketID, err)

// TODO: lua vshard router just yields here and retires, no pause is applied.
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L713
// So we also retry here. But I guess we should add some pause here.
continue
}

r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)

future := rs.conn.Do(req, opts.PoolMode)

var storageCallResponse vshardStorageCallResponseProtoOld
err = future.GetTyped(&storageCallResponse)
if err != nil {
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
}

r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)

if storageCallResponse.AssertError != nil {
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
}

if storageCallResponse.VshardError != nil {
vshardError := storageCallResponse.VshardError

switch vshardError.Name {
case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
// We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
r.BucketReset(bucketID)

if vshardError.Destination != "" {
destinationUUID, err := uuid.Parse(vshardError.Destination)
if err != nil {
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
vshardStorageClientCall, vshardError, err)
}

var loggedOnce bool
for {
idToReplicasetRef := r.getIDToReplicaset()
if _, ok := idToReplicasetRef[destinationUUID]; ok {
_, err := r.BucketSet(bucketID, destinationUUID)
if err == nil {
break // breaks loop
}
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationUUID, err)
}

if !loggedOnce {
r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
"update configuration", destinationUUID)
loggedOnce = true
}

const defaultPoolingPause = 50 * time.Millisecond
time.Sleep(defaultPoolingPause)

if time.Since(timeStart) > timeout {
return nil, nil, vshardError
}
}
}

// retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked

r.metrics().RetryOnCall("bucket_migrate")

r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, vshardError)

// this vshardError will be returned to a caller in case of timeout
err = vshardError
continue
case VShardErrNameTransferIsInProgress:
// Since lua vshard router doesn't retry here, we don't retry too.
// There is a comment why lua vshard router doesn't retry:
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
r.BucketReset(bucketID)
return nil, nil, vshardError
case VShardErrNameNonMaster:
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
// we just return this error as is.
return nil, nil, vshardError
default:
return nil, nil, vshardError
}
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)

return storageCallResponse.Data, func(result interface{}) error {
if len(storageCallResponse.Data) == 0 {
return nil
}

var stub bool

return future.GetTyped(&[]interface{}{&stub, result})
}, nil
}
}
111 changes: 111 additions & 0 deletions tests/tnt/call_bench_test_tmp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package tnt

import (
"context"
"testing"
"time"

vshardrouter "github.com/KaymeKaydex/go-vshard-router"
"github.com/KaymeKaydex/go-vshard-router/providers/static"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2/pool"
)

func BenchmarkCallSimpleInsert_GO_RouterCallOld(b *testing.B) {
b.StopTimer()
skipOnInvalidRun(b)

Check failure on line 17 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: skipOnInvalidRun

Check failure on line 17 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: skipOnInvalidRun

Check failure on line 17 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: skipOnInvalidRun

ctx := context.Background()

cfg := getCfg()

Check failure on line 21 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: getCfg

Check failure on line 21 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: getCfg

Check failure on line 21 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: getCfg

router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
TopologyProvider: static.NewProvider(cfg),
DiscoveryTimeout: 5 * time.Second,
DiscoveryMode: vshardrouter.DiscoveryModeOn,
TotalBucketCount: totalBucketCount,

Check failure on line 27 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: totalBucketCount

Check failure on line 27 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: totalBucketCount

Check failure on line 27 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: totalBucketCount
User: defaultTntUser,

Check failure on line 28 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: defaultTntUser

Check failure on line 28 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: defaultTntUser

Check failure on line 28 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: defaultTntUser
Password: defaultTntPassword,

Check failure on line 29 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: defaultTntPassword

Check failure on line 29 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: defaultTntPassword

Check failure on line 29 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: defaultTntPassword
RequestTimeout: time.Minute,
})
require.NoError(b, err)

b.StartTimer()
for i := 0; i < b.N; i++ {
id := uuid.New()

bucketID := router.RouterBucketIDStrCRC32(id.String())
_, _, err := router.RouterCallImplOld(
ctx,
bucketID,
vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW, Timeout: 10 * time.Second},
"product_add",
[]interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}})

Check failure on line 44 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: Product

Check failure on line 44 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: Product

Check failure on line 44 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: Product
require.NoError(b, err)
}

b.ReportAllocs()
}

func BenchmarkCallSimpleSelect_GO_RouterCallOld(b *testing.B) {
b.StopTimer()
skipOnInvalidRun(b)

Check failure on line 53 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: skipOnInvalidRun

Check failure on line 53 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: skipOnInvalidRun

Check failure on line 53 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: skipOnInvalidRun

ctx := context.Background()

cfg := getCfg()

Check failure on line 57 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: getCfg

Check failure on line 57 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: getCfg

Check failure on line 57 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: getCfg

router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
TopologyProvider: static.NewProvider(cfg),
DiscoveryTimeout: 5 * time.Second,
DiscoveryMode: vshardrouter.DiscoveryModeOn,
TotalBucketCount: totalBucketCount,

Check failure on line 63 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: totalBucketCount

Check failure on line 63 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: totalBucketCount

Check failure on line 63 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: totalBucketCount
User: defaultTntUser,

Check failure on line 64 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.8)

undefined: defaultTntUser

Check failure on line 64 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, 2.10)

undefined: defaultTntUser

Check failure on line 64 in tests/tnt/call_bench_test_tmp.go

View workflow job for this annotation

GitHub Actions / all-tests (stable, master)

undefined: defaultTntUser
Password: defaultTntPassword,
})
require.NoError(b, err)

ids := make([]uuid.UUID, b.N)

for i := 0; i < b.N; i++ {
id := uuid.New()
ids[i] = id

bucketID := router.RouterBucketIDStrCRC32(id.String())
_, _, err := router.RouterCallImplOld(
ctx,
bucketID,
vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW},
"product_add",
[]interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}})
require.NoError(b, err)
}

type Request struct {
ID string `msgpack:"id"`
}

b.StartTimer()
for i := 0; i < b.N; i++ {
id := ids[i]

bucketID := router.RouterBucketIDStrCRC32(id.String())
_, getTyped, err1 := router.RouterCallImpl(
ctx,
bucketID,
vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.ANY, Timeout: time.Second},
"product_get",
[]interface{}{&Request{ID: id.String()}})

var product Product
err2 := getTyped(&product)

b.StopTimer()
require.NoError(b, err1)
require.NoError(b, err2)
b.StartTimer()
}

b.ReportAllocs()
}

0 comments on commit 9bc564e

Please sign in to comment.