Skip to content
This repository was archived by the owner on Mar 9, 2025. It is now read-only.

Commit b22112f

Browse files
resolve issue #66
* retry tarantool request only on some vshard errors * do not export anymore storageCallAssertError, bucketStatError
1 parent 38cc28e commit b22112f

File tree

7 files changed

+79
-116
lines changed

7 files changed

+79
-116
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ FEATURES:
1717
* All discovering logs has new prefix [DISCOVERY]
1818
* Introduce Replicaset.CallAsync, it is usefull to send concurrent requests to replicasets;
1919
additionally, CallAsync provides new interface to interact with replicaset without cons of interface of ReplicaCall
20+
* Retry tarantool request only on some vshard errors (#66)
2021

2122
REFACTOR:
2223

@@ -28,6 +29,7 @@ REFACTOR:
2829
* BucketStat: split into bucketStatAsync and bucketStatWait parts
2930
* BucketDiscovery: do not spawn goroutines, just use futures in the single goroutine
3031
* BucketResolve: make it alias for BucketDiscovery
32+
* do not export anymore storageCallAssertError, bucketStatError
3133

3234
TESTS:
3335

api.go

Lines changed: 48 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,17 @@ func (c VshardMode) String() string {
2727
return string(c)
2828
}
2929

30-
type StorageCallAssertError struct {
30+
type storageCallAssertError struct {
3131
Code int `msgpack:"code"`
3232
BaseType string `msgpack:"base_type"`
3333
Type string `msgpack:"type"`
3434
Message string `msgpack:"message"`
3535
Trace interface{} `msgpack:"trace"`
3636
}
3737

38-
func (s StorageCallAssertError) Error() string {
39-
return fmt.Sprintf("vshard.storage.call assert error code: %d, type:%s, message: %s", s.Code, s.Type, s.Message)
38+
func (s storageCallAssertError) Error() string {
39+
type alias storageCallAssertError
40+
return fmt.Sprintf("%+v", alias(s))
4041
}
4142

4243
type StorageCallVShardError struct {
@@ -51,7 +52,11 @@ type StorageCallVShardError struct {
5152
}
5253

5354
func (s StorageCallVShardError) Error() string {
54-
return fmt.Sprintf("vshard.storage.call bucket error bucket_id: %d, reason: %s, name: %s", s.BucketID, s.Reason, s.Name)
55+
// Just print struct as is, use hack with alias type to avoid recursion:
56+
// %v attempts to call Error() method for s, which is recursion.
57+
// This alias doesn't have method Error().
58+
type alias StorageCallVShardError
59+
return fmt.Sprintf("%+v", alias(s))
5560
}
5661

5762
type StorageResultTypedFunc = func(result interface{}) error
@@ -76,6 +81,8 @@ func (r *Router) RouterCallImpl(ctx context.Context,
7681
fnc string,
7782
args interface{}) (interface{}, StorageResultTypedFunc, error) {
7883

84+
const vshardStorageClientCall = "vshard.storage.call"
85+
7986
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
8087
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
8188
}
@@ -87,7 +94,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
8794
timeout := opts.Timeout
8895
timeStart := time.Now()
8996

90-
req := tarantool.NewCallRequest("vshard.storage.call")
97+
req := tarantool.NewCallRequest(vshardStorageClientCall)
9198
req = req.Context(ctx)
9299
req = req.Args([]interface{}{
93100
bucketID,
@@ -97,6 +104,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
97104
})
98105

99106
var err error
107+
var vshardError StorageCallVShardError
100108

101109
for {
102110
if since := time.Since(timeStart); since > timeout {
@@ -114,10 +122,9 @@ func (r *Router) RouterCallImpl(ctx context.Context,
114122

115123
rs, err = r.BucketResolve(ctx, bucketID)
116124
if err != nil {
117-
r.log().Debugf(ctx, "cant resolve bucket %d with error: %s", bucketID, err.Error())
118-
119125
r.metrics().RetryOnCall("bucket_resolve_error")
120-
continue
126+
127+
return nil, nil, fmt.Errorf("cant resolve bucket %d: %w", bucketID, err)
121128
}
122129

123130
r.log().Infof(ctx, "try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)
@@ -127,97 +134,73 @@ func (r *Router) RouterCallImpl(ctx context.Context,
127134
var respData []interface{}
128135
respData, err = future.Get()
129136
if err != nil {
130-
r.log().Errorf(ctx, "got future error: %s", err)
131137
r.metrics().RetryOnCall("future_get_error")
132138

133-
continue
139+
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
134140
}
135141

136-
r.log().Debugf(ctx, "got call result response data %s", respData)
142+
r.log().Debugf(ctx, "got call result response data %v", respData)
137143

138-
if len(respData) < 1 {
144+
if len(respData) == 0 {
139145
// vshard.storage.call(func) returns up to two values:
140-
// - true/false
146+
// - true/false/nil
141147
// - func result, omitted if func does not return anything
142-
err = fmt.Errorf("invalid length of response data: must be >= 1, current: %d", len(respData))
143-
144-
r.log().Errorf(ctx, "%s", err.Error())
145-
146148
r.metrics().RetryOnCall("resp_data_error")
147-
continue
149+
150+
return nil, nil, fmt.Errorf("protocol violation %s: got empty response", vshardStorageClientCall)
148151
}
149152

150153
if respData[0] == nil {
151-
vshardErr := &StorageCallVShardError{}
152-
153-
if len(respData) < 2 {
154-
err = fmt.Errorf("unexpected response length when respData[0] == nil: %d", len(respData))
155-
} else {
156-
err = mapstructure.Decode(respData[1], vshardErr)
154+
if len(respData) != 2 {
155+
return nil, nil, fmt.Errorf("protocol violation %s: length is %d when respData[0] is nil", vshardStorageClientCall, len(respData))
157156
}
158157

158+
err = mapstructure.Decode(respData[1], &vshardError)
159159
if err != nil {
160+
// Something unexpected happened: we couldn't decode respData[1] as a vshardError,
161+
// so return reason why and respData[1], that is supposed to be a vshardError.
160162
r.metrics().RetryOnCall("internal_error")
161163

162-
err = fmt.Errorf("cant decode vhsard err by trarantool with err: %s; continue try", err)
163-
164-
r.log().Errorf(ctx, "%s", err.Error())
165-
continue
164+
return nil, nil, fmt.Errorf("cant decode vhsard err by trarantool with err: %v (%v)", err, respData[1])
166165
}
167166

168-
err = vshardErr
169-
170-
r.log().Errorf(ctx, "got vshard storage call error: %s", err)
171-
172-
if vshardErr.Name == "WRONG_BUCKET" ||
173-
vshardErr.Name == "BUCKET_IS_LOCKED" ||
174-
vshardErr.Name == "TRANSFER_IS_IN_PROGRESS" {
167+
switch vshardError.Name {
168+
case "WRONG_BUCKET", "BUCKET_IS_LOCKED", "TRANSFER_IS_IN_PROGRESS":
175169
r.BucketReset(bucketID)
176170
r.metrics().RetryOnCall("bucket_migrate")
177171

178-
r.log().Debugf(ctx, "retrying cause bucket in migrate state (%s)", vshardErr.Name)
172+
r.log().Debugf(ctx, "retrying %s cause bucket in migrate state (%s)", fnc, vshardError.Error())
179173

174+
// this vshardError will be returned to a caller in case of timeout
175+
err = &vshardError
180176
continue
181177
}
182178

183-
continue
179+
return nil, nil, &vshardError
184180
}
185181

186-
isVShardRespOk := false
182+
var isVShardRespOk bool
187183
err = future.GetTyped(&[]interface{}{&isVShardRespOk})
188184
if err != nil {
189-
r.log().Debugf(ctx, "cant get typed with err: %s", err)
190-
191-
continue
185+
return nil, nil, fmt.Errorf("protocol violation %s: can't decode respData[0] as boolean: %v", vshardStorageClientCall, err)
192186
}
193187

194-
if !isVShardRespOk { // error
195-
errorResp := &StorageCallAssertError{}
196-
197-
// Since we got respData[0] == false, it means that assert has happened
188+
if !isVShardRespOk {
189+
// Since we got respData[0] == false, it means that an error has happened
198190
// while executing user-defined function on vshard storage.
199191
// In this case, vshard storage must return a pair: false, error.
200-
if len(respData) < 2 {
201-
err = fmt.Errorf("protocol violation: unexpected response length when respData[0] == false: %d", len(respData))
202-
} else {
203-
err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
192+
if len(respData) != 2 {
193+
return nil, nil, fmt.Errorf("protocol violation %s: response length is %d when respData[0] is false", vshardStorageClientCall, len(respData))
204194
}
205195

196+
var assertError storageCallAssertError
197+
err = mapstructure.Decode(respData[1], &assertError)
206198
if err != nil {
207-
// Either protocol has been violated or decoding has failed.
208-
err = fmt.Errorf("cant get typed vshard err with err: %s", err)
209-
} else {
210-
// StorageCallAssertError successfully has been decoded.
211-
err = errorResp
199+
// We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
200+
return nil, nil, fmt.Errorf("%s: %s failed %v (decoding to assertError failed %v)", vshardStorageClientCall, fnc, respData[1], err)
212201
}
213202

214-
if errorResp.Type == "ClientError" || errorResp.Type == "LuajitError" {
215-
return nil, nil, errorResp
216-
}
217-
218-
r.log().Debugf(ctx, "retry cause vhsard response not ok: %s", errorResp)
219-
220-
continue
203+
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, assertError)
221204
}
222205

223206
r.metrics().RequestDuration(time.Since(timeStart), true, false)
@@ -363,14 +346,14 @@ func (r *Router) RouterMapCallRWImpl(
363346
return nil, fmt.Errorf("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d", len(respData))
364347
}
365348

366-
assertError := &StorageCallAssertError{}
367-
err = mapstructure.Decode(respData[1], assertError)
349+
var assertError storageCallAssertError
350+
err = mapstructure.Decode(respData[1], &assertError)
368351
if err != nil {
369-
// TODO: not only StorageCallAssertError is possible here?
352+
// We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
370353
return nil, fmt.Errorf("storage_map failed on %v: %+v (decoding to assertError failed %v)", rsFuture.uuid, respData[1], err)
371354
}
372355

373-
return nil, fmt.Errorf("storage_map failed on %v: %w", rsFuture.uuid, assertError)
356+
return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.uuid, assertError)
374357
}
375358

376359
var isVShardRespOk bool

api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ func TestRouter_RouterCallImpl(t *testing.T) {
6868
})
6969

7070
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second}, "test", []byte("test"))
71-
require.ErrorIs(t, futureError, err)
71+
require.ErrorIs(t, err, futureError)
7272
})
7373
}

error.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ var Errors = map[int]Error{
224224
},
225225
}
226226

227-
type BucketStatError struct {
227+
type bucketStatError struct {
228228
BucketID uint64 `msgpack:"bucket_id"`
229229
Reason string `msgpack:"reason"`
230230
Code int `msgpack:"code"`
@@ -233,6 +233,7 @@ type BucketStatError struct {
233233
Name string `msgpack:"name"`
234234
}
235235

236-
func (bse BucketStatError) Error() string {
237-
return "todo"
236+
func (bse bucketStatError) Error() string {
237+
type alias bucketStatError
238+
return fmt.Sprintf("%+v", alias(bse))
238239
}

replicaset.go

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,20 @@ func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
6060
return bsInfo, err
6161
}
6262

63-
if len(respData) < 1 {
64-
return bsInfo, fmt.Errorf("respData len is 0 for bucketStatWait; unsupported or broken proto")
63+
if len(respData) == 0 {
64+
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: empty response")
6565
}
6666

6767
if respData[0] == nil {
68-
69-
if len(respData) < 2 {
70-
return bsInfo, fmt.Errorf("respData len < 2 when respData[0] is nil for bucketStatWait")
68+
if len(respData) != 2 {
69+
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: invalid response length %d when respData[0] is nil", len(respData))
7170
}
7271

73-
var tmp interface{} // todo: fix non-panic crutch
74-
bsError := &BucketStatError{}
75-
76-
err := future.GetTyped(&[]interface{}{tmp, bsError})
72+
var bsError bucketStatError
73+
err = mapstructure.Decode(respData[1], &bsError)
7774
if err != nil {
78-
return bsInfo, err
75+
// We could not decode respData[1] as bsError, so return respData[1] as is, add info why we could not decode.
76+
return bsInfo, fmt.Errorf("bucketStatWait error: %v (can't decode into bsError: %v)", respData[1], err)
7977
}
8078

8179
return bsInfo, bsError
@@ -106,42 +104,28 @@ func (rs *Replicaset) ReplicaCall(
106104
timeout = opts.Timeout
107105
}
108106

109-
timeStart := time.Now()
110-
111107
ctx, cancel := context.WithTimeout(ctx, timeout)
112108
defer cancel()
113109

114110
req := tarantool.NewCallRequest(fnc).
115111
Context(ctx).
116112
Args(args)
117113

118-
var (
119-
respData []interface{}
120-
err error
121-
)
122-
123-
for {
124-
if since := time.Since(timeStart); since > timeout {
125-
return nil, nil, err
126-
}
127-
128-
future := rs.conn.Do(req, opts.PoolMode)
129-
130-
respData, err = future.Get()
131-
if err != nil {
132-
continue
133-
}
114+
future := rs.conn.Do(req, opts.PoolMode)
134115

135-
if len(respData) == 0 {
136-
// Since this method returns the first element of respData by contract, we can't return anything is this case (broken interface)
137-
err = fmt.Errorf("response data is empty")
138-
continue
139-
}
116+
respData, err := future.Get()
117+
if err != nil {
118+
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
119+
}
140120

141-
return respData[0], func(result interface{}) error {
142-
return future.GetTyped(&[]interface{}{&result})
143-
}, nil
121+
if len(respData) == 0 {
122+
// Since this method returns the first element of respData by contract, we can't return anything is this case (broken interface)
123+
return nil, nil, fmt.Errorf("%s response data is empty", fnc)
144124
}
125+
126+
return respData[0], func(result interface{}) error {
127+
return future.GetTyped(&[]interface{}{&result})
128+
}, nil
145129
}
146130

147131
// Call sends async request to remote storage

tests/tnt/replicaset_test.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package tnt
22

33
import (
44
"context"
5-
"log"
65
"testing"
76
"time"
87

@@ -14,10 +13,7 @@ import (
1413
)
1514

1615
func TestReplicasetReplicaCall(t *testing.T) {
17-
if !isCorrectRun() {
18-
log.Printf("Incorrect run of tnt-test framework")
19-
return
20-
}
16+
skipOnInvalidRun(t)
2117

2218
t.Parallel()
2319

@@ -101,10 +97,7 @@ func TestReplicasetReplicaCall(t *testing.T) {
10197
}
10298

10399
func TestReplicsetCallAsync(t *testing.T) {
104-
if !isCorrectRun() {
105-
log.Printf("Incorrect run of tnt-test framework")
106-
return
107-
}
100+
skipOnInvalidRun(t)
108101

109102
t.Parallel()
110103

tests/tnt/router_call_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ func TestRouterCallProto(t *testing.T) {
5757
require.NotNil(t, err, "RouterCallImpl echo finised with nil args")
5858

5959
_, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_luajit_error", args)
60-
require.IsType(t, &vshardrouter.StorageCallAssertError{}, err, "RouterCallImpl raise_luajit_error finished with StorageCallAssertError")
60+
require.NotNil(t, err, "RouterCallImpl raise_luajit_error finished with err")
6161

6262
_, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_client_error", args)
63-
require.IsType(t, &vshardrouter.StorageCallAssertError{}, err, "RouterCallImpl raise_client_error finished with StorageCallAssertError")
63+
require.NotNil(t, err, "RouterCallImpl raise_client_error finished with err")
6464

6565
// maxRespLen is due to:
6666
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130

0 commit comments

Comments
 (0)