Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
RouterCallImpl: fix unpacking 'vshard.storage.call' response (#35)
Browse files Browse the repository at this point in the history
* RouterCallImpl: fix unpacking 'vshard.storage.call' response

* handle case when len(respData) < 2
* more informative logs and typo fix
* fix "err" redeclaring in inner block
* add missed "continue" operator in case when isVShardRespOk is false

* review fixes for PR #35

* add more informative comments to RouterCallImpl
* update CHANGELOG.md
  • Loading branch information
nurzhan-saktaganov authored Aug 26, 2024
1 parent 84c49a6 commit 87be7d5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## Unreleased

BUG FIXES:

* RouterCallImpl: fix decoding the response from vshard.storage.call
* RouterCallImpl: do not return nil error when StorageCallAssertError has happened

FEATURES:

* Added etcd v2 topology provider implementation (#16)
Expand Down
49 changes: 39 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (r *Router) RouterCallImpl(ctx context.Context,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

if bucketID > r.cfg.TotalBucketCount {
return nil, nil, fmt.Errorf("bucket is unreachable: bucket id is out of range")
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
Expand Down Expand Up @@ -117,19 +118,26 @@ func (r *Router) RouterCallImpl(ctx context.Context,
continue
}

r.log().Info(ctx, fmt.Sprintf("try call replicaset %s", rs.info.Name))
r.log().Info(ctx, fmt.Sprintf("try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID))

future := rs.conn.Do(req, opts.PoolMode)
respData, err := future.Get()

var respData []interface{}
respData, err = future.Get()
if err != nil {
r.log().Error(ctx, fmt.Sprintf("got future error: %s", err))
r.metrics().RetryOnCall("future_get_error")

continue
}

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

if len(respData) < 1 {
err = fmt.Errorf("invalid length of response data: must be >1, current: %d", len(respData))
// vshard.storage.call(func) returns up to two values:
// - true/false
// - func result, omitted if func does not return anything
err = fmt.Errorf("invalid length of response data: must be >= 1, current: %d", len(respData))

r.log().Error(ctx, err.Error())

Expand All @@ -140,11 +148,18 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if respData[0] == nil {
vshardErr := &StorageCallVShardError{}

err = mapstructure.Decode(respData[1], vshardErr)
if len(respData) < 2 {
err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData))
} else {
err = mapstructure.Decode(respData[1], vshardErr)
}

if err != nil {
r.metrics().RetryOnCall("internal_error")

r.log().Error(ctx, fmt.Sprintf("cant decode vhsard err by trarantool with err: %s; continue try", err))
err = fmt.Errorf("cant decode vhsard err by trarantool with err: %s; continue try", err)

r.log().Error(ctx, err.Error())
continue
}

Expand Down Expand Up @@ -175,19 +190,33 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if !isVShardRespOk { // error
errorResp := &StorageCallAssertError{}

err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
// Since we got respData[0] == false, it means that assert has happened
// while executing user-defined function on vshard storage.
// In this case, vshard storage must return a pair: false, error.
if len(respData) < 2 {
err = fmt.Errorf("protocol violation: unexpected response length when respData[0] == false: %d", len(respData))
} else {
err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
}

if err != nil {
// Either protocol has been violated or decoding has failed.
err = fmt.Errorf("cant get typed vshard err with err: %s", err)
} else {
// StorageCallAssertError successfully has been decoded.
err = errorResp
}

err = errorResp
continue
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

return respData[1:], func(result interface{}) error {
if len(respData) < 2 {
return nil
}

var stub interface{}

return future.GetTyped(&[]interface{}{&stub, result})
Expand Down
2 changes: 1 addition & 1 deletion vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func validateCfg(cfg Config) error {
}

if cfg.TotalBucketCount == 0 {
return fmt.Errorf("bucket count must be grather then 0")
return fmt.Errorf("bucket count must be greater than 0")
}

return nil
Expand Down

0 comments on commit 87be7d5

Please sign in to comment.