Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #110
Browse files Browse the repository at this point in the history
* new methods 'Router.Call[XYZ]' to replace the deprecated one 'RouterCallImpl'
* new backward-compatible signature for StorageResultTypedFunc to fix interface for RouterCallImpl
  • Loading branch information
nurzhan-saktaganov committed Dec 18, 2024
1 parent be1cfc4 commit 389cfdf
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 67 deletions.
240 changes: 190 additions & 50 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type VshardMode string
const (
ReadMode VshardMode = "read"
WriteMode VshardMode = "write"

// callTimeoutDefault is a default timeout when no timeout is provided
callTimeoutDefault = 500 * time.Millisecond
)

func (c VshardMode) String() string {
Expand All @@ -31,7 +34,7 @@ func (c VshardMode) String() string {
type vshardStorageCallResponseProto struct {
AssertError *assertError // not nil if there is assert error
VshardError *StorageCallVShardError // not nil if there is vshard response
Data []interface{} // raw response data
CallResp VshardRouterCallResp
}

func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -115,14 +118,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
}

// isVShardRespOk is true
r.Data = make([]interface{}, 0, respArrayLen-1)

r.CallResp.rawMessages = make([]msgpack.RawMessage, 0, respArrayLen-1)
for i := 1; i < respArrayLen; i++ {
elem, err := d.DecodeInterface()
elem, err := d.DecodeRaw()
if err != nil {
return fmt.Errorf("failed to decode into interface element #%d of response array", i+1)
return fmt.Errorf("failed to decode into msgpack.RawMessage element #%d of response array", i-1)
}
r.Data = append(r.Data, elem)
r.CallResp.rawMessages = append(r.CallResp.rawMessages, elem)
}

return nil
Expand Down Expand Up @@ -167,62 +169,180 @@ func (s StorageCallVShardError) Error() string {
return fmt.Sprintf("%+v", alias(s))
}

type StorageResultTypedFunc = func(result interface{}) error
type StorageResultTypedFunc = func(result ...interface{}) error

type CallOpts struct {
VshardMode VshardMode // vshard mode in call
PoolMode pool.Mode
Timeout time.Duration
}

// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min".
// But the original lua vshard implementation uses this naming, so we use it too.
//
//nolint:revive
const CallTimeoutMin = time.Second / 2
// VshardRouterCallMode is a type to represent call mode for Router.Call method.
type VshardRouterCallMode int

const (
// VshardRouterCallModeRO sets a read-only mode for Router.Call.
VshardRouterCallModeRO VshardRouterCallMode = iota
// VshardRouterCallModeRW sets a read-write mode for Router.Call.
VshardRouterCallModeRW
// VshardRouterCallModeRE acts like VshardRouterCallModeRO
// with preference for a replica rather than a master.
// This mode is not supported yet.
VshardRouterCallModeRE
// VshardRouterCallModeBRO acts like VshardRouterCallModeRO with balancing.
VshardRouterCallModeBRO
// VshardRouterCallModeBRE acts like VshardRouterCallModeRO with balancing
// and preference for a replica rather than a master.
VshardRouterCallModeBRE
)

// VshardRouterCallOptions represents options to Router.Call[XXX] methods.
type VshardRouterCallOptions struct {
Timeout time.Duration
}

// VshardRouterCallResp represents a response from Router.Call[XXX] methods.
type VshardRouterCallResp struct {
rawMessages []msgpack.RawMessage
}

// Get returns a response from user defined function as []interface{}.
func (r VshardRouterCallResp) Get() ([]interface{}, error) {
resp := make([]interface{}, len(r.rawMessages))
return resp, r.GetTyped(resp)
}

// GetTyped decodes a reponse from user defined fuction into custom values.

Check failure on line 215 in api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`reponse` is a misspelling of `response` (misspell)
func (r VshardRouterCallResp) GetTyped(result []interface{}) error {
minLen := len(result)
if dataLen := len(r.rawMessages); dataLen < minLen {
minLen = dataLen
}

for i := 0; i < minLen; i++ {
if err := msgpack.Unmarshal(r.rawMessages[i], &result[i]); err != nil {
return fmt.Errorf("failed to decode into result[%d] element #%d of response array: %w", i, i, err)
}
}

return nil
}

// RouterCallImpl Perform shard operation function will restart operation
// after wrong bucket response until timeout is reached
// Deprecated: RouterCallImpl is deprecated.
// See https://github.com/KaymeKaydex/go-vshard-router/issues/110.
// Use Call, Call[RO, RW, RE, BRO, BRE] methods instead.
func (r *Router) RouterCallImpl(ctx context.Context,
bucketID uint64,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

var vshardCallOpts = VshardRouterCallOptions{
Timeout: opts.Timeout,
}

var vshardCallMode VshardRouterCallMode

switch opts.VshardMode {
case WriteMode:
vshardCallMode = VshardRouterCallModeRW
case ReadMode:
switch opts.PoolMode {
case pool.ANY:
vshardCallMode = VshardRouterCallModeBRO
case pool.RO:
vshardCallMode = VshardRouterCallModeRO
case pool.RW:
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
case pool.PreferRO:
vshardCallMode = VshardRouterCallModeBRE
case pool.PreferRW:
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
default:
return nil, nil, fmt.Errorf("unexpected opts.PoolMode %v", opts.PoolMode)
}
default:
return nil, nil, fmt.Errorf("unexpected opts.VshardMode %v", opts.VshardMode)
}

vshardCallResp, err := r.Call(ctx, bucketID, vshardCallMode, fnc, args, vshardCallOpts)
if err != nil {
return nil, nil, err
}

data, err := vshardCallResp.Get()
if err != nil {
return nil, nil, err
}

return data, func(result ...interface{}) error {
return vshardCallResp.GetTyped(result)
}, nil
}

// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCallMode,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
const vshardStorageClientCall = "vshard.storage.call"

if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
return VshardRouterCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
var poolMode pool.Mode
var vshardMode VshardMode

switch mode {
case VshardRouterCallModeRO:
poolMode, vshardMode = pool.RO, ReadMode
case VshardRouterCallModeRW:
poolMode, vshardMode = pool.RW, WriteMode
case VshardRouterCallModeRE:
// poolMode, vshardMode = pool.PreferRO, ReadMode
// since go-tarantool always use balance=true politic,
// we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
case VshardRouterCallModeBRO:
poolMode, vshardMode = pool.ANY, ReadMode
case VshardRouterCallModeBRE:
poolMode, vshardMode = pool.PreferRO, ReadMode
default:
return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
}

timeout := opts.Timeout
timeStart := time.Now()
timeout := callTimeoutDefault
if opts.Timeout > 0 {
timeout = opts.Timeout
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

tntReq := tarantool.NewCallRequest(vshardStorageClientCall).
Context(ctx).
Args([]interface{}{
bucketID,
vshardMode,
fnc,
args,
})

req := tarantool.NewCallRequest(vshardStorageClientCall)
req = req.Context(ctx)
req = req.Args([]interface{}{
bucketID,
opts.VshardMode.String(),
fnc,
args,
})
requestStartTime := time.Now()

var err error

for {
if since := time.Since(timeStart); since > timeout {
r.metrics().RequestDuration(since, false, false)
if spent := time.Since(requestStartTime); spent > timeout {
r.metrics().RequestDuration(spent, false, false)

r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout)
r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout)
if err == nil {
err = fmt.Errorf("cant get call cause call impl timeout")
}

return nil, nil, err
return VshardRouterCallResp{}, err
}

var rs *Replicaset
Expand All @@ -242,18 +362,16 @@ func (r *Router) RouterCallImpl(ctx context.Context,

r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)

future := rs.conn.Do(req, opts.PoolMode)

var storageCallResponse vshardStorageCallResponseProto
err = future.GetTyped(&storageCallResponse)
err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
if err != nil {
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
}

r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)

if storageCallResponse.AssertError != nil {
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
return VshardRouterCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
}

if storageCallResponse.VshardError != nil {
Expand All @@ -267,7 +385,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if vshardError.Destination != "" {
destinationUUID, err := uuid.Parse(vshardError.Destination)
if err != nil {
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
return VshardRouterCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
vshardStorageClientCall, vshardError, err)
}

Expand All @@ -291,8 +409,8 @@ func (r *Router) RouterCallImpl(ctx context.Context,
const defaultPoolingPause = 50 * time.Millisecond
time.Sleep(defaultPoolingPause)

if time.Since(timeStart) > timeout {
return nil, nil, vshardError
if spent := time.Since(requestStartTime); spent > timeout {
return VshardRouterCallResp{}, vshardError
}
}
}
Expand All @@ -311,30 +429,52 @@ func (r *Router) RouterCallImpl(ctx context.Context,
// There is a comment why lua vshard router doesn't retry:
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
r.BucketReset(bucketID)
return nil, nil, vshardError
return VshardRouterCallResp{}, vshardError
case VShardErrNameNonMaster:
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
// we just return this error as is.
return nil, nil, vshardError
return VshardRouterCallResp{}, vshardError
default:
return nil, nil, vshardError
return VshardRouterCallResp{}, vshardError
}
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)
r.metrics().RequestDuration(time.Since(requestStartTime), true, false)

return storageCallResponse.Data, func(result interface{}) error {
if len(storageCallResponse.Data) == 0 {
return nil
}
return storageCallResponse.CallResp, nil
}
}

