Skip to content
This repository was archived by the owner on Mar 9, 2025. It is now read-only.

Commit 7bfaa62

Browse files
resolve issue #110
- TODO: add description - TODO: add info into CHANGES file
1 parent be46c3a commit 7bfaa62

File tree

3 files changed

+193
-66
lines changed

3 files changed

+193
-66
lines changed

api.go

Lines changed: 191 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,19 @@ type VshardMode string
2323
const (
2424
ReadMode VshardMode = "read"
2525
WriteMode VshardMode = "write"
26+
27+
// callTimeoutDefault is a default timeout when no timeout is provided
28+
callTimeoutDefault = 500 * time.Millisecond
2629
)
2730

2831
func (c VshardMode) String() string {
2932
return string(c)
3033
}
3134

3235
type vshardStorageCallResponseProto struct {
33-
assertError *assertError // not nil if there is assert error
34-
vshardError *StorageCallVShardError // not nil if there is vshard response
35-
data []interface{} // raw response data
36+
AssertError *assertError // not nil if there is assert error
37+
VshardError *StorageCallVShardError // not nil if there is vshard response
38+
CallResp VshardCallResp
3639
}
3740

3841
func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
@@ -88,7 +91,7 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
8891
return fmt.Errorf("failed to decode storage vshard error: %w", err)
8992
}
9093

91-
r.vshardError = &vshardError
94+
r.VshardError = &vshardError
9295

9396
return nil
9497
}
@@ -110,20 +113,19 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
110113
return fmt.Errorf("failed to decode storage assert error: %w", err)
111114
}
112115

113-
r.assertError = &assertError
116+
r.AssertError = &assertError
114117

115118
return nil
116119
}
117120

118121
// isVShardRespOk is true
119-
r.data = make([]interface{}, 0, respArrayLen-1)
120-
122+
r.CallResp.data = make([]msgpack.RawMessage, 0, respArrayLen-1)
121123
for i := 1; i < respArrayLen; i++ {
122-
elem, err := d.DecodeInterface()
124+
elem, err := d.DecodeRaw()
123125
if err != nil {
124-
return fmt.Errorf("failed to decode into interface element #%d of response array", i+1)
126+
return fmt.Errorf("failed to decode into msgpack.RawMessage element #%d of response array", i-1)
125127
}
126-
r.data = append(r.data, elem)
128+
r.CallResp.data = append(r.CallResp.data, elem)
127129
}
128130

129131
return nil
@@ -176,54 +178,113 @@ type CallOpts struct {
176178
Timeout time.Duration
177179
}
178180

179-
// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min".
180-
// But the original lua vshard implementation uses this naming, so we use it too.
181-
//
182-
//nolint:revive
183-
const CallTimeoutMin = time.Second / 2
181+
type VshardCallMode int
184182

185-
// RouterCallImpl Perform shard operation function will restart operation
186-
// after wrong bucket response until timeout is reached
187-
func (r *Router) RouterCallImpl(ctx context.Context,
188-
bucketID uint64,
189-
opts CallOpts,
190-
fnc string,
191-
args interface{}) (interface{}, StorageResultTypedFunc, error) {
183+
const (
184+
VshardCallModeRO VshardCallMode = iota
185+
VshardCallModeRW
186+
VshardCallModeRE
187+
VshardCallModeBRO
188+
VshardCallModeBRE
189+
)
190+
191+
type VshardCallOptions struct {
192+
Timeout time.Duration
193+
}
194+
195+
type VshardCallResp struct {
196+
data []msgpack.RawMessage
197+
}
198+
199+
func (r VshardCallResp) Get() ([]interface{}, error) {
200+
resp := make([]interface{}, 0, len(r.data))
201+
202+
for i, rawMessage := range r.data {
203+
var v interface{}
204+
if err := msgpack.Unmarshal(rawMessage, &v); err != nil {
205+
return nil, fmt.Errorf("failed to decode into interface element #%d of response array: %w", i, err)
206+
}
207+
resp = append(resp, v)
208+
}
192209

210+
return resp, nil
211+
}
212+
213+
func (r VshardCallResp) GetTyped(result []interface{}) error {
214+
minLen := len(result)
215+
if dataLen := len(r.data); dataLen < minLen {
216+
minLen = dataLen
217+
}
218+
219+
for i := 0; i < minLen; i++ {
220+
if err := msgpack.Unmarshal(r.data[i], result[i]); err != nil {
221+
return fmt.Errorf("failed to decode into result[%d] element #%d of response array: %w", i, i, err)
222+
}
223+
}
224+
225+
return nil
226+
}
227+
228+
func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardCallMode,
229+
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
193230
const vshardStorageClientCall = "vshard.storage.call"
194231

195232
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
196-
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
233+
return VshardCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
197234
}
198235

