Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #42
Browse files Browse the repository at this point in the history
* ReplicaCall
	- fix decoding response (#42)
	- fix ignoring timeout while waiting for future.Get()
* Introduce Replicaset.CallAsync
* Add tests for methods of Replicaset
  • Loading branch information
nurzhan-saktaganov committed Sep 17, 2024
1 parent 77bd403 commit bcfa2c2
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 22 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

BUG FIXES:

* RouterCallImpl: fix decoding responce from storage_ref (partially #42)
* RouterCallImpl: fix decoding responce from storage_map (partially #42)
* RouterCallImpl: fix decoding response from storage_ref (partially #42)
* RouterCallImpl: fix decoding response from storage_map (partially #42)
* BucketDiscovery: check res for nil
* BucketStat: decode bsInfo by ptr
* ReplicaCall: fix decoding response (#42)
* ReplicaCall: fix ignoring timeout while waiting for future.Get()

FEATURES:

* Support new Sprintf-like logging interface (#48)
* DiscoveryTimeout by default is 1 minute (zero DiscoveryTimeout is not allowed #60)
* All discovering logs has new prefix [DISCOVERY]
* Introduce Replicaset.CallAsync, it is usefull to send concurrent requests to replicasets;
additionally, CallAsync provides new interface to interact with replicaset without cons of interface of ReplicaCall

REFACTOR:

Expand All @@ -31,6 +35,8 @@ TESTS:
* 2 sections for CI: static checks and tests
* integration tests run on ci with Tarantool cluster on vshard
* implemented luacheck for static checks
* New tnt tests for ReplicaCall
* New tnt tests for CallAsync

EXAMPLES:
* customer go mod fixed
Expand Down
52 changes: 32 additions & 20 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,27 @@ func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketSt

// ReplicaCall perform function on remote storage
// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661
// This method is deprecated, because looks like it has a little bit broken interface
func (rs *Replicaset) ReplicaCall(
ctx context.Context,
opts ReplicasetCallOpts,
fnc string,
args interface{},
) (interface{}, StorageResultTypedFunc, error) {
if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
timeout := CallTimeoutMin

if opts.Timeout > 0 {
timeout = opts.Timeout
}

timeout := opts.Timeout
timeStart := time.Now()

req := tarantool.NewCallRequest(fnc)
req = req.Context(ctx)
req = req.Args(args)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

req := tarantool.NewCallRequest(fnc).
Context(ctx).
Args(args)

var (
respData []interface{}
Expand All @@ -116,20 +121,9 @@ func (rs *Replicaset) ReplicaCall(
continue
}

if len(respData) != 2 {
err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
continue
}

if respData[1] != nil {
assertErr := &StorageCallAssertError{}

err = mapstructure.Decode(respData[1], assertErr)
if err != nil {
continue
}

err = assertErr
if len(respData) == 0 {
// Since this method returns the first element of respData by contract, we can't return anything is this case (broken interface)
err = fmt.Errorf("response data is empty")
continue
}

Expand All @@ -138,3 +132,21 @@ func (rs *Replicaset) ReplicaCall(
}, nil
}
}

// Call sends async request to remote storage
func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future {
if opts.Timeout > 0 {
// Don't set any timeout by default, parent context timeout would be inherited in this case.
// Don't call cancel in defer, because this we send request asynchronously,
// and wait for result outside from this function.
// suppress linter warning: lostcancel: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak (govet)
//nolint:govet
ctx, _ = context.WithTimeout(ctx, opts.Timeout)
}

req := tarantool.NewCallRequest(fnc).
Context(ctx).
Args(args)

return rs.conn.Do(req, opts.PoolMode)
}
214 changes: 214 additions & 0 deletions tests/tnt/replicaset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package tnt

import (
"context"
"log"
"testing"
"time"

vshardrouter "github.com/KaymeKaydex/go-vshard-router"
"github.com/KaymeKaydex/go-vshard-router/providers/static"
"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
)

func TestReplicasetReplicaCall(t *testing.T) {
if !isCorrectRun() {
log.Printf("Incorrect run of tnt-test framework")
return
}

t.Parallel()

ctx := context.Background()

cfg := getCfg()

router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
TopologyProvider: static.NewProvider(cfg),
DiscoveryTimeout: 5 * time.Second,
DiscoveryMode: vshardrouter.DiscoveryModeOn,
TotalBucketCount: totalBucketCount,
User: defaultTntUser,
Password: defaultTntPassword,
})

require.Nil(t, err, "NewRouter finished successfully")

rsMap := router.RouterRouteAll()

var rs *vshardrouter.Replicaset
// pick random rs
for _, v := range rsMap {
rs = v
break
}

_ = rs.String() // just for coverage

callOpts := vshardrouter.ReplicasetCallOpts{
PoolMode: pool.ANY,
}

_, _, err = rs.ReplicaCall(ctx, callOpts, "echo", nil)
require.NotNil(t, err, "ReplicaCall finished with err on nil args")

_, _, err = rs.ReplicaCall(ctx, callOpts, "echo", []interface{}{})
require.NotNil(t, err, "ReplicaCall returns err on empty response (broken interface)")

// args len is 1
args := []interface{}{"arg1"}
resp, getTyped, err := rs.ReplicaCall(ctx, callOpts, "echo", args)
require.Nilf(t, err, "ReplicaCall finished with no err for args: %v", args)
require.Equalf(t, args[0], resp, "ReplicaCall resp ok for args: %v", args)
var typed interface{}
err = getTyped(&typed)
require.Nilf(t, err, "getTyped finished with no err for args: %v", args)
require.Equalf(t, args[0], typed, "getTyped result is ok for args: %v", args)

// args len is 2
args = []interface{}{"arg1", "arg2"}
resp, getTyped, err = rs.ReplicaCall(ctx, callOpts, "echo", args)
require.Nilf(t, err, "ReplicaCall finished with no err for args: %v", args)
require.Equalf(t, args[0], resp, "ReplicaCall resp ok for args: %v", args)
typed = nil // set to nil, otherwise getTyped tries to use the old content
err = getTyped(&typed)
require.Nilf(t, err, "getTyped finished with no err for args: %v", args)
require.Equalf(t, args[0], typed, "getTyped result is ok for args: %v", args)

// don't decode assert error
args = []interface{}{nil, "non nil"}
_, _, err = rs.ReplicaCall(ctx, callOpts, "echo", args)
require.Nil(t, err, "ReplicaCall doesn't try decode assert error")

args = []interface{}{2}
callOpts.Timeout = 500 * time.Millisecond
start := time.Now()
_, _, err = rs.ReplicaCall(ctx, callOpts, "sleep", args)
duration := time.Since(start)
require.NotNil(t, err, "ReplicaCall timeout happened")
require.Less(t, duration, 600*time.Millisecond, "ReplicaCall timeout works correctly")
callOpts.Timeout = 0 // return back default value

// raise_luajit_error
_, _, err = rs.ReplicaCall(ctx, callOpts, "raise_luajit_error", nil)
require.NotNil(t, err, "raise_luajit_error returns error")

// raise_client_error
_, _, err = rs.ReplicaCall(ctx, callOpts, "raise_client_error", nil)
require.NotNil(t, err, "raise_client_error returns error")
}

func TestReplicsetCallAsync(t *testing.T) {
if !isCorrectRun() {
log.Printf("Incorrect run of tnt-test framework")
return
}

t.Parallel()

ctx := context.Background()

cfg := getCfg()

router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
TopologyProvider: static.NewProvider(cfg),
DiscoveryTimeout: 5 * time.Second,
DiscoveryMode: vshardrouter.DiscoveryModeOn,
TotalBucketCount: totalBucketCount,
User: defaultTntUser,
Password: defaultTntPassword,
})

require.Nil(t, err, "NewRouter finished successfully")

rsMap := router.RouterRouteAll()

var rs *vshardrouter.Replicaset
// pick random rs
for _, v := range rsMap {
rs = v
break
}

callOpts := vshardrouter.ReplicasetCallOpts{
PoolMode: pool.ANY,
}

// Tests for arglen ans response parsing
future := rs.CallAsync(ctx, callOpts, "echo", nil)
resp, err := future.Get()
require.Nil(t, err, "CallAsync finished with no err on nil args")
require.Equal(t, resp, []interface{}{}, "CallAsync returns empty arr on nil args")
var typed interface{}
err = future.GetTyped(&typed)
require.Nil(t, err, "GetTyped finished with no err on nil args")
require.Equal(t, []interface{}{}, resp, "GetTyped returns empty arr on nil args")

const checkUpTo = 100
for argLen := 1; argLen <= checkUpTo; argLen++ {
args := []interface{}{}

for i := 0; i < argLen; i++ {
args = append(args, "arg")
}

future := rs.CallAsync(ctx, callOpts, "echo", args)
resp, err := future.Get()
require.Nilf(t, err, "CallAsync finished with no err for argLen %d", argLen)
require.Equalf(t, args, resp, "CallAsync resp ok for argLen %d", argLen)

var typed interface{}
err = future.GetTyped(&typed)
require.Nilf(t, err, "GetTyped finished with no err for argLen %d", argLen)
require.Equal(t, args, typed, "GetTyped resp ok for argLen %d", argLen)
}

// Test for async execution
timeBefore := time.Now()

var futures = make([]*tarantool.Future, 0, len(rsMap))
for _, rs := range rsMap {
future := rs.CallAsync(ctx, callOpts, "sleep", []interface{}{1})
futures = append(futures, future)
}

for i, future := range futures {
_, err := future.Get()
require.Nil(t, err, "future[%d].Get finished with no err for async test", i)
}

duration := time.Since(timeBefore)
require.True(t, len(rsMap) > 1, "Async test: more than one replicaset")
require.Less(t, duration, 1200*time.Millisecond, "Async test: requests were sent concurrently")

// Test no timeout by default
future = rs.CallAsync(ctx, callOpts, "sleep", []interface{}{1})
_, err = future.Get()
require.Nil(t, err, "CallAsync no timeout by default")

// Test for timeout via ctx
ctxTimeout, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
future = rs.CallAsync(ctxTimeout, callOpts, "sleep", []interface{}{1})
_, err = future.Get()
require.NotNil(t, err, "CallAsync timeout by context does work")

// Test for timeout via config
callOptsTimeout := vshardrouter.ReplicasetCallOpts{
PoolMode: pool.ANY,
Timeout: 500 * time.Millisecond,
}
future = rs.CallAsync(ctx, callOptsTimeout, "sleep", []interface{}{1})
_, err = future.Get()
require.NotNil(t, err, "CallAsync timeout by callOpts does work")

future = rs.CallAsync(ctx, callOpts, "raise_luajit_error", nil)
_, err = future.Get()
require.NotNil(t, err, "raise_luajit_error returns error")

future = rs.CallAsync(ctx, callOpts, "raise_client_error", nil)
_, err = future.Get()
require.NotNil(t, err, "raise_client_error returns error")
}

0 comments on commit bcfa2c2

Please sign in to comment.