Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
a little refactor
Browse files Browse the repository at this point in the history
- reduce SLOC by using CallAsync method
- BucketForceCreate optimization: don't decode tnt response
  • Loading branch information
nurzhan-saktaganov committed Dec 13, 2024
1 parent 17d1ba2 commit 2d912a7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 42 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ FEATURES:
REFACTOR:

* Use constants for vshard error names and codes.
* Reduce SLOC by using CallAsync method.
* BucketForceCreate optimization: don't decode tnt response.

TESTS:
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.
Expand Down
55 changes: 13 additions & 42 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,7 @@ func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketSt
func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tarantool.Future {
const bucketStatFnc = "vshard.storage.bucket_stat"

req := tarantool.NewCallRequest(bucketStatFnc).
Args([]interface{}{bucketID}).
Context(ctx)

future := rs.conn.Do(req, pool.RO)

return future
return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID})
}

func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
Expand Down Expand Up @@ -103,20 +97,11 @@ func (rs *Replicaset) ReplicaCall(
fnc string,
args interface{},
) (interface{}, StorageResultTypedFunc, error) {
timeout := CallTimeoutMin

if opts.Timeout > 0 {
timeout = opts.Timeout
if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

req := tarantool.NewCallRequest(fnc).
Context(ctx).
Args(args)

future := rs.conn.Do(req, opts.PoolMode)
future := rs.CallAsync(ctx, opts, fnc, args)

respData, err := future.Get()
if err != nil {
Expand Down Expand Up @@ -158,13 +143,8 @@ func (rs *Replicaset) bucketsDiscoveryAsync(ctx context.Context, from uint64) *t
From uint64 `msgpack:"from"`
}{From: from}

req := tarantool.NewCallRequest(bucketsDiscoveryFnc).
Context(ctx).
Args([]interface{}{&bucketsDiscoveryPaginationRequest})

future := rs.conn.Do(req, pool.PreferRO)

return future
return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.PreferRO}, bucketsDiscoveryFnc,
[]interface{}{bucketsDiscoveryPaginationRequest})
}

type bucketsDiscoveryResp struct {
Expand Down Expand Up @@ -279,28 +259,19 @@ func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error
func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error) {
const bucketCountFnc = "vshard.storage.buckets_count"

req := tarantool.NewCallRequest(bucketCountFnc)
req = req.Context(ctx)
var bucketCount uint64

fut := rs.conn.Do(req, pool.ANY)
fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.ANY}, bucketCountFnc, nil)
err := fut.GetTyped(&[]interface{}{&bucketCount})

bucketCount := new(uint64)

err := fut.GetTyped(&[]interface{}{bucketCount})

return *bucketCount, err
return bucketCount, err
}

func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error {
const bucketCountFnc = "vshard.storage.bucket_force_create"

req := tarantool.NewCallRequest(bucketCountFnc)
req = req.Context(ctx)
req = req.Args(&[]interface{}{firstBucketID, count})

fut := rs.conn.Do(req, pool.RW)
const bucketForceCreateFnc = "vshard.storage.bucket_force_create"

_, err := fut.Get()
fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RW}, bucketForceCreateFnc, []interface{}{firstBucketID, count})
_, err := fut.GetResponse()

return err
}

0 comments on commit 2d912a7

Please sign in to comment.