diff --git a/CHANGELOG.md b/CHANGELOG.md index c41367c..2a32e84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ## Unreleased +BUG FIXES: + +* RouterCallImpl: fix decoding the response from vshard.storage.call +* RouterCallImpl: do not return nil error when StorageCallAssertError has happened + FEATURES: * Added etcd v2 topology provider implementation (#16) diff --git a/api.go b/api.go index 52f87ec..d7b1666 100644 --- a/api.go +++ b/api.go @@ -73,8 +73,9 @@ func (r *Router) RouterCallImpl(ctx context.Context, opts CallOpts, fnc string, args interface{}) (interface{}, StorageResultTypedFunc, error) { + if bucketID > r.cfg.TotalBucketCount { - return nil, nil, fmt.Errorf("bucket is unreachable: bucket id is out of range") + return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) } if opts.Timeout == 0 { @@ -117,10 +118,12 @@ func (r *Router) RouterCallImpl(ctx context.Context, continue } - r.log().Info(ctx, fmt.Sprintf("try call replicaset %s", rs.info.Name)) + r.log().Info(ctx, fmt.Sprintf("try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)) future := rs.conn.Do(req, opts.PoolMode) - respData, err := future.Get() + + var respData []interface{} + respData, err = future.Get() if err != nil { r.log().Error(ctx, fmt.Sprintf("got future error: %s", err)) r.metrics().RetryOnCall("future_get_error") @@ -128,8 +131,13 @@ func (r *Router) RouterCallImpl(ctx context.Context, continue } + r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData)) + if len(respData) < 1 { - err = fmt.Errorf("invalid length of response data: must be >1, current: %d", len(respData)) + // vshard.storage.call(func) returns up to two values: + // - true/false + // - func result, omitted if func does not return anything + err = fmt.Errorf("invalid length of response data: must be >= 1, current: %d", len(respData)) r.log().Error(ctx, err.Error()) @@ -140,11 +148,18 @@ func (r *Router) RouterCallImpl(ctx context.Context, if respData[0] == nil { vshardErr := &StorageCallVShardError{} - err = mapstructure.Decode(respData[1], vshardErr) + if len(respData) < 2 { + err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData)) + } else { + err = mapstructure.Decode(respData[1], vshardErr) + } + if err != nil { r.metrics().RetryOnCall("internal_error") - r.log().Error(ctx, fmt.Sprintf("cant decode vhsard err by trarantool with err: %s; continue try", err)) + err = fmt.Errorf("cant decode vhsard err by trarantool with err: %s; continue try", err) + + r.log().Error(ctx, err.Error()) continue } @@ -175,19 +190,33 @@ func (r *Router) RouterCallImpl(ctx context.Context, if !isVShardRespOk { // error errorResp := &StorageCallAssertError{} - err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp}) + // Since we got respData[0] == false, it means that assert has happened + // while executing user-defined function on vshard storage. + // In this case, vshard storage must return a pair: false, error. + if len(respData) < 2 { + err = fmt.Errorf("protocol violation: unexpected response length when respData[0] == false: %d", len(respData)) + } else { + err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp}) + } + if err != nil { + // Either protocol has been violated or decoding has failed. err = fmt.Errorf("cant get typed vshard err with err: %s", err) + } else { + // StorageCallAssertError successfully has been decoded. + err = errorResp } - err = errorResp + continue } r.metrics().RequestDuration(time.Since(timeStart), true, false) - r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData)) - return respData[1:], func(result interface{}) error { + if len(respData) < 2 { + return nil + } + var stub interface{} return future.GetTyped(&[]interface{}{&stub, result}) diff --git a/vshard.go b/vshard.go index 35ee282..a3e7021 100644 --- a/vshard.go +++ b/vshard.go @@ -207,7 +207,7 @@ func validateCfg(cfg Config) error { } if cfg.TotalBucketCount == 0 { - return fmt.Errorf("bucket count must be grather then 0") + return fmt.Errorf("bucket count must be greater than 0") } return nil