From 5f5cc350cca98dc0d1f6199746a1be2becb04d99 Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Wed, 4 Sep 2024 00:18:59 +0300 Subject: [PATCH] resolve issue #38 (#45) * simplify DiscoveryAllBuckets: remove a suspicious if * fix warning from linter v1.60.3 --- CHANGELOG.md | 6 ++++++ api.go | 2 +- discovery.go | 46 +++++++++++++++++++--------------------------- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05006f5..ffedf3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## Unreleased + +REFACTOR: + +* resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if + ## 0.0.12 BUG FIXES: diff --git a/api.go b/api.go index d9b3bdf..e2a9bed 100644 --- a/api.go +++ b/api.go @@ -361,7 +361,7 @@ func (r *Router) RouterMapCallRWImpl( return nil, err } - if totalBucketCount != int32(r.cfg.TotalBucketCount) { + if uint64(totalBucketCount) != r.cfg.TotalBucketCount { return nil, fmt.Errorf("unknown bucket counts %d", totalBucketCount) } diff --git a/discovery.go b/discovery.go index da34f69..1a7310e 100644 --- a/discovery.go +++ b/discovery.go @@ -148,10 +148,6 @@ func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buc } func (r *Router) DiscoveryAllBuckets(ctx context.Context) error { - type BucketsDiscoveryPaginationRequest struct { - From uint64 `msgpack:"from"` - } - t := time.Now() r.log().Info(ctx, "start discovery all buckets") @@ -165,49 +161,45 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error { rs := rs errGr.Go(func() error { - rawReq := BucketsDiscoveryPaginationRequest{From: 0} + var bucketsDiscoveryPaginationRequest struct { + From uint64 `msgpack:"from"` + } for { - bucketsInRS := make([]uint64, 0) // cause lua starts from 1 - nextFrom := new(uint64) req := tarantool.NewCallRequest("vshard.storage.buckets_discovery"). Context(ctx). - Args([]interface{}{&rawReq}) + Args([]interface{}{&bucketsDiscoveryPaginationRequest}) future := rs.conn.Do(req, pool.PreferRO) - err := future.GetTyped(&[]interface{}{&struct { - Buckets *[]uint64 `msgpack:"buckets"` - NextFrom *uint64 `msgpack:"next_from"` - }{ - Buckets: &bucketsInRS, - NextFrom: nextFrom, - }}) - if err != nil { - return err + // We intentionally don't support old vshard storages that mentioned here: + // https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/router/init.lua#L343 + var resp struct { + Buckets []uint64 `msgpack:"buckets"` + NextFrom uint64 `msgpack:"next_from"` } - if len(bucketsInRS) == 0 { - return nil + err := future.GetTyped(&[]interface{}{&resp}) + if err != nil { + return err } - for _, bucket := range bucketsInRS { - if bucket == 0 { - break - } - - if old := view.routeMap[bucket].Swap(rs); old == nil { + for _, bucketID := range resp.Buckets { + // We could check here that bucketID is in range [1, TotalBucketCnt], but it seems to be redundant. + if old := view.routeMap[bucketID].Swap(rs); old == nil { view.knownBucketCount.Add(1) } } // There are no more buckets // https://github.com/tarantool/vshard/blob/8d299bfe/vshard/storage/init.lua#L1730 - if nextFrom == nil || *nextFrom == 0 { + // vshard.storage returns { buckets = [], next_from = nil } if there are no more buckets. + // Since next_from is always > 0. NextFrom = 0 means that we got next_from = nil, that has not been decoded. + if resp.NextFrom == 0 { return nil } - rawReq.From = *nextFrom + bucketsDiscoveryPaginationRequest.From = resp.NextFrom } }) }