Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
RouterCallImpl: fix unpacking 'vshard.storage.call' response
Browse files Browse the repository at this point in the history
* handle case when len(respData) < 2
* more informative logs and typo fix
* fix "err" redeclaring in inner block
* add missed "continue" operator in case when isVShardRespOk is false
  • Loading branch information
nurzhan-saktaganov committed Aug 8, 2024
1 parent 84c49a6 commit 10c8cbb
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
44 changes: 34 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (r *Router) RouterCallImpl(ctx context.Context,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

if bucketID > r.cfg.TotalBucketCount {
return nil, nil, fmt.Errorf("bucket is unreachable: bucket id is out of range")
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
Expand Down Expand Up @@ -117,19 +118,26 @@ func (r *Router) RouterCallImpl(ctx context.Context,
continue
}

r.log().Info(ctx, fmt.Sprintf("try call replicaset %s", rs.info.Name))
r.log().Info(ctx, fmt.Sprintf("try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID))

future := rs.conn.Do(req, opts.PoolMode)
respData, err := future.Get()

var respData []interface{}
respData, err = future.Get()
if err != nil {
r.log().Error(ctx, fmt.Sprintf("got future error: %s", err))
r.metrics().RetryOnCall("future_get_error")

continue
}

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

if len(respData) < 1 {
err = fmt.Errorf("invalid length of response data: must be >1, current: %d", len(respData))
// vshard.storage.call(func) returns up to two values:
// - true/false
// - func result, omitted if func does not return anything
err = fmt.Errorf("invalid length of response data: must be >= 1, current: %d", len(respData))

r.log().Error(ctx, err.Error())

Expand All @@ -140,11 +148,18 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if respData[0] == nil {
vshardErr := &StorageCallVShardError{}

err = mapstructure.Decode(respData[1], vshardErr)
if len(respData) < 2 {
err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData))
} else {
err = mapstructure.Decode(respData[1], vshardErr)
}

if err != nil {
r.metrics().RetryOnCall("internal_error")

r.log().Error(ctx, fmt.Sprintf("cant decode vhsard err by trarantool with err: %s; continue try", err))
err = fmt.Errorf("cant decode vhsard err by trarantool with err: %s; continue try", err)

r.log().Error(ctx, err.Error())
continue
}

Expand Down Expand Up @@ -175,19 +190,28 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if !isVShardRespOk { // error
errorResp := &StorageCallAssertError{}

err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
if len(respData) < 2 {
err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData))
} else {
err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
}

if err != nil {
err = fmt.Errorf("cant get typed vshard err with err: %s", err)
} else {
err = errorResp
}

err = errorResp
continue
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

return respData[1:], func(result interface{}) error {
if len(respData) < 2 {
return nil
}

var stub interface{}

return future.GetTyped(&[]interface{}{&stub, result})
Expand Down
2 changes: 1 addition & 1 deletion vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func validateCfg(cfg Config) error {
}

if cfg.TotalBucketCount == 0 {
return fmt.Errorf("bucket count must be grather then 0")
return fmt.Errorf("bucket count must be greater than 0")
}

return nil
Expand Down

0 comments on commit 10c8cbb

Please sign in to comment.