From bcfa2c29b90f3bf729c8107fdb96da39a4e96004 Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Sat, 14 Sep 2024 16:13:46 +0300 Subject: [PATCH] resolve issue #42 * ReplicaCall - fix decoding response (#42) - fix ignoring timeout while waiting for future.Get() * Introduce Replicaset.CallAsync * Add tests for methods of Replicaset --- CHANGELOG.md | 10 +- replicaset.go | 52 +++++---- tests/tnt/replicaset_test.go | 214 +++++++++++++++++++++++++++++++++++ 3 files changed, 254 insertions(+), 22 deletions(-) create mode 100644 tests/tnt/replicaset_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 5804093..0a929cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,16 +2,20 @@ BUG FIXES: -* RouterCallImpl: fix decoding responce from storage_ref (partially #42) -* RouterCallImpl: fix decoding responce from storage_map (partially #42) +* RouterCallImpl: fix decoding response from storage_ref (partially #42) +* RouterCallImpl: fix decoding response from storage_map (partially #42) * BucketDiscovery: check res for nil * BucketStat: decode bsInfo by ptr +* ReplicaCall: fix decoding response (#42) +* ReplicaCall: fix ignoring timeout while waiting for future.Get() FEATURES: * Support new Sprintf-like logging interface (#48) * DiscoveryTimeout by default is 1 minute (zero DiscoveryTimeout is not allowed #60) * All discovering logs has new prefix [DISCOVERY] +* Introduce Replicaset.CallAsync, it is usefull to send concurrent requests to replicasets; + additionally, CallAsync provides new interface to interact with replicaset without cons of interface of ReplicaCall REFACTOR: @@ -31,6 +35,8 @@ TESTS: * 2 sections for CI: static checks and tests * integration tests run on ci with Tarantool cluster on vshard * implemented luacheck for static checks +* New tnt tests for ReplicaCall +* New tnt tests for CallAsync EXAMPLES: * customer go mod fixed diff --git a/replicaset.go b/replicaset.go index 1e231a4..38ff18b 100644 --- a/replicaset.go +++ b/replicaset.go @@ -82,22 +82,27 @@ func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketSt // ReplicaCall perform function on remote storage // link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661 +// This method is deprecated, because looks like it has a little bit broken interface func (rs *Replicaset) ReplicaCall( ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}, ) (interface{}, StorageResultTypedFunc, error) { - if opts.Timeout == 0 { - opts.Timeout = CallTimeoutMin + timeout := CallTimeoutMin + + if opts.Timeout > 0 { + timeout = opts.Timeout } - timeout := opts.Timeout timeStart := time.Now() - req := tarantool.NewCallRequest(fnc) - req = req.Context(ctx) - req = req.Args(args) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + req := tarantool.NewCallRequest(fnc). + Context(ctx). + Args(args) var ( respData []interface{} @@ -116,20 +121,9 @@ func (rs *Replicaset) ReplicaCall( continue } - if len(respData) != 2 { - err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData)) - continue - } - - if respData[1] != nil { - assertErr := &StorageCallAssertError{} - - err = mapstructure.Decode(respData[1], assertErr) - if err != nil { - continue - } - - err = assertErr + if len(respData) == 0 { + // Since this method returns the first element of respData by contract, we can't return anything is this case (broken interface) + err = fmt.Errorf("response data is empty") continue } @@ -138,3 +132,21 @@ func (rs *Replicaset) ReplicaCall( }, nil } } + +// Call sends async request to remote storage +func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future { + if opts.Timeout > 0 { + // Don't set any timeout by default, parent context timeout would be inherited in this case. + // Don't call cancel in defer, because this we send request asynchronously, + // and wait for result outside from this function. + // suppress linter warning: lostcancel: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak (govet) + //nolint:govet + ctx, _ = context.WithTimeout(ctx, opts.Timeout) + } + + req := tarantool.NewCallRequest(fnc). + Context(ctx). + Args(args) + + return rs.conn.Do(req, opts.PoolMode) +} diff --git a/tests/tnt/replicaset_test.go b/tests/tnt/replicaset_test.go new file mode 100644 index 0000000..1194f52 --- /dev/null +++ b/tests/tnt/replicaset_test.go @@ -0,0 +1,214 @@ +package tnt + +import ( + "context" + "log" + "testing" + "time" + + vshardrouter "github.com/KaymeKaydex/go-vshard-router" + "github.com/KaymeKaydex/go-vshard-router/providers/static" + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/pool" +) + +func TestReplicasetReplicaCall(t *testing.T) { + if !isCorrectRun() { + log.Printf("Incorrect run of tnt-test framework") + return + } + + t.Parallel() + + ctx := context.Background() + + cfg := getCfg() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(cfg), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: defaultTntUser, + Password: defaultTntPassword, + }) + + require.Nil(t, err, "NewRouter finished successfully") + + rsMap := router.RouterRouteAll() + + var rs *vshardrouter.Replicaset + // pick random rs + for _, v := range rsMap { + rs = v + break + } + + _ = rs.String() // just for coverage + + callOpts := vshardrouter.ReplicasetCallOpts{ + PoolMode: pool.ANY, + } + + _, _, err = rs.ReplicaCall(ctx, callOpts, "echo", nil) + require.NotNil(t, err, "ReplicaCall finished with err on nil args") + + _, _, err = rs.ReplicaCall(ctx, callOpts, "echo", []interface{}{}) + require.NotNil(t, err, "ReplicaCall returns err on empty response (broken interface)") + + // args len is 1 + args := []interface{}{"arg1"} + resp, getTyped, err := rs.ReplicaCall(ctx, callOpts, "echo", args) + require.Nilf(t, err, "ReplicaCall finished with no err for args: %v", args) + require.Equalf(t, args[0], resp, "ReplicaCall resp ok for args: %v", args) + var typed interface{} + err = getTyped(&typed) + require.Nilf(t, err, "getTyped finished with no err for args: %v", args) + require.Equalf(t, args[0], typed, "getTyped result is ok for args: %v", args) + + // args len is 2 + args = []interface{}{"arg1", "arg2"} + resp, getTyped, err = rs.ReplicaCall(ctx, callOpts, "echo", args) + require.Nilf(t, err, "ReplicaCall finished with no err for args: %v", args) + require.Equalf(t, args[0], resp, "ReplicaCall resp ok for args: %v", args) + typed = nil // set to nil, otherwise getTyped tries to use the old content + err = getTyped(&typed) + require.Nilf(t, err, "getTyped finished with no err for args: %v", args) + require.Equalf(t, args[0], typed, "getTyped result is ok for args: %v", args) + + // don't decode assert error + args = []interface{}{nil, "non nil"} + _, _, err = rs.ReplicaCall(ctx, callOpts, "echo", args) + require.Nil(t, err, "ReplicaCall doesn't try decode assert error") + + args = []interface{}{2} + callOpts.Timeout = 500 * time.Millisecond + start := time.Now() + _, _, err = rs.ReplicaCall(ctx, callOpts, "sleep", args) + duration := time.Since(start) + require.NotNil(t, err, "ReplicaCall timeout happened") + require.Less(t, duration, 600*time.Millisecond, "ReplicaCall timeout works correctly") + callOpts.Timeout = 0 // return back default value + + // raise_luajit_error + _, _, err = rs.ReplicaCall(ctx, callOpts, "raise_luajit_error", nil) + require.NotNil(t, err, "raise_luajit_error returns error") + + // raise_client_error + _, _, err = rs.ReplicaCall(ctx, callOpts, "raise_client_error", nil) + require.NotNil(t, err, "raise_client_error returns error") +} + +func TestReplicsetCallAsync(t *testing.T) { + if !isCorrectRun() { + log.Printf("Incorrect run of tnt-test framework") + return + } + + t.Parallel() + + ctx := context.Background() + + cfg := getCfg() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(cfg), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: defaultTntUser, + Password: defaultTntPassword, + }) + + require.Nil(t, err, "NewRouter finished successfully") + + rsMap := router.RouterRouteAll() + + var rs *vshardrouter.Replicaset + // pick random rs + for _, v := range rsMap { + rs = v + break + } + + callOpts := vshardrouter.ReplicasetCallOpts{ + PoolMode: pool.ANY, + } + + // Tests for arglen ans response parsing + future := rs.CallAsync(ctx, callOpts, "echo", nil) + resp, err := future.Get() + require.Nil(t, err, "CallAsync finished with no err on nil args") + require.Equal(t, resp, []interface{}{}, "CallAsync returns empty arr on nil args") + var typed interface{} + err = future.GetTyped(&typed) + require.Nil(t, err, "GetTyped finished with no err on nil args") + require.Equal(t, []interface{}{}, resp, "GetTyped returns empty arr on nil args") + + const checkUpTo = 100 + for argLen := 1; argLen <= checkUpTo; argLen++ { + args := []interface{}{} + + for i := 0; i < argLen; i++ { + args = append(args, "arg") + } + + future := rs.CallAsync(ctx, callOpts, "echo", args) + resp, err := future.Get() + require.Nilf(t, err, "CallAsync finished with no err for argLen %d", argLen) + require.Equalf(t, args, resp, "CallAsync resp ok for argLen %d", argLen) + + var typed interface{} + err = future.GetTyped(&typed) + require.Nilf(t, err, "GetTyped finished with no err for argLen %d", argLen) + require.Equal(t, args, typed, "GetTyped resp ok for argLen %d", argLen) + } + + // Test for async execution + timeBefore := time.Now() + + var futures = make([]*tarantool.Future, 0, len(rsMap)) + for _, rs := range rsMap { + future := rs.CallAsync(ctx, callOpts, "sleep", []interface{}{1}) + futures = append(futures, future) + } + + for i, future := range futures { + _, err := future.Get() + require.Nil(t, err, "future[%d].Get finished with no err for async test", i) + } + + duration := time.Since(timeBefore) + require.True(t, len(rsMap) > 1, "Async test: more than one replicaset") + require.Less(t, duration, 1200*time.Millisecond, "Async test: requests were sent concurrently") + + // Test no timeout by default + future = rs.CallAsync(ctx, callOpts, "sleep", []interface{}{1}) + _, err = future.Get() + require.Nil(t, err, "CallAsync no timeout by default") + + // Test for timeout via ctx + ctxTimeout, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + future = rs.CallAsync(ctxTimeout, callOpts, "sleep", []interface{}{1}) + _, err = future.Get() + require.NotNil(t, err, "CallAsync timeout by context does work") + + // Test for timeout via config + callOptsTimeout := vshardrouter.ReplicasetCallOpts{ + PoolMode: pool.ANY, + Timeout: 500 * time.Millisecond, + } + future = rs.CallAsync(ctx, callOptsTimeout, "sleep", []interface{}{1}) + _, err = future.Get() + require.NotNil(t, err, "CallAsync timeout by callOpts does work") + + future = rs.CallAsync(ctx, callOpts, "raise_luajit_error", nil) + _, err = future.Get() + require.NotNil(t, err, "raise_luajit_error returns error") + + future = rs.CallAsync(ctx, callOpts, "raise_client_error", nil) + _, err = future.Get() + require.NotNil(t, err, "raise_client_error returns error") +}