Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve several issues (#41)
Browse files Browse the repository at this point in the history
* use return value of refID.Add()
	because race is possible in case of refID.Load() + refID.Add()
* use int32 for bucketCount on decoding 'storage_ref' response
	because uint16 (max 65535) is too small. We may have some dev cluster
	with million buckets that are distributed over two shards
* RouterMapCallRWImpl: modify idToResult under mutex to prevent concurrent modifications
* BucketDiscovery: use atomic to acquire the result of BucketSet
  • Loading branch information
nurzhan-saktaganov authored Sep 2, 2024
1 parent ea5470b commit cf9591c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ BUG FIXES:
* DiscoveryAllBuckets returns nil even if errGr.Wait() returns err, fixed
* DiscoveryHandleBuckets: misusage of atomics, fixed
* race when accessing to idToReplicaset, fixed: idToReplicaset is immutable object now
* RouterMapCallRWImpl: fix misusage of refID atomic
* RouterMapCallRWImpl: decode bucketCount into 32 bit integer instead of 16 bit
* RouterMapCallRWImpl: fix concurrent access to idToResult map
* BucketDiscovery: fix possible concurrent access to resultRs and err vars

FEATURES:

Expand Down
11 changes: 7 additions & 4 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package vshard_router //nolint:revive
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -262,8 +263,7 @@ func (r *Router) RouterMapCallRWImpl(
timeout := opts.Timeout
timeStart := time.Now()

refID := r.refID.Load()
r.refID.Add(1)
refID := r.refID.Add(1)

idToReplicasetRef := r.getIDToReplicaset()

Expand Down Expand Up @@ -342,15 +342,15 @@ func (r *Router) RouterMapCallRWImpl(
return vshardErr
}

var bucketCount uint16
var bucketCount int32
err = future.GetTyped(&[]interface{}{&bucketCount})
if err != nil {
cancel()

return err
}

atomic.AddInt32(&totalBucketCount, int32(bucketCount))
atomic.AddInt32(&totalBucketCount, bucketCount)
}

return nil
Expand Down Expand Up @@ -399,6 +399,7 @@ func (r *Router) RouterMapCallRWImpl(

// map stage: collect

var idToResultMutex sync.Mutex
idToResult := make(map[uuid.UUID]interface{})

for i := 0; i < int(r.nWorkers); i++ {
Expand Down Expand Up @@ -457,7 +458,9 @@ func (r *Router) RouterMapCallRWImpl(
return err
}

idToResultMutex.Lock()
idToResult[rsFuture.id] = respData[1]
idToResultMutex.Unlock()
}

return nil
Expand Down
27 changes: 19 additions & 8 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"golang.org/x/sync/errgroup"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
)
Expand Down Expand Up @@ -70,22 +71,32 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
wg := sync.WaitGroup{}
wg.Add(len(idToReplicasetRef))

var err error
var resultRs *Replicaset
type result struct {
err error
rs *Replicaset
}

// This works only for go 1.19 or higher. To support older versions
// we can use mutex + conditional compilation that checks go version.
// Example for conditional compilation: https://www.youtube.com/watch?v=5eQBKqVlNQg
var resultAtomic = atomic.Pointer[result]{}

for rsID, rs := range idToReplicasetRef {
rsID := rsID
go func(_rs *Replicaset) {
go func(rs *Replicaset, rsID uuid.UUID) {
defer wg.Done()
_, errStat := _rs.BucketStat(ctx, bucketID)
if errStat == nil {
resultRs, err = r.BucketSet(bucketID, rsID)
if _, err := rs.BucketStat(ctx, bucketID); err == nil {
var res result
res.rs, res.err = r.BucketSet(bucketID, rsID)
resultAtomic.Store(&res)
}
}(rs)
}(rs, rsID)
}

wg.Wait()

res := resultAtomic.Load()
resultRs, err := res.rs, res.err

if err != nil || resultRs == nil {
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}
Expand Down
4 changes: 1 addition & 3 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
knownBucketCount: atomic.Int32{},
}

router.knownBucketCount.Store(0)

err = cfg.TopologyProvider.Init(router.Topology())
if err != nil {
router.log().Error(ctx, fmt.Sprintf("cant create new topology provider with err: %s", err))
Expand All @@ -143,7 +141,7 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
}

nWorkers := int32(2)
if cfg.NWorkers != 0 {
if cfg.NWorkers > 0 {
nWorkers = cfg.NWorkers
}

Expand Down

0 comments on commit cf9591c

Please sign in to comment.