
Commit

resolve issue #39
* combining an array/slice of atomic pointers and immutable objects
  to allow lock-free modification of routeMap
* RouterMapCallRWImpl typo fix: compare totalBucketCount against TotalBucketCount instead of r.knownBucketCount
nurzhan-saktaganov committed Aug 21, 2024
1 parent 0bfd2b1 commit 9a80119
Showing 4 changed files with 77 additions and 36 deletions.
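
For orientation before the diff: the commit replaces a plain []*Replicaset route map guarded by a shared counter with an immutable "view" object holding a slice of atomic pointers. Below is a minimal, self-contained Go sketch of that pattern; the names (router, view, replicaset, set, reset) are illustrative only and are not the library's actual API.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type replicaset struct{ name string }

// view is treated as immutable: its routeMap slice is never reallocated,
// only the per-bucket atomic pointers inside it are swapped.
type view struct {
	routeMap         []atomic.Pointer[replicaset]
	knownBucketCount atomic.Int32
}

type router struct {
	mu   sync.RWMutex
	view *view
}

func newRouter(totalBuckets int) *router {
	return &router{view: &view{
		routeMap: make([]atomic.Pointer[replicaset], totalBuckets+1),
	}}
}

// currentView takes a consistent snapshot of the view pointer; all further
// reads and swaps on it are lock-free.
func (r *router) currentView() *view {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.view
}

// set routes bucketID to rs; the counter is bumped only when the bucket was
// previously unknown, so concurrent calls never double-count.
func (r *router) set(bucketID uint64, rs *replicaset) {
	v := r.currentView()
	if old := v.routeMap[bucketID].Swap(rs); old == nil {
		v.knownBucketCount.Add(1)
	}
}

// reset installs a brand-new view; the old counter is discarded together with
// the old route map, so no non-commutative "store 0" is needed.
func (r *router) reset(totalBuckets int) {
	newView := &view{routeMap: make([]atomic.Pointer[replicaset], totalBuckets+1)}
	r.mu.Lock()
	r.view = newView
	r.mu.Unlock()
}

func main() {
	r := newRouter(10)
	r.set(3, &replicaset{name: "rs-1"})
	fmt.Println(r.currentView().knownBucketCount.Load()) // 1
	r.reset(10)
	fmt.Println(r.currentView().knownBucketCount.Load()) // 0
}

Readers take a snapshot of the view pointer under a short read lock and then work lock-free; a full rebuild installs a brand-new view (and thus a brand-new counter) instead of zeroing the shared one.
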
2 changes: 1 addition & 1 deletion api.go
@@ -355,7 +355,7 @@ func (r *Router) RouterMapCallRWImpl(
return nil, err
}

if totalBucketCount != r.knownBucketCount.Load() {
if totalBucketCount != int32(r.cfg.TotalBucketCount) {
return nil, fmt.Errorf("unknown bucket counts %d", totalBucketCount)
}

40 changes: 25 additions & 15 deletions discovery.go
@@ -52,18 +52,17 @@ func (s *searchLock) StartSearch(bucketID uint64) chan struct{} {

// BucketDiscovery searches for a bucket in the whole cluster
func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error) {
r.searchLock.WaitOnSearch(bucketID)
r.viewMutex.RLock()
view := r.view
r.viewMutex.RUnlock()

rs := r.routeMap[bucketID]
rs := view.routeMap[bucketID].Load()
if rs != nil {
return rs, nil
}

// it's ok if we have a few active searches at the same time;
// a mutex per bucket is expensive
stopSearchCh := r.searchLock.StartSearch(bucketID)
defer close(stopSearchCh)

r.cfg.Logger.Info(ctx, fmt.Sprintf("Discovering bucket %d", bucketID))

r.idToReplicasetMutex.RLock()
@@ -87,6 +86,8 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
go func(rs *Replicaset, rsID uuid.UUID) {
defer wg.Done()
if _, err := rs.BucketStat(ctx, bucketID); err == nil {
// It's ok if several replicasets return ok to the bucket_stat command for the same bucketID,
// just pick any of them.
var res result
res.rs, res.err = r.BucketSet(bucketID, rsID)
resultAtomic.Store(&res)
@@ -116,7 +117,11 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica

// BucketResolve resolves a bucket id to a replicaset
func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error) {
rs := r.routeMap[bucketID]
r.viewMutex.RLock()
view := r.view
r.viewMutex.RUnlock()

rs := view.routeMap[bucketID].Load()
if rs != nil {
return rs, nil
}
@@ -132,11 +137,16 @@ func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicase

// DiscoveryHandleBuckets arranges downloaded buckets in the route map so that they reference the given replicaset.
func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64) {
r.viewMutex.RLock()
view := r.view
r.viewMutex.RUnlock()

count := rs.bucketCount.Load()

affected := make(map[*Replicaset]int)

for _, bucketID := range buckets {
oldRs := r.routeMap[bucketID]
oldRs := view.routeMap[bucketID].Swap(rs)

if oldRs != rs {
count++
@@ -149,9 +159,8 @@ func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buc
oldRs.bucketCount.Add(-1)
} else {
// router.known_bucket_count = router.known_bucket_count + 1
r.knownBucketCount.Add(1)
view.knownBucketCount.Add(1)
}
r.routeMap[bucketID] = rs
}
}

@@ -175,10 +184,12 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {

r.log().Info(ctx, "start discovery all buckets")

knownBucket := atomic.Int32{}

errGr, ctx := errgroup.WithContext(ctx)

r.viewMutex.RLock()
view := r.view
r.viewMutex.RUnlock()

r.idToReplicasetMutex.RLock()
idToReplicasetRef := r.idToReplicaset
r.idToReplicasetMutex.RUnlock()
@@ -218,8 +229,9 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
break
}

r.routeMap[bucket] = rs
knownBucket.Add(1)
if old := view.routeMap[bucket].Swap(rs); old == nil {
view.knownBucketCount.Add(1)
}
}

// There are no more buckets
@@ -239,8 +251,6 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
}
r.log().Info(ctx, fmt.Sprintf("discovery done since: %s", time.Since(t)))

r.knownBucketCount.Store(knownBucket.Load())

return nil
}

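The discovery.go changes above route every bucket through an atomic Swap, so the previous owner of a bucket is known without any per-bucket lock. A simplified sketch of that bookkeeping follows; the moveBucket helper is hypothetical, and the per-replicaset counter is updated directly with Add rather than with the batched accounting used in DiscoveryHandleBuckets.

package main

import (
	"fmt"
	"sync/atomic"
)

type replicaset struct {
	name        string
	bucketCount atomic.Int32
}

type view struct {
	routeMap         []atomic.Pointer[replicaset]
	knownBucketCount atomic.Int32
}

// moveBucket reroutes bucketID to rs and keeps both the per-replicaset and
// the global counters in sync. Swap returns the previous owner, so the
// accounting never needs a per-bucket lock.
func moveBucket(v *view, bucketID uint64, rs *replicaset) {
	old := v.routeMap[bucketID].Swap(rs)
	if old == rs {
		return // already routed here, nothing to account for
	}
	rs.bucketCount.Add(1)
	if old != nil {
		old.bucketCount.Add(-1) // bucket migrated from another replicaset
	} else {
		v.knownBucketCount.Add(1) // bucket was unknown before
	}
}

func main() {
	v := &view{routeMap: make([]atomic.Pointer[replicaset], 11)}
	rs1, rs2 := &replicaset{name: "rs-1"}, &replicaset{name: "rs-2"}

	moveBucket(v, 5, rs1)
	moveBucket(v, 5, rs2) // migration: rs1 loses the bucket, rs2 gains it

	fmt.Println(rs1.bucketCount.Load(), rs2.bucketCount.Load(), v.knownBucketCount.Load()) // 0 1 1
}
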
64 changes: 46 additions & 18 deletions vshard.go
@@ -18,6 +18,25 @@ var (
ErrInvalidInstanceInfo = fmt.Errorf("invalid instance info")
)

// This data struct is introduced by https://github.com/KaymeKaydex/go-vshard-router/issues/39.
// We use an array of atomic pointers for lock-free handling of routeMap elements.
// knownBucketCount reflects a statistic over routeMap.
// knownBucketCount might be inconsistent for a few microseconds, because we first change routeMap[bucketID]
// and only then change knownBucketCount: this is not an atomic change of a complex state.
// It is not a problem at all.
//
// While changing `knownBucketCount` we heavily rely on the commutative property of addition ("+");
// due to this property we are not afraid of any number of concurrent modifications.
// See: https://en.wikipedia.org/wiki/Commutative_property
//
// Since RouteMapClean creates a new routeMap, we have to reset knownBucketCount to 0.
// But assignment is not a commutative operation, therefore we create a completely new atomic variable
// that reflects a statistic over the newly created routeMap.
type consistentView struct {
routeMap []atomic.Pointer[Replicaset]
knownBucketCount atomic.Int32
}

type Router struct {
cfg Config

@@ -31,10 +50,8 @@ type Router struct {
idToReplicasetMutex sync.RWMutex
idToReplicaset map[uuid.UUID]*Replicaset

routeMap []*Replicaset
searchLock searchLock

knownBucketCount atomic.Int32
viewMutex sync.RWMutex
view *consistentView

// ----------------------- Map-Reduce -----------------------
// Storage Ref ID. It must be unique for each ref request
@@ -107,11 +124,11 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
}

router := &Router{
cfg: cfg,
idToReplicaset: make(map[uuid.UUID]*Replicaset),
routeMap: make([]*Replicaset, cfg.TotalBucketCount+1),
searchLock: searchLock{mu: sync.RWMutex{}, perBucket: make([]chan struct{}, cfg.TotalBucketCount+1)},
knownBucketCount: atomic.Int32{},
cfg: cfg,
idToReplicaset: make(map[uuid.UUID]*Replicaset),
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], cfg.TotalBucketCount+1),
},
}

err = cfg.TopologyProvider.Init(router.Topology())
@@ -160,39 +177,50 @@ func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error)
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}

oldReplicaset := r.routeMap[bucketID]
r.viewMutex.RLock()
view := r.view
r.viewMutex.RUnlock()

oldReplicaset := view.routeMap[bucketID].Swap(rs)
if oldReplicaset != rs {
if oldReplicaset != nil {
oldReplicaset.bucketCount.Add(-1)
} else {
r.knownBucketCount.Add(1)
view.knownBucketCount.Add(1)
}

rs.bucketCount.Add(1)
}

r.routeMap[bucketID] = rs

return rs, nil
}

func (r *Router) BucketReset(bucketID uint64) {
if bucketID > uint64(len(r.routeMap))+1 {
r.viewMutex.RLock()
view := r.view
r.viewMutex.RUnlock()

if bucketID > uint64(len(view.routeMap))+1 {
return
}

r.knownBucketCount.Add(-1)
r.routeMap[bucketID] = nil
if old := view.routeMap[bucketID].Swap(nil); old != nil {
view.knownBucketCount.Add(-1)
}
}

func (r *Router) RouteMapClean() {
r.idToReplicasetMutex.RLock()
idToReplicasetRef := r.idToReplicaset
r.idToReplicasetMutex.RUnlock()

r.routeMap = make([]*Replicaset, r.cfg.TotalBucketCount+1)
r.knownBucketCount.Store(0)
newView := &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], r.cfg.TotalBucketCount+1),
}

r.viewMutex.Lock()
r.view = newView
r.viewMutex.Unlock()

for _, rs := range idToReplicasetRef {
rs.bucketCount.Store(0)
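As a small demonstration of the commutativity argument in the consistentView comment above: concurrent Add calls on the counter may interleave in any order and still converge to the same total, while a RouteMapClean-style reset installs a fresh view and counter rather than performing a non-commutative store of 0. The snippet below is an assumption-laden sketch reusing the illustrative types from the earlier examples, not part of the library.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type replicaset struct{ name string }

type view struct {
	routeMap         []atomic.Pointer[replicaset]
	knownBucketCount atomic.Int32
}

func main() {
	const total = 1000
	v := &view{routeMap: make([]atomic.Pointer[replicaset], total+1)}
	rs := &replicaset{name: "rs-1"}

	// 1000 goroutines discover buckets concurrently; increments of the counter
	// interleave arbitrarily but the sum is the same in every execution.
	var wg sync.WaitGroup
	for i := 1; i <= total; i++ {
		wg.Add(1)
		go func(bucketID int) {
			defer wg.Done()
			if old := v.routeMap[bucketID].Swap(rs); old == nil {
				v.knownBucketCount.Add(1)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(v.knownBucketCount.Load()) // 1000

	// RouteMapClean-style reset: a brand-new view (and counter) replaces the
	// old one instead of storing 0 into the shared counter.
	v = &view{routeMap: make([]atomic.Pointer[replicaset], total+1)}
	fmt.Println(v.knownBucketCount.Load()) // 0
}
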
7 changes: 5 additions & 2 deletions vshard_test.go
@@ -1,6 +1,7 @@
package vshard_router //nolint:revive

import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
@@ -31,8 +32,10 @@ func TestRouter_RouteMapClean(t *testing.T) {

func TestRouter_RouteMapClean(t *testing.T) {
r := Router{
cfg: Config{TotalBucketCount: 10},
routeMap: make([]*Replicaset, 10),
cfg: Config{TotalBucketCount: 10},
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], 10),
},
}

require.NotPanics(t, func() {
