Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

RouterCallImpl: fix unpacking 'vshard.storage.call' response #35

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## Unreleased

BUG FIXES:

* RouterCallImpl: fix decoding the response from vshard.storage.call
* RouterCallImpl: do not return nil error when StorageCallAssertError has happened

FEATURES:

* Added etcd v2 topology provider implementation (#16)
Expand Down
49 changes: 39 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (r *Router) RouterCallImpl(ctx context.Context,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

if bucketID > r.cfg.TotalBucketCount {
return nil, nil, fmt.Errorf("bucket is unreachable: bucket id is out of range")
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
Expand Down Expand Up @@ -117,19 +118,26 @@ func (r *Router) RouterCallImpl(ctx context.Context,
continue
}

r.log().Info(ctx, fmt.Sprintf("try call replicaset %s", rs.info.Name))
r.log().Info(ctx, fmt.Sprintf("try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID))

future := rs.conn.Do(req, opts.PoolMode)
respData, err := future.Get()

var respData []interface{}
respData, err = future.Get()
if err != nil {
r.log().Error(ctx, fmt.Sprintf("got future error: %s", err))
r.metrics().RetryOnCall("future_get_error")

continue
}

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

if len(respData) < 1 {
err = fmt.Errorf("invalid length of response data: must be >1, current: %d", len(respData))
// vshard.storage.call(func) returns up to two values:
// - true/false
// - func result, omitted if func does not return anything
err = fmt.Errorf("invalid length of response data: must be >= 1, current: %d", len(respData))

r.log().Error(ctx, err.Error())

Expand All @@ -140,11 +148,18 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if respData[0] == nil {
vshardErr := &StorageCallVShardError{}

err = mapstructure.Decode(respData[1], vshardErr)
if len(respData) < 2 {
err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData))
} else {
err = mapstructure.Decode(respData[1], vshardErr)
}

if err != nil {
r.metrics().RetryOnCall("internal_error")

r.log().Error(ctx, fmt.Sprintf("cant decode vhsard err by trarantool with err: %s; continue try", err))
err = fmt.Errorf("cant decode vhsard err by trarantool with err: %s; continue try", err)

r.log().Error(ctx, err.Error())
continue
}

Expand Down Expand Up @@ -175,19 +190,33 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if !isVShardRespOk { // error
errorResp := &StorageCallAssertError{}

err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
// Since we got respData[0] == false, it means that assert has happened
// while executing user-defined function on vshard storage.
// In this case, vshard storage must return a pair: false, error.
if len(respData) < 2 {
err = fmt.Errorf("protocol violation: unexpected response length when respData[0] == false: %d", len(respData))
} else {
err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
}

if err != nil {
// Either protocol has been violated or decoding has failed.
err = fmt.Errorf("cant get typed vshard err with err: %s", err)
} else {
KaymeKaydex marked this conversation as resolved.
Show resolved Hide resolved
// StorageCallAssertError successfully has been decoded.
err = errorResp
}

err = errorResp
continue
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

return respData[1:], func(result interface{}) error {
if len(respData) < 2 {
return nil
}

var stub interface{}

return future.GetTyped(&[]interface{}{&stub, result})
Expand Down
2 changes: 1 addition & 1 deletion vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func validateCfg(cfg Config) error {
}

if cfg.TotalBucketCount == 0 {
return fmt.Errorf("bucket count must be grather then 0")
return fmt.Errorf("bucket count must be greater than 0")
}

return nil
Expand Down
Loading