Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #44 (#67)
Browse files Browse the repository at this point in the history
- Remove bucketCount field of struct Replicaset
  • Loading branch information
nurzhan-saktaganov authored Sep 14, 2024
1 parent 696fc12 commit c013b51
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ REFACTOR:
* resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if
* resolve issue #46: drastically simplify RouterMapCallRWImpl and added tests with real tnt
* Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs
* resolve issue #44: remove bucketCount field from struct Replicaset

TESTS:

Expand Down
41 changes: 19 additions & 22 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,39 +111,36 @@ func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicase
// DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64) {
view := r.getConsistentView()

count := rs.bucketCount.Load()

affected := make(map[*Replicaset]int)
removedFrom := make(map[*Replicaset]int)

for _, bucketID := range buckets {
oldRs := view.routeMap[bucketID].Swap(rs)

if oldRs != rs {
count++

if oldRs != nil {
if _, exists := affected[oldRs]; !exists {
affected[oldRs] = int(oldRs.bucketCount.Load())
}
if oldRs == rs {
continue
}

oldRs.bucketCount.Add(-1)
} else {
// router.known_bucket_count = router.known_bucket_count + 1
view.knownBucketCount.Add(1)
}
if oldRs == nil {
view.knownBucketCount.Add(1)
}
}

if count != rs.bucketCount.Load() {
r.log().Infof(ctx, "Updated %s buckets: was %d, became %d", rs.info.Name, rs.bucketCount.Load(), count)
// We don't check oldRs for nil here, because it's a valid key too (if rs == nil, it means removed from unknown buckets set)
removedFrom[oldRs]++
}

rs.bucketCount.Store(count)
var addedToRs int
for rs, removedFromRs := range removedFrom {
addedToRs += removedFromRs

for rs, oldBucketCount := range affected {
r.log().Infof(ctx, "Affected buckets of %s: was %d, became %d", rs.info.Name, oldBucketCount, rs.bucketCount.Load())
switch rs {
case nil:
r.log().Debugf(ctx, "Added new %d buckets to the cluster map", removedFromRs)
default:
r.log().Debugf(ctx, "Removed %d buckets from replicaset %s", removedFromRs, rs.info.Name)
}
}

r.log().Infof(ctx, "Added %d buckets to replicaset %s", addedToRs, rs.info.Name)
}

func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
Expand Down
3 changes: 0 additions & 3 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package vshard_router //nolint:revive
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand All @@ -29,8 +28,6 @@ type ReplicasetCallOpts struct {
type Replicaset struct {
conn pool.Pooler
info ReplicasetInfo

bucketCount atomic.Int32
}

func (rs *Replicaset) String() string {
Expand Down
3 changes: 0 additions & 3 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package vshard_router //nolint:revive
import (
"context"
"fmt"
"sync/atomic"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool/v2"
Expand Down Expand Up @@ -93,8 +92,6 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
Name: rsInfo.Name,
UUID: rsInfo.UUID,
},
// according to the documentation, it will be initialized by zero, see: https://pkg.go.dev/sync/atomic#Int32
bucketCount: atomic.Int32{},
}

rsInstances := make([]pool.Instance, 0, len(instances))
Expand Down
19 changes: 3 additions & 16 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,8 @@ func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error)

view := r.getConsistentView()

oldReplicaset := view.routeMap[bucketID].Swap(rs)
if oldReplicaset != rs {
if oldReplicaset != nil {
oldReplicaset.bucketCount.Add(-1)
} else {
view.knownBucketCount.Add(1)
}

rs.bucketCount.Add(1)
if oldRs := view.routeMap[bucketID].Swap(rs); oldRs == nil {
view.knownBucketCount.Add(1)
}

return rs, nil
Expand All @@ -203,7 +196,7 @@ func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error)
func (r *Router) BucketReset(bucketID uint64) {
view := r.getConsistentView()

if bucketID > uint64(len(view.routeMap))+1 {
if bucketID > r.cfg.TotalBucketCount {
return
}

Expand All @@ -213,17 +206,11 @@ func (r *Router) BucketReset(bucketID uint64) {
}

func (r *Router) RouteMapClean() {
idToReplicasetRef := r.getIDToReplicaset()

newView := &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], r.cfg.TotalBucketCount+1),
}

r.setConsistentView(newView)

for _, rs := range idToReplicasetRef {
rs.bucketCount.Store(0)
}
}

func prepareCfg(cfg Config) (Config, error) {
Expand Down

0 comments on commit c013b51

Please sign in to comment.