Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #38 (#45)
Browse files Browse the repository at this point in the history
* simplify DiscoveryAllBuckets: remove a suspicious if
* fix warning from linter v1.60.3
  • Loading branch information
nurzhan-saktaganov authored Sep 3, 2024
1 parent dfa83c2 commit 5f5cc35
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 28 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Unreleased

REFACTOR:

* resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if

## 0.0.12

BUG FIXES:
Expand Down
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (r *Router) RouterMapCallRWImpl(
return nil, err
}

if totalBucketCount != int32(r.cfg.TotalBucketCount) {
if uint64(totalBucketCount) != r.cfg.TotalBucketCount {
return nil, fmt.Errorf("unknown bucket counts %d", totalBucketCount)
}

Expand Down
46 changes: 19 additions & 27 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buc
}

func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
type BucketsDiscoveryPaginationRequest struct {
From uint64 `msgpack:"from"`
}

t := time.Now()

r.log().Info(ctx, "start discovery all buckets")
Expand All @@ -165,49 +161,45 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
rs := rs

errGr.Go(func() error {
rawReq := BucketsDiscoveryPaginationRequest{From: 0}
var bucketsDiscoveryPaginationRequest struct {
From uint64 `msgpack:"from"`
}

for {
bucketsInRS := make([]uint64, 0) // cause lua starts from 1
nextFrom := new(uint64)
req := tarantool.NewCallRequest("vshard.storage.buckets_discovery").
Context(ctx).
Args([]interface{}{&rawReq})
Args([]interface{}{&bucketsDiscoveryPaginationRequest})

future := rs.conn.Do(req, pool.PreferRO)

err := future.GetTyped(&[]interface{}{&struct {
Buckets *[]uint64 `msgpack:"buckets"`
NextFrom *uint64 `msgpack:"next_from"`
}{
Buckets: &bucketsInRS,
NextFrom: nextFrom,
}})
if err != nil {
return err
// We intentionally don't support old vshard storages that mentioned here:
// https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/router/init.lua#L343
var resp struct {
Buckets []uint64 `msgpack:"buckets"`
NextFrom uint64 `msgpack:"next_from"`
}

if len(bucketsInRS) == 0 {
return nil
err := future.GetTyped(&[]interface{}{&resp})
if err != nil {
return err
}

for _, bucket := range bucketsInRS {
if bucket == 0 {
break
}

if old := view.routeMap[bucket].Swap(rs); old == nil {
for _, bucketID := range resp.Buckets {
// We could check here that bucketID is in range [1, TotalBucketCnt], but it seems to be redundant.
if old := view.routeMap[bucketID].Swap(rs); old == nil {
view.knownBucketCount.Add(1)
}
}

// There are no more buckets
// https://github.com/tarantool/vshard/blob/8d299bfe/vshard/storage/init.lua#L1730
if nextFrom == nil || *nextFrom == 0 {
// vshard.storage returns { buckets = [], next_from = nil } if there are no more buckets.
// Since next_from is always > 0. NextFrom = 0 means that we got next_from = nil, that has not been decoded.
if resp.NextFrom == 0 {
return nil
}

rawReq.From = *nextFrom
bucketsDiscoveryPaginationRequest.From = resp.NextFrom
}
})
}
Expand Down

0 comments on commit 5f5cc35

Please sign in to comment.