// CallRO is an alias for Call with VshardRouterCallModeRO.
func (r *Router) CallRO(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeRO, fnc, args, opts)
}

var stub bool
// CallRW is an alias for Call with VshardRouterCallModeRW.
func (r *Router) CallRW(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeRW, fnc, args, opts)
}

return future.GetTyped(&[]interface{}{&stub, result})
}, nil
}
// CallRE is an alias for Call with VshardRouterCallModeRE.
func (r *Router) CallRE(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeRE, fnc, args, opts)
}

// CallBRO is an alias for Call with VshardRouterCallModeBRO.
func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeBRO, fnc, args, opts)
}

// CallBRE is an alias for Call with VshardRouterCallModeBRE.
func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
return r.Call(ctx, bucketID, VshardRouterCallModeBRE, fnc, args, opts)
}

// RouterMapCallRWOptions sets options for RouterMapCallRW.
Expand Down Expand Up @@ -487,7 +627,7 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context,
) (map[uuid.UUID]T, error) {
const vshardStorageServiceCall = "vshard.storage._call"

timeout := CallTimeoutMin
timeout := callTimeoutDefault
if opts.Timeout > 0 {
timeout = opts.Timeout
}
Expand Down
2 changes: 1 addition & 1 deletion api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
conn: mPool,
})

_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second}, "test", []byte("test"))
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second, VshardMode: ReadMode}, "test", []byte("test"))
require.ErrorIs(t, err, futureError)
})
}
Loading

0 comments on commit 389cfdf

Please sign in to comment.