From 9bc564e92afa26764701debd997db4c3d3dc32b6 Mon Sep 17 00:00:00 2001
From: Nurzhan Saktaganov
Date: Fri, 13 Dec 2024 14:48:50 +0300
Subject: [PATCH] WIP: add RouterCallImplOld and bench for it

---
 router_call_old_tmp.go           | 287 +++++++++++++++++++++++++++++++
 tests/tnt/call_bench_test_tmp.go | 116 +++++++++++++
 2 files changed, 403 insertions(+)
 create mode 100644 router_call_old_tmp.go
 create mode 100644 tests/tnt/call_bench_test_tmp.go

diff --git a/router_call_old_tmp.go b/router_call_old_tmp.go
new file mode 100644
index 0000000..7d9f0ae
--- /dev/null
+++ b/router_call_old_tmp.go
@@ -0,0 +1,287 @@
+package vshard_router
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/tarantool/go-tarantool/v2"
+	"github.com/vmihailenco/msgpack/v5"
+	"github.com/vmihailenco/msgpack/v5/msgpcode"
+)
+
+// vshardStorageCallResponseProtoOld is the decoded reply of vshard.storage.call.
+// A successful DecodeMsgpack sets exactly one of AssertError, VshardError or Data.
+type vshardStorageCallResponseProtoOld struct {
+	AssertError *assertError            // not nil if there is assert error
+	VshardError *StorageCallVShardError // not nil if there is a vshard error
+	Data        []interface{}           // raw response data
+}
+
+// DecodeMsgpack implements msgpack.CustomDecoder. It reads the response array
+// of vshard.storage.call and stores the vshard error, the assert error or the
+// returned values in r; an unexpected array length is a "protocol violation".
+func (r *vshardStorageCallResponseProtoOld) DecodeMsgpack(d *msgpack.Decoder) error {
+	/* vshard.storage.call(func) response has one of the following formats:
+	See: https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130
+	1. vshard error has occurred:
+		array[nil, vshard_error]
+	2. User method has finished with some error:
+		array[false, assert error]
+	3. User method has finished successfully
+		a) but has not returned anything
+			array[true]
+		b) has returned 1 element
+			array[true, elem1]
+		c) has returned 2 elements
+			array[true, elem1, elem2]
+		d) has returned 3 elements
+			array[true, elem1, elem2, elem3]
+	*/
+
+	// Ensure it is an array and get array len for protocol violation check
+	respArrayLen, err := d.DecodeArrayLen()
+	if err != nil {
+		return err
+	}
+
+	if respArrayLen == 0 {
+		return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
+	}
+
+	// we need peek code to make our check faster than decode interface
+	// later we will check if code nil or bool
+	code, err := d.PeekCode()
+	if err != nil {
+		return err
+	}
+
+	// this is storage error
+	if code == msgpcode.Nil {
+		err = d.DecodeNil()
+		if err != nil {
+			return err
+		}
+
+		if respArrayLen != 2 {
+			return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen)
+		}
+
+		var vshardError StorageCallVShardError
+
+		err = d.Decode(&vshardError)
+		if err != nil {
+			return fmt.Errorf("failed to decode storage vshard error: %w", err)
+		}
+
+		r.VshardError = &vshardError
+
+		return nil
+	}
+
+	isVShardRespOk, err := d.DecodeBool()
+	if err != nil {
+		return err
+	}
+
+	if !isVShardRespOk {
+		// that means we have an assert errors and response is not ok
+		if respArrayLen != 2 {
+			return fmt.Errorf("protocol violation: length is %d on assert error case", respArrayLen)
+		}
+
+		var assertError assertError
+		err = d.Decode(&assertError)
+		if err != nil {
+			return fmt.Errorf("failed to decode storage assert error: %w", err)
+		}
+
+		r.AssertError = &assertError
+
+		return nil
+	}
+
+	// isVShardRespOk is true
+	r.Data = make([]interface{}, 0, respArrayLen-1)
+
+	for i := 1; i < respArrayLen; i++ {
+		elem, err := d.DecodeInterface()
+		if err != nil {
+			return fmt.Errorf("failed to decode into interface element #%d of response array: %w", i+1, err)
+		}
+		r.Data = append(r.Data, elem)
+	}
+
+	return nil
+}
+
+// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min".
+// But the original lua vshard implementation uses this naming, so we use it too.
+//
+//nolint:revive
+const CallTimeoutMin = time.Second / 2
+
+// RouterCallImplOld calls fnc(args) on the replicaset that owns bucketID via
+// vshard.storage.call. The call is retried after a wrong-bucket or
+// bucket-is-locked response, and after a failed bucket resolve, until
+// opts.Timeout (CallTimeoutMin when zero) has elapsed since the first attempt.
+//
+// It returns the values produced by fnc and a function that decodes the first
+// of those values into a typed destination. An error is returned when bucketID
+// is outside [1, TotalBucketCount], the request itself fails, the storage
+// reports an assert error or a non-retryable vshard error, or the timeout
+// expires.
+func (r *Router) RouterCallImplOld(ctx context.Context,
+	bucketID uint64,
+	opts CallOpts,
+	fnc string,
+	args interface{}) (interface{}, StorageResultTypedFunc, error) {
+
+	const vshardStorageClientCall = "vshard.storage.call"
+
+	if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
+		return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
+	}
+
+	if opts.Timeout == 0 {
+		opts.Timeout = CallTimeoutMin
+	}
+
+	timeout := opts.Timeout
+	timeStart := time.Now()
+
+	req := tarantool.NewCallRequest(vshardStorageClientCall)
+	req = req.Context(ctx)
+	req = req.Args([]interface{}{
+		bucketID,
+		opts.VshardMode.String(),
+		fnc,
+		args,
+	})
+
+	var err error
+
+	for {
+		if since := time.Since(timeStart); since > timeout {
+			r.metrics().RequestDuration(since, false, false)
+
+			r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout)
+			if err == nil {
+				err = fmt.Errorf("cant get call cause call impl timeout")
+			}
+
+			return nil, nil, err
+		}
+
+		var rs *Replicaset
+
+		rs, err = r.BucketResolve(ctx, bucketID)
+		if err != nil {
+			r.metrics().RetryOnCall("bucket_resolve_error")
+
+			// this error will be returned to a caller in case of timeout
+			err = fmt.Errorf("cant resolve bucket %d: %w", bucketID, err)
+
+			// TODO: lua vshard router just yields here and retries, no pause is applied.
+			// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L713
+			// So we also retry here. But I guess we should add some pause here.
+			continue
+		}
+
+		r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)
+
+		future := rs.conn.Do(req, opts.PoolMode)
+
+		var storageCallResponse vshardStorageCallResponseProtoOld
+		err = future.GetTyped(&storageCallResponse)
+		if err != nil {
+			return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
+		}
+
+		r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)
+
+		if storageCallResponse.AssertError != nil {
+			return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
+		}
+
+		if storageCallResponse.VshardError != nil {
+			vshardError := storageCallResponse.VshardError
+
+			switch vshardError.Name {
+			case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
+				// We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
+				r.BucketReset(bucketID)
+
+				if vshardError.Destination != "" {
+					destinationUUID, err := uuid.Parse(vshardError.Destination)
+					if err != nil {
+						return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
+							vshardStorageClientCall, vshardError, err)
+					}
+
+					var loggedOnce bool
+					for {
+						idToReplicasetRef := r.getIDToReplicaset()
+						if _, ok := idToReplicasetRef[destinationUUID]; ok {
+							_, err := r.BucketSet(bucketID, destinationUUID)
+							if err == nil {
+								break // breaks loop
+							}
+							r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationUUID, err)
+						}
+
+						if !loggedOnce {
+							r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
+								"update configuration", destinationUUID)
+							loggedOnce = true
+						}
+
+						const defaultPoolingPause = 50 * time.Millisecond
+						time.Sleep(defaultPoolingPause)
+
+						if time.Since(timeStart) > timeout {
+							return nil, nil, vshardError
+						}
+					}
+				}
+
+				// retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked
+
+				r.metrics().RetryOnCall("bucket_migrate")
+
+				r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, vshardError)
+
+				// this vshardError will be returned to a caller in case of timeout
+				err = vshardError
+				continue
+			case VShardErrNameTransferIsInProgress:
+				// Since lua vshard router doesn't retry here, we don't retry too.
+				// There is a comment why lua vshard router doesn't retry:
+				// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
+				r.BucketReset(bucketID)
+				return nil, nil, vshardError
+			case VShardErrNameNonMaster:
+				// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
+				// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
+				// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
+				// we just return this error as is.
+				return nil, nil, vshardError
+			default:
+				return nil, nil, vshardError
+			}
+		}
+
+		r.metrics().RequestDuration(time.Since(timeStart), true, false)
+
+		return storageCallResponse.Data, func(result interface{}) error {
+			if len(storageCallResponse.Data) == 0 {
+				return nil
+			}
+
+			var stub bool
+
+			return future.GetTyped(&[]interface{}{&stub, result})
+		}, nil
+	}
+}
diff --git a/tests/tnt/call_bench_test_tmp.go b/tests/tnt/call_bench_test_tmp.go
new file mode 100644
index 0000000..393be62
--- /dev/null
+++ b/tests/tnt/call_bench_test_tmp.go
@@ -0,0 +1,116 @@
+package tnt
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	vshardrouter "github.com/KaymeKaydex/go-vshard-router"
+	"github.com/KaymeKaydex/go-vshard-router/providers/static"
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+	"github.com/tarantool/go-tarantool/v2/pool"
+)
+
+// BenchmarkCallSimpleInsert_GO_RouterCallOld measures one product_add call per
+// iteration through RouterCallImplOld; router creation is excluded from timing.
+func BenchmarkCallSimpleInsert_GO_RouterCallOld(b *testing.B) {
+	b.StopTimer()
+	skipOnInvalidRun(b)
+
+	ctx := context.Background()
+
+	cfg := getCfg()
+
+	router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
+		TopologyProvider: static.NewProvider(cfg),
+		DiscoveryTimeout: 5 * time.Second,
+		DiscoveryMode:    vshardrouter.DiscoveryModeOn,
+		TotalBucketCount: totalBucketCount,
+		User:             defaultTntUser,
+		Password:         defaultTntPassword,
+		RequestTimeout:   time.Minute,
+	})
+	require.NoError(b, err)
+
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		id := uuid.New()
+
+		bucketID := router.RouterBucketIDStrCRC32(id.String())
+		_, _, err := router.RouterCallImplOld(
+			ctx,
+			bucketID,
+			vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW, Timeout: 10 * time.Second},
+			"product_add",
+			[]interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}})
+		require.NoError(b, err)
+	}
+
+	b.ReportAllocs()
+}
+
+// BenchmarkCallSimpleSelect_GO_RouterCallOld measures product_get calls made
+// through RouterCallImplOld together with decoding of the typed result. The b.N
+// products that are read back are added with product_add before timing starts.
+func BenchmarkCallSimpleSelect_GO_RouterCallOld(b *testing.B) {
+	b.StopTimer()
+	skipOnInvalidRun(b)
+
+	ctx := context.Background()
+
+	cfg := getCfg()
+
+	router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
+		TopologyProvider: static.NewProvider(cfg),
+		DiscoveryTimeout: 5 * time.Second,
+		DiscoveryMode:    vshardrouter.DiscoveryModeOn,
+		TotalBucketCount: totalBucketCount,
+		User:             defaultTntUser,
+		Password:         defaultTntPassword,
+	})
+	require.NoError(b, err)
+
+	ids := make([]uuid.UUID, b.N)
+
+	for i := 0; i < b.N; i++ {
+		id := uuid.New()
+		ids[i] = id
+
+		bucketID := router.RouterBucketIDStrCRC32(id.String())
+		_, _, err := router.RouterCallImplOld(
+			ctx,
+			bucketID,
+			vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW},
+			"product_add",
+			[]interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}})
+		require.NoError(b, err)
+	}
+
+	type Request struct {
+		ID string `msgpack:"id"`
+	}
+
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		id := ids[i]
+
+		bucketID := router.RouterBucketIDStrCRC32(id.String())
+		_, getTyped, err1 := router.RouterCallImplOld(
+			ctx,
+			bucketID,
+			vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.ANY, Timeout: time.Second},
+			"product_get",
+			[]interface{}{&Request{ID: id.String()}})
+		// Every error path of the call returns a nil getTyped, so check first.
+		require.NoError(b, err1)
+
+		var product Product
+		err2 := getTyped(&product)
+
+		b.StopTimer()
+		require.NoError(b, err2)
+		b.StartTimer()
+	}
+
+	b.ReportAllocs()
+}