Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

RouterCallImpl: fix unpacking 'vshard.storage.call' response #35

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (r *Router) RouterCallImpl(ctx context.Context,
opts CallOpts,
fnc string,
args interface{}) (interface{}, StorageResultTypedFunc, error) {

if bucketID > r.cfg.TotalBucketCount {
return nil, nil, fmt.Errorf("bucket is unreachable: bucket id is out of range")
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

if opts.Timeout == 0 {
Expand Down Expand Up @@ -117,19 +118,26 @@ func (r *Router) RouterCallImpl(ctx context.Context,
continue
}

r.log().Info(ctx, fmt.Sprintf("try call replicaset %s", rs.info.Name))
r.log().Info(ctx, fmt.Sprintf("try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID))

future := rs.conn.Do(req, opts.PoolMode)
respData, err := future.Get()

var respData []interface{}
respData, err = future.Get()
if err != nil {
r.log().Error(ctx, fmt.Sprintf("got future error: %s", err))
r.metrics().RetryOnCall("future_get_error")

continue
}

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

if len(respData) < 1 {
err = fmt.Errorf("invalid length of response data: must be >1, current: %d", len(respData))
// vshard.storage.call(func) returns up to two values:
// - true/false
// - func result, omitted if func does not return anything
err = fmt.Errorf("invalid length of response data: must be >= 1, current: %d", len(respData))

r.log().Error(ctx, err.Error())

Expand All @@ -140,11 +148,18 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if respData[0] == nil {
vshardErr := &StorageCallVShardError{}

err = mapstructure.Decode(respData[1], vshardErr)
if len(respData) < 2 {
err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData))
} else {
err = mapstructure.Decode(respData[1], vshardErr)
}

if err != nil {
r.metrics().RetryOnCall("internal_error")

r.log().Error(ctx, fmt.Sprintf("cant decode vhsard err by trarantool with err: %s; continue try", err))
err = fmt.Errorf("cant decode vhsard err by trarantool with err: %s; continue try", err)

r.log().Error(ctx, err.Error())
continue
}

Expand Down Expand Up @@ -175,19 +190,28 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if !isVShardRespOk { // error
errorResp := &StorageCallAssertError{}

err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
if len(respData) < 2 {
err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resp data cant be nil, cause we check it before

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and add few comments pls

} else {
err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
}

if err != nil {
err = fmt.Errorf("cant get typed vshard err with err: %s", err)
} else {
KaymeKaydex marked this conversation as resolved.
Show resolved Hide resolved
err = errorResp
}

err = errorResp
continue
}

r.metrics().RequestDuration(time.Since(timeStart), true, false)

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

return respData[1:], func(result interface{}) error {
if len(respData) < 2 {
return nil
}

var stub interface{}

return future.GetTyped(&[]interface{}{&stub, result})
Expand Down
2 changes: 1 addition & 1 deletion vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func validateCfg(cfg Config) error {
}

if cfg.TotalBucketCount == 0 {
return fmt.Errorf("bucket count must be grather then 0")
return fmt.Errorf("bucket count must be greater than 0")
}

return nil
Expand Down
Loading