199-
if opts.Timeout == 0 {
200-
opts.Timeout = CallTimeoutMin
236+
var poolMode pool.Mode
237+
var vshardMode VshardMode
238+
239+
switch mode {
240+
case VshardCallModeRO:
241+
poolMode, vshardMode = pool.RO, ReadMode
242+
case VshardCallModeRW:
243+
poolMode, vshardMode = pool.RW, WriteMode
244+
case VshardCallModeRE:
245+
// poolMode, vshardMode = pool.PreferRO, ReadMode
246+
// since go-tarantool always use balance=true politic,
247+
// we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
248+
return VshardCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
249+
case VshardCallModeBRO:
250+
poolMode, vshardMode = pool.ANY, ReadMode
251+
case VshardCallModeBRE:
252+
poolMode, vshardMode = pool.PreferRO, ReadMode
253+
default:
254+
return VshardCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
201255
}
202256

203-
timeout := opts.Timeout
204-
timeStart := time.Now()
257+
timeout := callTimeoutDefault
258+
if opts.Timeout > 0 {
259+
timeout = opts.Timeout
260+
}
205261

206-
req := tarantool.NewCallRequest(vshardStorageClientCall)
207-
req = req.Context(ctx)
208-
req = req.Args([]interface{}{
209-
bucketID,
210-
opts.VshardMode.String(),
211-
fnc,
212-
args,
213-
})
262+
ctx, cancel := context.WithTimeout(ctx, timeout)
263+
defer cancel()
264+
265+
tntReq := tarantool.NewCallRequest(vshardStorageClientCall).
266+
Context(ctx).
267+
Args([]interface{}{
268+
bucketID,
269+
vshardMode,
270+
fnc,
271+
args,
272+
})
273+
274+
requestStartTime := time.Now()
214275

215276
var err error
216277

217278
for {
218-
if since := time.Since(timeStart); since > timeout {
219-
r.metrics().RequestDuration(since, false, false)
279+
if spent := time.Since(requestStartTime); spent > timeout {
280+
r.metrics().RequestDuration(spent, false, false)
220281

221-
r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout)
282+
r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout)
222283
if err == nil {
223284
err = fmt.Errorf("cant get call cause call impl timeout")
224285
}
225286

226-
return nil, nil, err
287+
return VshardCallResp{}, err
227288
}
228289

229290
var rs *Replicaset
@@ -243,18 +304,20 @@ func (r *Router) RouterCallImpl(ctx context.Context,
243304

244305
r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)
245306

246-
future := rs.conn.Do(req, opts.PoolMode)
247-
248307
var storageCallResponse vshardStorageCallResponseProto
249-
err = future.GetTyped(&storageCallResponse)
308+
err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
250309
if err != nil {
251-
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
310+
return VshardCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
252311
}
253312

254-
r.log().Debugf(ctx, "Got call result response data %v", storageCallResponse.data)
313+
r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)
314+
315+
if storageCallResponse.AssertError != nil {
316+
return VshardCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
317+
}
255318

256-
if storageCallResponse.vshardError != nil {
257-
vshardError := storageCallResponse.vshardError
319+
if storageCallResponse.VshardError != nil {
320+
vshardError := storageCallResponse.VshardError
258321

259322
switch vshardError.Name {
260323
case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
@@ -264,7 +327,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
264327
if vshardError.Destination != "" {
265328
destinationUUID, err := uuid.Parse(vshardError.Destination)
266329
if err != nil {
267-
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
330+
return VshardCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
268331
vshardStorageClientCall, vshardError, err)
269332
}
270333

@@ -288,8 +351,8 @@ func (r *Router) RouterCallImpl(ctx context.Context,
288351
const defaultPoolingPause = 50 * time.Millisecond
289352
time.Sleep(defaultPoolingPause)
290353

291-
if time.Since(timeStart) > timeout {
292-
return nil, nil, vshardError
354+
if spent := time.Since(requestStartTime); spent > timeout {
355+
return VshardCallResp{}, vshardError
293356
}
294357
}
295358
}
@@ -308,34 +371,98 @@ func (r *Router) RouterCallImpl(ctx context.Context,
308371
// There is a comment why lua vshard router doesn't retry:
309372
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
310373
r.BucketReset(bucketID)
311-
return nil, nil, vshardError
374+
return VshardCallResp{}, vshardError
312375
case VShardErrNameNonMaster:
313376
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
314377
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
315378
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
316379
// we just return this error as is.
317-
return nil, nil, vshardError
380+
return VshardCallResp{}, vshardError
318381
default:
319-
return nil, nil, vshardError
382+
return VshardCallResp{}, vshardError
320383
}
321384
}
322385

323-
if storageCallResponse.assertError != nil {
324-
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.assertError)
325-
}
386+
r.metrics().RequestDuration(time.Since(requestStartTime), true, false)
326387

327-
r.metrics().RequestDuration(time.Since(timeStart), true, false)
388+
return storageCallResponse.CallResp, nil
389+
}
390+
}
328391

