Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

resolve issue #110 #111

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ FEATURES:
* Add pause between requests in buckets discovering. Configured by config DiscoveryWorkStep, default is 10ms.
* Add ReplicaUUID to the StorageCallVShardError struct.
* New method 'RouterMapCallRW[T]' to replace the deprecated one 'RouterMapCallRWImpl'.
* New method 'Router.Call' to replace the deprecated one 'RouterCallImpl'.

REFACTOR:

Expand All @@ -29,6 +30,7 @@ REFACTOR:
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).
* Add custom msgpackv5 decoder for 'RouterMapCallRW'.
* New backward-compatible signature for StorageResultTypedFunc to fix interface for RouterCallImpl.

TESTS:
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.
Expand Down
240 changes: 190 additions & 50 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type VshardMode string
const (
ReadMode VshardMode = "read"
WriteMode VshardMode = "write"

// callTimeoutDefault is a default timeout when no timeout is provided
callTimeoutDefault = 500 * time.Millisecond
KaymeKaydex marked this conversation as resolved.
Show resolved Hide resolved
)

func (c VshardMode) String() string {
Expand All @@ -31,7 +34,7 @@ func (c VshardMode) String() string {
type vshardStorageCallResponseProto struct {
AssertError *assertError // not nil if there is assert error
VshardError *StorageCallVShardError // not nil if there is vshard response
Data []interface{} // raw response data
CallResp VshardRouterCallResp
}

func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -115,14 +118,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
}

// isVShardRespOk is true
r.Data = make([]interface{}, 0, respArrayLen-1)

r.CallResp.rawMessages = make([]msgpack.RawMessage, 0, respArrayLen-1)
for i := 1; i < respArrayLen; i++ {
elem, err := d.DecodeInterface()
elem, err := d.DecodeRaw()
if err != nil {
return fmt.Errorf("failed to decode into interface element #%d of response array", i+1)
return fmt.Errorf("failed to decode into msgpack.RawMessage element #%d of response array", i-1)
}
r.Data = append(r.Data, elem)
r.CallResp.rawMessages = append(r.CallResp.rawMessages, elem)
}

return nil
Expand Down Expand Up @@ -167,62 +169,180 @@ func (s StorageCallVShardError) Error() string {
return fmt.Sprintf("%+v", alias(s))
}

type StorageResultTypedFunc = func(result interface{}) error
type StorageResultTypedFunc = func(result ...interface{}) error

type CallOpts struct {
VshardMode VshardMode // vshard mode in call
PoolMode pool.Mode
Timeout time.Duration
}

// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min".
// But the original lua vshard implementation uses this naming, so we use it too.
//
//nolint:revive
const CallTimeoutMin = time.Second / 2
// VshardRouterCallMode is a type to represent call mode for Router.Call method.
type VshardRouterCallMode int

const (
nurzhan-saktaganov marked this conversation as resolved.
Show resolved Hide resolved
// VshardRouterCallModeRO sets a read-only mode for Router.Call.
VshardRouterCallModeRO VshardRouterCallMode = iota
// VshardRouterCallModeRW sets a read-write mode for Router.Call.
VshardRouterCallModeRW
// VshardRouterCallModeRE acts like VshardRouterCallModeRO
// with preference for a replica rather than a master.
// This mode is not supported yet.
VshardRouterCallModeRE
// VshardRouterCallModeBRO acts like VshardRouterCallModeRO with balancing.
VshardRouterCallModeBRO
// VshardRouterCallModeBRE acts like VshardRouterCallModeRO with balancing
// and preference for a replica rather than a master.
VshardRouterCallModeBRE
)

// VshardRouterCallOptions represents options to Router.Call[XXX] methods.
type VshardRouterCallOptions struct {
Timeout time.Duration
}

// VshardRouterCallResp represents a response from Router.Call[XXX] methods.
type VshardRouterCallResp struct {
rawMessages []msgpack.RawMessage
}

// Get returns a response from user defined function as []interface{}.
func (r VshardRouterCallResp) Get() ([]interface{}, error) {
resp := make([]interface{}, len(r.rawMessages))
return resp, r.GetTyped(resp)
}

// GetTyped decodes a response from user defined function into custom values.
func (r VshardRouterCallResp) GetTyped(result []interface{}) error {
minLen := len(result)
if dataLen := len(r.rawMessages); dataLen < minLen {
minLen = dataLen
}

for i := 0; i < minLen; i++ {
if err := msgpack.Unmarshal(r.rawMessages[i], &result[i]); err != nil {
return fmt.Errorf("failed to decode into result[%d] element #%d of response array: %w", i, i, err)
}
}

return nil
}

// RouterCallImpl Perform shard operation function will restart operation
// after wrong bucket response until timeout is reached
// Deprecated: RouterCallImpl is deprecated.
// See https://github.com/KaymeKaydex/go-vshard-router/issues/110.
// Use Call method with RO, RW, RE, BRO, BRE modes instead.
func (r *Router) RouterCallImpl(ctx context.Context,
bucketID uint64,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

var vshardCallOpts = VshardRouterCallOptions{
Timeout: opts.Timeout,
}

var vshardCallMode VshardRouterCallMode

switch opts.VshardMode {
case WriteMode:
vshardCallMode = VshardRouterCallModeRW
case ReadMode:
switch opts.PoolMode {
case pool.ANY:
vshardCallMode = VshardRouterCallModeBRO
case pool.RO:
vshardCallMode = VshardRouterCallModeRO
case pool.RW:
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
case pool.PreferRO:
vshardCallMode = VshardRouterCallModeBRE
case pool.PreferRW:
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
default:
return nil, nil, fmt.Errorf("unexpected opts.PoolMode %v", opts.PoolMode)
}
default:
return nil, nil, fmt.Errorf("unexpected opts.VshardMode %v", opts.VshardMode)
}

vshardCallResp, err := r.Call(ctx, bucketID, vshardCallMode, fnc, args, vshardCallOpts)
if err != nil {
return nil, nil, err
}

data, err := vshardCallResp.Get()
if err != nil {
return nil, nil, err
}

return data, func(result ...interface{}) error {
return vshardCallResp.GetTyped(result)
}, nil
}

// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCallMode,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
const vshardStorageClientCall = "vshard.storage.call"

if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
return VshardRouterCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
var poolMode pool.Mode
var vshardMode VshardMode

switch mode {
case VshardRouterCallModeRO:
poolMode, vshardMode = pool.RO, ReadMode
case VshardRouterCallModeRW:
poolMode, vshardMode = pool.RW, WriteMode
case VshardRouterCallModeRE:
// poolMode, vshardMode = pool.PreferRO, ReadMode
// since go-tarantool always use balance=true politic,
// we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
case VshardRouterCallModeBRO:
poolMode, vshardMode = pool.ANY, ReadMode
case VshardRouterCallModeBRE:
poolMode, vshardMode = pool.PreferRO, ReadMode
default:
return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
}

timeout := opts.Timeout
timeStart := time.Now()
timeout := callTimeoutDefault
if opts.Timeout > 0 {
timeout = opts.Timeout
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

tntReq := tarantool.NewCallRequest(vshardStorageClientCall).
Context(ctx).
Args([]interface{}{
bucketID,
vshardMode,
fnc,
args,
})

req := tarantool.NewCallRequest(vshardStorageClientCall)
req = req.Context(ctx)
req = req.Args([]interface{}{
bucketID,
opts.VshardMode.String(),
fnc,
args,
})
requestStartTime := time.Now()

var err error

for {
if since := time.Since(timeStart); since > timeout {
r.metrics().RequestDuration(since, false, false)
if spent := time.Since(requestStartTime); spent > timeout {
r.metrics().RequestDuration(spent, false, false)

r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout)
r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout)
if err == nil {
err = fmt.Errorf("cant get call cause call impl timeout")
}

return nil, nil, err
return VshardRouterCallResp{}, err
}

var rs *Replicaset
Expand All @@ -242,18 +362,16 @@ func (r *Router) RouterCallImpl(ctx context.Context,

r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)

future := rs.conn.Do(req, opts.PoolMode)

var storageCallResponse vshardStorageCallResponseProto
err = future.GetTyped(&storageCallResponse)
err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
if err != nil {
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
}

r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)

if storageCallResponse.AssertError != nil {
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
return VshardRouterCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
}

if storageCallResponse.VshardError != nil {
Expand All @@ -267,7 +385,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if vshardError.Destination != "" {
destinationUUID, err := uuid.Parse(vshardError.Destination)
if err != nil {
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
return VshardRouterCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
vshardStorageClientCall, vshardError, err)
}

Expand All @@ -291,8 +409,8 @@ func (r *Router) RouterCallImpl(ctx context.Context,
const defaultPoolingPause = 50 * time.Millisecond
time.Sleep(defaultPoolingPause)

if time.Since(timeStart) > timeout {
return nil, nil, vshardError
if spent := time.Since(requestStartTime); spent > timeout {
return VshardRouterCallResp{}, vshardError
}
}
}
Expand All @@ -311,30 +429,52 @@ func (r *Router) RouterCallImpl(ctx context.Context,
// There is a comment why lua vshard router doesn't retry:
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
r.BucketReset(bucketID)
return nil, nil, vshardError
return VshardRouterCallResp{}, vshardError
case VShardErrNameNonMaster:
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
// we just return this error as is.
return nil, nil, vshardError
return VshardRouterCallResp{}, vshardError
default:
return nil, nil, vshardError
return VshardRouterCallResp{}, vshardError
}
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)
r.metrics().RequestDuration(time.Since(requestStartTime), true, false)

return storageCallResponse.Data, func(result interface{}) error {
if len(storageCallResponse.Data) == 0 {
return nil
}
return storageCallResponse.CallResp, nil
}
}

