Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #110
Browse files Browse the repository at this point in the history
- TODO: add description
- TODO: add info into CHANGES file
  • Loading branch information
nurzhan-saktaganov committed Dec 13, 2024
1 parent a486c03 commit a3d1c2f
Show file tree
Hide file tree
Showing 6 changed files with 688 additions and 65 deletions.
238 changes: 183 additions & 55 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type VshardMode string
const (
ReadMode VshardMode = "read"
WriteMode VshardMode = "write"

// callTimeoutDefault is a default timeout when no timeout is provided
callTimeoutDefault = 500 * time.Millisecond
)

func (c VshardMode) String() string {
Expand All @@ -32,7 +35,7 @@ func (c VshardMode) String() string {
type vshardStorageCallResponseProto struct {
AssertError *assertError // not nil if there is assert error
VshardError *StorageCallVShardError // not nil if there is vshard response
Data []interface{} // raw response data
CallResp VshardCallResp
}

func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -116,14 +119,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
}

// isVShardRespOk is true
r.Data = make([]interface{}, 0, respArrayLen-1)

r.CallResp.data = make([]msgpack.RawMessage, 0, respArrayLen-1)
for i := 1; i < respArrayLen; i++ {
elem, err := d.DecodeInterface()
elem, err := d.DecodeRaw()
if err != nil {
return fmt.Errorf("failed to decode into interface element #%d of response array", i+1)
return fmt.Errorf("failed to decode into msgpack.RawMessage element #%d of response array", i-1)
}
r.Data = append(r.Data, elem)
r.CallResp.data = append(r.CallResp.data, elem)
}

return nil
Expand Down Expand Up @@ -176,54 +178,111 @@ type CallOpts struct {
Timeout time.Duration
}

// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min".
// But the original lua vshard implementation uses this naming, so we use it too.
//
//nolint:revive
const CallTimeoutMin = time.Second / 2
type VshardCallMode int

// RouterCallImpl Perform shard operation function will restart operation
// after wrong bucket response until timeout is reached
func (r *Router) RouterCallImpl(ctx context.Context,
bucketID uint64,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {
const (
VshardCallModeRO VshardCallMode = iota
VshardCallModeRW
VshardCallModeRE
VshardCallModeBRO
VshardCallModeBRE
)

type VshardCallOptions struct {
Timeout time.Duration
}

type VshardCallResp struct {
data []msgpack.RawMessage
}

func (r VshardCallResp) Get() ([]interface{}, error) {
resp := make([]interface{}, len(r.data))

for i, rawMessage := range r.data {
if err := msgpack.Unmarshal(rawMessage, &resp[i]); err != nil {
return nil, fmt.Errorf("failed to decode into interface element #%d of response array: %w", i, err)
}
}

return resp, nil
}

func (r VshardCallResp) GetTyped(result []interface{}) error {
minLen := len(result)
if dataLen := len(r.data); dataLen < minLen {
minLen = dataLen
}

for i := 0; i < minLen; i++ {
if err := msgpack.Unmarshal(r.data[i], &result[i]); err != nil {
return fmt.Errorf("failed to decode into result[%d] element #%d of response array: %w", i, i, err)
}
}

return nil
}

func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardCallMode,
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
const vshardStorageClientCall = "vshard.storage.call"

if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
return VshardCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
var poolMode pool.Mode
var vshardMode VshardMode

switch mode {
case VshardCallModeRO:
poolMode, vshardMode = pool.RO, ReadMode
case VshardCallModeRW:
poolMode, vshardMode = pool.RW, WriteMode
case VshardCallModeRE:
// poolMode, vshardMode = pool.PreferRO, ReadMode
// since go-tarantool always use balance=true politic,
// we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
return VshardCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
case VshardCallModeBRO:
poolMode, vshardMode = pool.ANY, ReadMode
case VshardCallModeBRE:
poolMode, vshardMode = pool.PreferRO, ReadMode
default:
return VshardCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
}

timeout := opts.Timeout
timeStart := time.Now()
timeout := callTimeoutDefault
if opts.Timeout > 0 {
timeout = opts.Timeout
}

req := tarantool.NewCallRequest(vshardStorageClientCall)
req = req.Context(ctx)
req = req.Args([]interface{}{
bucketID,
opts.VshardMode.String(),
fnc,
args,
})
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

tntReq := tarantool.NewCallRequest(vshardStorageClientCall).
Context(ctx).
Args([]interface{}{
bucketID,
vshardMode,
fnc,
args,
})

requestStartTime := time.Now()

var err error

for {
if since := time.Since(timeStart); since > timeout {
r.metrics().RequestDuration(since, false, false)
if spent := time.Since(requestStartTime); spent > timeout {
r.metrics().RequestDuration(spent, false, false)

r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout)
r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout)
if err == nil {
err = fmt.Errorf("cant get call cause call impl timeout")
}

return nil, nil, err
return VshardCallResp{}, err
}

var rs *Replicaset
Expand All @@ -243,18 +302,16 @@ func (r *Router) RouterCallImpl(ctx context.Context,

r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)

future := rs.conn.Do(req, opts.PoolMode)

var storageCallResponse vshardStorageCallResponseProto
err = future.GetTyped(&storageCallResponse)
err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
if err != nil {
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
return VshardCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
}

r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)

if storageCallResponse.AssertError != nil {
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
return VshardCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
}

if storageCallResponse.VshardError != nil {
Expand All @@ -268,7 +325,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if vshardError.Destination != "" {
destinationUUID, err := uuid.Parse(vshardError.Destination)
if err != nil {
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
return VshardCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
vshardStorageClientCall, vshardError, err)
}

Expand All @@ -292,8 +349,8 @@ func (r *Router) RouterCallImpl(ctx context.Context,
const defaultPoolingPause = 50 * time.Millisecond
time.Sleep(defaultPoolingPause)

if time.Since(timeStart) > timeout {
return nil, nil, vshardError
if spent := time.Since(requestStartTime); spent > timeout {
return VshardCallResp{}, vshardError
}
}
}
Expand All @@ -312,30 +369,101 @@ func (r *Router) RouterCallImpl(ctx context.Context,
// There is a comment why lua vshard router doesn't retry:
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
r.BucketReset(bucketID)
return nil, nil, vshardError
return VshardCallResp{}, vshardError
case VShardErrNameNonMaster:
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
// we just return this error as is.
return nil, nil, vshardError
return VshardCallResp{}, vshardError
default:
return nil, nil, vshardError
return VshardCallResp{}, vshardError
}
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)
r.metrics().RequestDuration(time.Since(requestStartTime), true, false)

return storageCallResponse.Data, func(result interface{}) error {
if len(storageCallResponse.Data) == 0 {
return nil
}
return storageCallResponse.CallResp, nil
}
}

func (r *Router) CallRO(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
return r.Call(ctx, bucketID, VshardCallModeRO, fnc, args, opts)
}

var stub bool
func (r *Router) CallRW(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
return r.Call(ctx, bucketID, VshardCallModeRW, fnc, args, opts)
}

func (r *Router) CallRE(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
return r.Call(ctx, bucketID, VshardCallModeRE, fnc, args, opts)
}

func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
return r.Call(ctx, bucketID, VshardCallModeBRO, fnc, args, opts)
}