329-
return storageCallResponse.data, func(result interface{}) error {
330-
if len(storageCallResponse.data) == 0 {
331-
return nil
332-
}
392+
func (r *Router) CallRO(ctx context.Context, bucketID uint64,
393+
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
394+
return r.Call(ctx, bucketID, VshardCallModeRO, fnc, args, opts)
395+
}
396+
397+
func (r *Router) CallRW(ctx context.Context, bucketID uint64,
398+
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
399+
return r.Call(ctx, bucketID, VshardCallModeRW, fnc, args, opts)
400+
}
401+
402+
func (r *Router) CallRE(ctx context.Context, bucketID uint64,
403+
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
404+
return r.Call(ctx, bucketID, VshardCallModeRE, fnc, args, opts)
405+
}
406+
407+
func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
408+
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
409+
return r.Call(ctx, bucketID, VshardCallModeBRO, fnc, args, opts)
410+
}
411+
412+
func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
413+
fnc string, args interface{}, opts VshardCallOptions) (VshardCallResp, error) {
414+
return r.Call(ctx, bucketID, VshardCallModeBRE, fnc, args, opts)
415+
}
333416

334-
var stub bool
417+
// RouterCallImpl Perform shard operation function will restart operation
418+
// after wrong bucket response until timeout is reached
419+
func (r *Router) RouterCallImpl(ctx context.Context,
420+
bucketID uint64,
421+
opts CallOpts,
422+
fnc string,
423+
args interface{}) (interface{}, StorageResultTypedFunc, error) {
424+
425+
var vshardCallOpts = VshardCallOptions{
426+
Timeout: opts.Timeout,
427+
}
335428

336-
return future.GetTyped(&[]interface{}{&stub, result})
337-
}, nil
429+
var vshardCallMode VshardCallMode
430+
431+
switch opts.VshardMode {
432+
case WriteMode:
433+
vshardCallMode = VshardCallModeRW
434+
case ReadMode:
435+
switch opts.PoolMode {
436+
case pool.ANY:
437+
vshardCallMode = VshardCallModeBRO
438+
case pool.RO:
439+
vshardCallMode = VshardCallModeRO
440+
case pool.RW:
441+
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
442+
case pool.PreferRO:
443+
vshardCallMode = VshardCallModeBRE
444+
case pool.PreferRW:
445+
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
446+
default:
447+
return nil, nil, fmt.Errorf("unexpected opts.PoolMode %v", opts.PoolMode)
448+
}
449+
default:
450+
return nil, nil, fmt.Errorf("unexpected opts.VshardMode %v", opts.VshardMode)
451+
}
452+
453+
vshardCallResp, err := r.Call(ctx, bucketID, vshardCallMode, fnc, args, vshardCallOpts)
454+
if err != nil {
455+
return nil, nil, err
338456
}
457+
458+
data, err := vshardCallResp.Get()
459+
if err != nil {
460+
return nil, nil, err
461+
}
462+
463+
return data, func(result interface{}) error {
464+
return vshardCallResp.GetTyped([]interface{}{result})
465+
}, nil
339466
}
340467

341468
// RouterMapCallRWImpl perform call function on all masters in the cluster
@@ -349,7 +476,7 @@ func (r *Router) RouterMapCallRWImpl(
349476
) (map[uuid.UUID]interface{}, error) {
350477
const vshardStorageServiceCall = "vshard.storage._call"
351478

352-
timeout := CallTimeoutMin
479+
timeout := callTimeoutDefault
353480
if opts.Timeout > 0 {
354481
timeout = opts.Timeout
355482
}

api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
6767
conn: mPool,
6868
})
6969

70-
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second}, "test", []byte("test"))
70+
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second, VshardMode: ReadMode}, "test", []byte("test"))
7171
require.ErrorIs(t, err, futureError)
7272
})
7373
}

replicaset.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (rs *Replicaset) ReplicaCall(
103103
fnc string,
104104
args interface{},
105105
) (interface{}, StorageResultTypedFunc, error) {
106-
timeout := CallTimeoutMin
106+
timeout := callTimeoutDefault
107107

108108
if opts.Timeout > 0 {
109109
timeout = opts.Timeout

0 commit comments

Comments
 (0)