From 2d912a78c7468517103d32a52da63bb8c0e1283b Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Fri, 13 Dec 2024 18:14:58 +0300 Subject: [PATCH] a little refactor - reduce SLOC by using CallAsync method - BucketForceCreate optimization: don't decode tnt response --- CHANGELOG.md | 2 ++ replicaset.go | 55 ++++++++++++--------------------------------------- 2 files changed, 15 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cfa8c83..6b51f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ FEATURES: REFACTOR: * Use constants for vshard error names and codes. +* Reduce SLOC by using CallAsync method. +* BucketForceCreate optimization: don't decode tnt response. TESTS: * Rename bootstrap_test.go -> tarantool_test.go and new test in this file. diff --git a/replicaset.go b/replicaset.go index 09e7797..74692b8 100644 --- a/replicaset.go +++ b/replicaset.go @@ -48,13 +48,7 @@ func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketSt func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tarantool.Future { const bucketStatFnc = "vshard.storage.bucket_stat" - req := tarantool.NewCallRequest(bucketStatFnc). - Args([]interface{}{bucketID}). - Context(ctx) - - future := rs.conn.Do(req, pool.RO) - - return future + return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID}) } func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) { @@ -103,20 +97,11 @@ func (rs *Replicaset) ReplicaCall( fnc string, args interface{}, ) (interface{}, StorageResultTypedFunc, error) { - timeout := CallTimeoutMin - - if opts.Timeout > 0 { - timeout = opts.Timeout + if opts.Timeout == 0 { + opts.Timeout = CallTimeoutMin } - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - req := tarantool.NewCallRequest(fnc). - Context(ctx). - Args(args) - - future := rs.conn.Do(req, opts.PoolMode) + future := rs.CallAsync(ctx, opts, fnc, args) respData, err := future.Get() if err != nil { @@ -158,13 +143,8 @@ func (rs *Replicaset) bucketsDiscoveryAsync(ctx context.Context, from uint64) *t From uint64 `msgpack:"from"` }{From: from} - req := tarantool.NewCallRequest(bucketsDiscoveryFnc). - Context(ctx). - Args([]interface{}{&bucketsDiscoveryPaginationRequest}) - - future := rs.conn.Do(req, pool.PreferRO) - - return future + return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.PreferRO}, bucketsDiscoveryFnc, + []interface{}{bucketsDiscoveryPaginationRequest}) } type bucketsDiscoveryResp struct { @@ -279,28 +259,19 @@ func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error) { const bucketCountFnc = "vshard.storage.buckets_count" - req := tarantool.NewCallRequest(bucketCountFnc) - req = req.Context(ctx) + var bucketCount uint64 - fut := rs.conn.Do(req, pool.ANY) + fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.ANY}, bucketCountFnc, nil) + err := fut.GetTyped(&[]interface{}{&bucketCount}) - bucketCount := new(uint64) - - err := fut.GetTyped(&[]interface{}{bucketCount}) - - return *bucketCount, err + return bucketCount, err } func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error { - const bucketCountFnc = "vshard.storage.bucket_force_create" - - req := tarantool.NewCallRequest(bucketCountFnc) - req = req.Context(ctx) - req = req.Args(&[]interface{}{firstBucketID, count}) - - fut := rs.conn.Do(req, pool.RW) + const bucketForceCreateFnc = "vshard.storage.bucket_force_create" - _, err := fut.Get() + fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RW}, bucketForceCreateFnc, []interface{}{firstBucketID, count}) + _, err := fut.GetResponse() return err }