diff --git a/CHANGELOG.md b/CHANGELOG.md index de911c6..9e6b327 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ REFACTOR: * resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if * resolve issue #46: drastically simplify RouterMapCallRWImpl and added tests with real tnt * Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs +* resolve issue #44: remove bucketCount field from struct Replicaset TESTS: diff --git a/discovery.go b/discovery.go index 60d4b64..1adeeec 100644 --- a/discovery.go +++ b/discovery.go @@ -111,39 +111,36 @@ func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicase // DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset. func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64) { view := r.getConsistentView() - - count := rs.bucketCount.Load() - - affected := make(map[*Replicaset]int) + removedFrom := make(map[*Replicaset]int) for _, bucketID := range buckets { oldRs := view.routeMap[bucketID].Swap(rs) - if oldRs != rs { - count++ - - if oldRs != nil { - if _, exists := affected[oldRs]; !exists { - affected[oldRs] = int(oldRs.bucketCount.Load()) - } + if oldRs == rs { + continue + } - oldRs.bucketCount.Add(-1) - } else { - // router.known_bucket_count = router.known_bucket_count + 1 - view.knownBucketCount.Add(1) - } + if oldRs == nil { + view.knownBucketCount.Add(1) } - } - if count != rs.bucketCount.Load() { - r.log().Infof(ctx, "Updated %s buckets: was %d, became %d", rs.info.Name, rs.bucketCount.Load(), count) + // We don't check oldRs for nil here, because it's a valid key too (if rs == nil, it means removed from unknown buckets set) + removedFrom[oldRs]++ } - rs.bucketCount.Store(count) + var addedToRs int + for rs, removedFromRs := range removedFrom { + addedToRs += removedFromRs - for rs, oldBucketCount := range affected { - r.log().Infof(ctx, "Affected buckets of %s: was %d, became %d", rs.info.Name, oldBucketCount, rs.bucketCount.Load()) + switch rs { + case nil: + r.log().Debugf(ctx, "Added new %d buckets to the cluster map", removedFromRs) + default: + r.log().Debugf(ctx, "Removed %d buckets from replicaset %s", removedFromRs, rs.info.Name) + } } + + r.log().Infof(ctx, "Added %d buckets to replicaset %s", addedToRs, rs.info.Name) } func (r *Router) DiscoveryAllBuckets(ctx context.Context) error { diff --git a/replicaset.go b/replicaset.go index f073693..1e231a4 100644 --- a/replicaset.go +++ b/replicaset.go @@ -3,7 +3,6 @@ package vshard_router //nolint:revive import ( "context" "fmt" - "sync/atomic" "time" "github.com/google/uuid" @@ -29,8 +28,6 @@ type ReplicasetCallOpts struct { type Replicaset struct { conn pool.Pooler info ReplicasetInfo - - bucketCount atomic.Int32 } func (rs *Replicaset) String() string { diff --git a/topology.go b/topology.go index 3cbab92..ff6d80d 100644 --- a/topology.go +++ b/topology.go @@ -3,7 +3,6 @@ package vshard_router //nolint:revive import ( "context" "fmt" - "sync/atomic" "github.com/google/uuid" "github.com/tarantool/go-tarantool/v2" @@ -93,8 +92,6 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta Name: rsInfo.Name, UUID: rsInfo.UUID, }, - // according to the documentation, it will be initialized by zero, see: https://pkg.go.dev/sync/atomic#Int32 - bucketCount: atomic.Int32{}, } rsInstances := make([]pool.Instance, 0, len(instances)) diff --git a/vshard.go b/vshard.go index a1f1a0f..f48462f 100644 --- a/vshard.go +++ b/vshard.go @@ -186,15 +186,8 @@ func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error) view := r.getConsistentView() - oldReplicaset := view.routeMap[bucketID].Swap(rs) - if oldReplicaset != rs { - if oldReplicaset != nil { - oldReplicaset.bucketCount.Add(-1) - } else { - view.knownBucketCount.Add(1) - } - - rs.bucketCount.Add(1) + if oldRs := view.routeMap[bucketID].Swap(rs); oldRs == nil { + view.knownBucketCount.Add(1) } return rs, nil @@ -203,7 +196,7 @@ func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error) func (r *Router) BucketReset(bucketID uint64) { view := r.getConsistentView() - if bucketID > uint64(len(view.routeMap))+1 { + if bucketID > r.cfg.TotalBucketCount { return } @@ -213,17 +206,11 @@ func (r *Router) BucketReset(bucketID uint64) { } func (r *Router) RouteMapClean() { - idToReplicasetRef := r.getIDToReplicaset() - newView := &consistentView{ routeMap: make([]atomic.Pointer[Replicaset], r.cfg.TotalBucketCount+1), } r.setConsistentView(newView) - - for _, rs := range idToReplicasetRef { - rs.bucketCount.Store(0) - } } func prepareCfg(cfg Config) (Config, error) {