// CallRO is an alias for Call with VshardRouterCallModeRO.
func (r *Router) CallRO(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeRO, fnc, args, opts)
}

var stub bool
// CallRW is an alias for Call with VshardRouterCallModeRW.
func (r *Router) CallRW(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeRW, fnc, args, opts)
}

return future.GetTyped(&[]interface{}{&stub, result})
}, nil
}
// CallRE is an alias for Call with VshardRouterCallModeRE.
func (r *Router) CallRE(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeRE, fnc, args, opts)
}

// CallBRO is an alias for Call with VshardRouterCallModeBRO.
func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeBRO, fnc, args, opts)
}

// CallBRE is an alias for Call with VshardRouterCallModeBRE.
func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeBRE, fnc, args, opts)
}

// RouterMapCallRWOptions sets options for RouterMapCallRW.
Expand Down Expand Up @@ -487,7 +627,7 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context,
) (map[uuid.UUID]T, error) {
const vshardStorageServiceCall = "vshard.storage._call"

timeout := CallTimeoutMin
timeout := callTimeoutDefault
if opts.Timeout > 0 {
timeout = opts.Timeout
}
Expand Down
2 changes: 1 addition & 1 deletion api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
conn: mPool,
})

_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second}, "test", []byte("test"))
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second, VshardMode: ReadMode}, "test", []byte("test"))
require.ErrorIs(t, err, futureError)
})
}
Loading
Loading