return future.GetTyped(&[]interface{}{&stub, result})
}, nil
func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
return r.Call(ctx, bucketID, VshardCallModeBRE, fnc, args, opts)
}

// RouterCallImpl Perform shard operation function will restart operation
// after wrong bucket response until timeout is reached
// Deprecated: RouterCallImpl is deprecated.
// See https://github.com/KaymeKaydex/go-vshard-router/issues/110.
// Use Call, Call[RO, RW, RE, BRO, BRE] methods instead.
func (r *Router) RouterCallImpl(ctx context.Context,
bucketID uint64,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

var vshardCallOpts = VshardCallOptions{
Timeout: opts.Timeout,
}

var vshardCallMode VshardCallMode

switch opts.VshardMode {
case WriteMode:
vshardCallMode = VshardCallModeRW
case ReadMode:
switch opts.PoolMode {
case pool.ANY:
vshardCallMode = VshardCallModeBRO
case pool.RO:
vshardCallMode = VshardCallModeRO
case pool.RW:
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
case pool.PreferRO:
vshardCallMode = VshardCallModeBRE
case pool.PreferRW:
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
default:
return nil, nil, fmt.Errorf("unexpected opts.PoolMode %v", opts.PoolMode)
}
default:
return nil, nil, fmt.Errorf("unexpected opts.VshardMode %v", opts.VshardMode)
}

vshardCallResp, err := r.Call(ctx, bucketID, vshardCallMode, fnc, args, vshardCallOpts)
if err != nil {
return nil, nil, err
}

data, err := vshardCallResp.Get()
if err != nil {
return nil, nil, err
}

return data, func(result interface{}) error {
return vshardCallResp.GetTyped([]interface{}{result})
}, nil
}

// RouterMapCallRWImpl perform call function on all masters in the cluster
Expand All @@ -349,7 +477,7 @@ func (r *Router) RouterMapCallRWImpl(
) (map[uuid.UUID]interface{}, error) {
const vshardStorageServiceCall = "vshard.storage._call"

timeout := CallTimeoutMin
timeout := callTimeoutDefault
if opts.Timeout > 0 {
timeout = opts.Timeout
}
Expand Down
2 changes: 1 addition & 1 deletion api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
conn: mPool,
})

_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second}, "test", []byte("test"))
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second, VshardMode: ReadMode}, "test", []byte("test"))
require.ErrorIs(t, err, futureError)
})
}
9 changes: 6 additions & 3 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,19 @@ func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
}

// ReplicaCall perform function on remote storage
// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661
// This method is deprecated, because looks like it has a little bit broken interface
// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661.
// Deprecated: RouterCallImpl is deprecated,
// because looks like it has a little bit broken interface.
// See https://github.com/KaymeKaydex/go-vshard-router/issues/42.
// Use CallAsync instead.
func (rs *Replicaset) ReplicaCall(
ctx context.Context,
opts ReplicasetCallOpts,
fnc string,
args interface{},
) (interface{}, StorageResultTypedFunc, error) {
if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
opts.Timeout = callTimeoutDefault
}

future := rs.CallAsync(ctx, opts, fnc, args)
Expand Down
Loading

0 comments on commit a3d1c2f

Please sign in to comment.