From cf9591cc4fe8eb2d2be332a6995e5004e036861c Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Mon, 2 Sep 2024 15:49:04 +0300 Subject: [PATCH] resolve several issues (#41) * use return value of refID.Add() because race is possible in case of refID.Load() + refID.Add() * use int32 for bucketCount on decoding 'storage_ref' response because uint16 (max 65535) is too small. We may have some dev cluster with million buckets that are distributed over two shards * RouterMapCallRWImpl: modify idToResult under mutex to prevent concurrent modifications * BucketDiscovery: use atomic to acquire the result of BucketSet --- CHANGELOG.md | 4 ++++ api.go | 11 +++++++---- discovery.go | 27 +++++++++++++++++++-------- vshard.go | 4 +--- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21e96e6..af234b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ BUG FIXES: * DiscoveryAllBuckets returns nil even if errGr.Wait() returns err, fixed * DiscoveryHandleBuckets: misusage of atomics, fixed * race when accessing to idToReplicaset, fixed: idToReplicaset is immutable object now +* RouterMapCallRWImpl: fix misusage of refID atomic +* RouterMapCallRWImpl: decode bucketCount into 32 bit integer instead of 16 bit +* RouterMapCallRWImpl: fix concurrent access to idToResult map +* BucketDiscovery: fix possible concurrent access to resultRs and err vars FEATURES: diff --git a/api.go b/api.go index 61f8ded..3488816 100644 --- a/api.go +++ b/api.go @@ -3,6 +3,7 @@ package vshard_router //nolint:revive import ( "context" "fmt" + "sync" "sync/atomic" "time" @@ -262,8 +263,7 @@ func (r *Router) RouterMapCallRWImpl( timeout := opts.Timeout timeStart := time.Now() - refID := r.refID.Load() - r.refID.Add(1) + refID := r.refID.Add(1) idToReplicasetRef := r.getIDToReplicaset() @@ -342,7 +342,7 @@ func (r *Router) RouterMapCallRWImpl( return vshardErr } - var bucketCount uint16 + var bucketCount int32 err = future.GetTyped(&[]interface{}{&bucketCount}) if err != nil { cancel() @@ -350,7 +350,7 @@ func (r *Router) RouterMapCallRWImpl( return err } - atomic.AddInt32(&totalBucketCount, int32(bucketCount)) + atomic.AddInt32(&totalBucketCount, bucketCount) } return nil @@ -399,6 +399,7 @@ func (r *Router) RouterMapCallRWImpl( // map stage: collect + var idToResultMutex sync.Mutex idToResult := make(map[uuid.UUID]interface{}) for i := 0; i < int(r.nWorkers); i++ { @@ -457,7 +458,9 @@ func (r *Router) RouterMapCallRWImpl( return err } + idToResultMutex.Lock() idToResult[rsFuture.id] = respData[1] + idToResultMutex.Unlock() } return nil diff --git a/discovery.go b/discovery.go index a08e7b9..c19df33 100644 --- a/discovery.go +++ b/discovery.go @@ -9,6 +9,7 @@ import ( "golang.org/x/sync/errgroup" + "github.com/google/uuid" "github.com/tarantool/go-tarantool/v2" "github.com/tarantool/go-tarantool/v2/pool" ) @@ -70,22 +71,32 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica wg := sync.WaitGroup{} wg.Add(len(idToReplicasetRef)) - var err error - var resultRs *Replicaset + type result struct { + err error + rs *Replicaset + } + + // This works only for go 1.19 or higher. To support older versions + // we can use mutex + conditional compilation that checks go version. + // Example for conditional compilation: https://www.youtube.com/watch?v=5eQBKqVlNQg + var resultAtomic = atomic.Pointer[result]{} for rsID, rs := range idToReplicasetRef { - rsID := rsID - go func(_rs *Replicaset) { + go func(rs *Replicaset, rsID uuid.UUID) { defer wg.Done() - _, errStat := _rs.BucketStat(ctx, bucketID) - if errStat == nil { - resultRs, err = r.BucketSet(bucketID, rsID) + if _, err := rs.BucketStat(ctx, bucketID); err == nil { + var res result + res.rs, res.err = r.BucketSet(bucketID, rsID) + resultAtomic.Store(&res) } - }(rs) + }(rs, rsID) } wg.Wait() + res := resultAtomic.Load() + resultRs, err := res.rs, res.err + if err != nil || resultRs == nil { return nil, Errors[9] // NO_ROUTE_TO_BUCKET } diff --git a/vshard.go b/vshard.go index 4f0f9d8..ea1aa9c 100644 --- a/vshard.go +++ b/vshard.go @@ -115,8 +115,6 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) { knownBucketCount: atomic.Int32{}, } - router.knownBucketCount.Store(0) - err = cfg.TopologyProvider.Init(router.Topology()) if err != nil { router.log().Error(ctx, fmt.Sprintf("cant create new topology provider with err: %s", err)) @@ -143,7 +141,7 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) { } nWorkers := int32(2) - if cfg.NWorkers != 0 { + if cfg.NWorkers > 0 { nWorkers = cfg.NWorkers }