Skip to content
This repository was archived by the owner on Mar 9, 2025. It is now read-only.

resolve issue #38 #45

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Unreleased

REFACTOR:

* resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if

## 0.0.12

BUG FIXES:
Expand Down
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (r *Router) RouterMapCallRWImpl(
return nil, err
}

if totalBucketCount != int32(r.cfg.TotalBucketCount) {
if uint64(totalBucketCount) != r.cfg.TotalBucketCount {
return nil, fmt.Errorf("unknown bucket counts %d", totalBucketCount)
}

Expand Down
46 changes: 19 additions & 27 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buc
}

func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
type BucketsDiscoveryPaginationRequest struct {
From uint64 `msgpack:"from"`
}

t := time.Now()

r.log().Info(ctx, "start discovery all buckets")
Expand All @@ -165,49 +161,45 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
rs := rs

errGr.Go(func() error {
rawReq := BucketsDiscoveryPaginationRequest{From: 0}
var bucketsDiscoveryPaginationRequest struct {
From uint64 `msgpack:"from"`
}

for {
bucketsInRS := make([]uint64, 0) // cause lua starts from 1
nextFrom := new(uint64)
req := tarantool.NewCallRequest("vshard.storage.buckets_discovery").
Context(ctx).
Args([]interface{}{&rawReq})
Args([]interface{}{&bucketsDiscoveryPaginationRequest})

future := rs.conn.Do(req, pool.PreferRO)

err := future.GetTyped(&[]interface{}{&struct {
Buckets *[]uint64 `msgpack:"buckets"`
NextFrom *uint64 `msgpack:"next_from"`
}{
Buckets: &bucketsInRS,
NextFrom: nextFrom,
}})
if err != nil {
return err
// We intentionally don't support old vshard storages that mentioned here:
// https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/router/init.lua#L343
var resp struct {
Buckets []uint64 `msgpack:"buckets"`
NextFrom uint64 `msgpack:"next_from"`
}

if len(bucketsInRS) == 0 {
return nil
err := future.GetTyped(&[]interface{}{&resp})
if err != nil {
return err
}

for _, bucket := range bucketsInRS {
if bucket == 0 {
break
}

if old := view.routeMap[bucket].Swap(rs); old == nil {
for _, bucketID := range resp.Buckets {
// We could check here that bucketID is in range [1, TotalBucketCnt], but it seems to be redundant.
if old := view.routeMap[bucketID].Swap(rs); old == nil {
view.knownBucketCount.Add(1)
}
}

// There are no more buckets
// https://github.com/tarantool/vshard/blob/8d299bfe/vshard/storage/init.lua#L1730
if nextFrom == nil || *nextFrom == 0 {
// vshard.storage returns { buckets = [], next_from = nil } if there are no more buckets.
// Since next_from is always > 0. NextFrom = 0 means that we got next_from = nil, that has not been decoded.
if resp.NextFrom == 0 {
return nil
}

rawReq.From = *nextFrom
bucketsDiscoveryPaginationRequest.From = resp.NextFrom
}
})
}
Expand Down
Loading