From 878345950102bec1015c90affa4021c9f3fa37be Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Wed, 18 Dec 2024 00:20:23 +0300 Subject: [PATCH] review fixes for #105 * ClusterBootstrap: eliminate direct access to r.idToReplicaset * replace github links to master branch with permalinks * hide router's concurrent data under interface 'routerConcurrentData' to prevent misusage by developers --- CHANGELOG.md | 3 ++ Makefile | 3 -- api.go | 12 +++--- api_test.go | 7 +--- concurrent_data.go | 71 ++++++++++++++++++++++++++++++++++++ discovery.go | 12 +++--- discovery_test.go | 5 +-- replicaset.go | 4 +- tests/tnt/replicaset_test.go | 4 +- topology.go | 30 ++++----------- topology_test.go | 18 +++++---- vshard.go | 53 +++++++-------------------- vshard_test.go | 7 +--- 13 files changed, 126 insertions(+), 103 deletions(-) create mode 100644 concurrent_data.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a23253..add3537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ BUG FIXES: * Fix decoding fields for StorageCallVShardError (MasterUUID, ReplicasetUUID). +* ClusterBootstrap: eliminate direct access to r.idToReplicaset. CHANGES: * Add comment why and how we handle "NON_MASTER" vshard error. @@ -12,6 +13,7 @@ CHANGES: * Decode 'vshard.storage.call' response manually into struct vshardStorageCallResponseProto using DecodeMsgpack interface to reduce allocations (partially #61, #100). * Remove `mapstructure` tag from StorageCallVShardError. * Update benchmarks in README files. +* Replace github links to master branch with permalinks. FEATURES: @@ -26,6 +28,7 @@ REFACTOR: * Remove bucketStatError type, use StorageCallVShardError type instead. * Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100). * Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content). +* Hide router's concurrent data under interface 'routerConcurrentData' to prevent misusage by developers. TESTS: * Rename bootstrap_test.go -> tarantool_test.go and new test in this file. diff --git a/Makefile b/Makefile index 2e95705..ccc4683 100644 --- a/Makefile +++ b/Makefile @@ -44,9 +44,6 @@ test: prepare-tnt cover: test ## Generate and open the HTML report for test coverage. $(GO_CMD) tool cover -html=coverage.out -test/integration: - @$(MAKE) -C ./tests/integration test - generate/mocks: mockery --name=Pool --case=underscore --output=mocks/pool --outpkg=mockpool # need fix it later mockery --name=TopologyController --case=underscore --output=mocks/topology --outpkg=mocktopology diff --git a/api.go b/api.go index 35f0ce8..ef3947a 100644 --- a/api.go +++ b/api.go @@ -274,7 +274,7 @@ func (r *Router) RouterCallImpl(ctx context.Context, var loggedOnce bool for { - idToReplicasetRef := r.getIDToReplicaset() + idToReplicasetRef, _ := r.concurrentData.getRefs() if _, ok := idToReplicasetRef[destinationUUID]; ok { _, err := r.BucketSet(bucketID, destinationUUID) if err == nil { @@ -355,9 +355,9 @@ func (r *Router) RouterMapCallRWImpl( } timeStart := time.Now() - refID := r.refID.Add(1) + refID := r.concurrentData.nextRefID() - idToReplicasetRef := r.getIDToReplicaset() + idToReplicasetRef, _ := r.concurrentData.getRefs() defer func() { // call function "storage_unref" if map_callrw is failed or successed @@ -509,11 +509,11 @@ func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset, } // RouterRouteAll return map of all replicasets. -func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset { - idToReplicasetRef := r.getIDToReplicaset() +func (r *Router) RouterRouteAll() UUIDToReplicasetMap { + idToReplicasetRef, _ := r.concurrentData.getRefs() // Do not expose the original map to prevent unauthorized modification. - idToReplicasetCopy := make(map[uuid.UUID]*Replicaset) + idToReplicasetCopy := make(UUIDToReplicasetMap) for k, v := range idToReplicasetRef { idToReplicasetCopy[k] = v diff --git a/api_test.go b/api_test.go index 898099d..bdeb4f9 100644 --- a/api_test.go +++ b/api_test.go @@ -3,7 +3,6 @@ package vshard_router // nolint: revive import ( "context" "fmt" - "sync/atomic" "testing" "time" @@ -51,9 +50,7 @@ func TestRouter_RouterCallImpl(t *testing.T) { Loggerf: emptyLogfProvider, Metrics: emptyMetricsProvider, }, - view: &consistentView{ - routeMap: make([]atomic.Pointer[Replicaset], 11), - }, + concurrentData: newRouterConcurrentData(10), } futureError := fmt.Errorf("testErr") @@ -63,7 +60,7 @@ func TestRouter_RouterCallImpl(t *testing.T) { mPool := mockpool.NewPool(t) mPool.On("Do", mock.Anything, mock.Anything).Return(errFuture) - r.view.routeMap[5].Store(&Replicaset{ + r.concurrentData.(*routerConcurrentDataImpl).view.routeMap[5].Store(&Replicaset{ conn: mPool, }) diff --git a/concurrent_data.go b/concurrent_data.go new file mode 100644 index 0000000..14f7b57 --- /dev/null +++ b/concurrent_data.go @@ -0,0 +1,71 @@ +package vshard_router //nolint:revive + +import ( + "sync" + "sync/atomic" + + "github.com/google/uuid" +) + +type UUIDToReplicasetMap map[uuid.UUID]*Replicaset + +type routerConcurrentData interface { + nextRefID() int64 + getRefs() (UUIDToReplicasetMap, *consistentView) + setConsistentView(view *consistentView) + setIDToReplicaset(idToReplicasetNew UUIDToReplicasetMap) +} + +// routerConcurrentDataImpl is the router's data that can be accessed concurrently. +type routerConcurrentDataImpl struct { + mutex sync.RWMutex + + // mutex guards not the map itself, but the variable idToReplicaset. + // idToReplicaset is an immutable object by our convention. + // Whenever we add or remove a replicaset, we create a new map object. + // idToReplicaset variable can be modified only by setIDToReplicaset method. + // Assuming that we rarely change idToReplicaset. + // it should be the simplest and most efficient way of handling concurrent access. + // Additionally, we can safely iterate over a map because it never changes. + idToReplicaset UUIDToReplicasetMap + // See comment for type consistentView. + view *consistentView + + // ----------------------- Map-Reduce ----------------------- + // Storage Ref ID. It must be unique for each ref request + // and therefore is global and monotonically growing. + refID atomic.Int64 +} + +func newRouterConcurrentData(totalBucketCount uint64) routerConcurrentData { + return &routerConcurrentDataImpl{ + idToReplicaset: make(UUIDToReplicasetMap), + view: &consistentView{ + routeMap: make([]atomic.Pointer[Replicaset], totalBucketCount+1), + }, + } +} + +func (d *routerConcurrentDataImpl) nextRefID() int64 { + return d.refID.Add(1) +} + +func (d *routerConcurrentDataImpl) getRefs() (UUIDToReplicasetMap, *consistentView) { + d.mutex.RLock() + idToReplicasetRef, view := d.idToReplicaset, d.view + d.mutex.RUnlock() + + return idToReplicasetRef, view +} + +func (d *routerConcurrentDataImpl) setConsistentView(view *consistentView) { + d.mutex.Lock() + d.view = view + d.mutex.Unlock() +} + +func (d *routerConcurrentDataImpl) setIDToReplicaset(idToReplicasetNew UUIDToReplicasetMap) { + d.mutex.Lock() + d.idToReplicaset = idToReplicasetNew + d.mutex.Unlock() +} diff --git a/discovery.go b/discovery.go index 12b1265..0edc8b1 100644 --- a/discovery.go +++ b/discovery.go @@ -52,7 +52,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) } - view := r.getConsistentView() + _, view := r.concurrentData.getRefs() rs := view.routeMap[bucketID].Load() if rs != nil { @@ -70,7 +70,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica } func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) { - idToReplicasetRef := r.getIDToReplicaset() + idToReplicasetRef, _ := r.concurrentData.getRefs() type rsFuture struct { rsID uuid.UUID @@ -127,8 +127,7 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700 // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37 func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) { - idToReplicasetRef := r.getIDToReplicaset() - view := r.getConsistentView() + idToReplicasetRef, view := r.concurrentData.getRefs() type rsFuture struct { rs *Replicaset @@ -190,7 +189,7 @@ func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicase // DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset. func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64) { - view := r.getConsistentView() + _, view := r.concurrentData.getRefs() removedFrom := make(map[*Replicaset]int) for _, bucketID := range buckets { @@ -230,8 +229,7 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error { errGr, ctx := errgroup.WithContext(ctx) - view := r.getConsistentView() - idToReplicasetRef := r.getIDToReplicaset() + idToReplicasetRef, view := r.concurrentData.getRefs() for _, rs := range idToReplicasetRef { rs := rs diff --git a/discovery_test.go b/discovery_test.go index c2bcbc8..a600170 100644 --- a/discovery_test.go +++ b/discovery_test.go @@ -2,7 +2,6 @@ package vshard_router //nolint:revive import ( "context" - "sync/atomic" "testing" "github.com/stretchr/testify/require" @@ -16,9 +15,7 @@ func TestRouter_BucketResolve_InvalidBucketID(t *testing.T) { TotalBucketCount: uint64(10), Loggerf: emptyLogfProvider, }, - view: &consistentView{ - routeMap: make([]atomic.Pointer[Replicaset], 11), - }, + concurrentData: newRouterConcurrentData(10), } _, err := r.BucketResolve(ctx, 20) diff --git a/replicaset.go b/replicaset.go index fe60a73..aa1be58 100644 --- a/replicaset.go +++ b/replicaset.go @@ -120,7 +120,7 @@ func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) { } // ReplicaCall perform function on remote storage -// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661 +// link https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L661 // This method is deprecated, because looks like it has a little bit broken interface func (rs *Replicaset) ReplicaCall( ctx context.Context, @@ -216,7 +216,7 @@ func (rs *Replicaset) bucketsDiscovery(ctx context.Context, from uint64) (bucket // At each iteration, the algorithm either concludes or disregards at least // one new overloaded replicaset. Therefore, its time complexity is O(N^2), // where N is the number of replicasets. -// based on https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L1358 +// based on https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L1358 func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error { isBalanceFound := false weightSum := 0.0 diff --git a/tests/tnt/replicaset_test.go b/tests/tnt/replicaset_test.go index cb6b18d..8fe8198 100644 --- a/tests/tnt/replicaset_test.go +++ b/tests/tnt/replicaset_test.go @@ -226,9 +226,7 @@ func TestReplicasetBucketsCount(t *testing.T) { require.NoError(t, err, "NewRouter finished successfully") for _, rs := range router.RouterRouteAll() { - count := uint64(0) - - count, err = rs.BucketsCount(ctx) + count, err := rs.BucketsCount(ctx) require.NoError(t, err) require.NotEqual(t, count, 0) } diff --git a/topology.go b/topology.go index 56fabbb..28e3880 100644 --- a/topology.go +++ b/topology.go @@ -26,20 +26,6 @@ type TopologyController interface { AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error } -func (r *Router) getIDToReplicaset() map[uuid.UUID]*Replicaset { - r.idToReplicasetMutex.RLock() - idToReplicasetRef := r.idToReplicaset - r.idToReplicasetMutex.RUnlock() - - return idToReplicasetRef -} - -func (r *Router) setIDToReplicaset(idToReplicasetNew map[uuid.UUID]*Replicaset) { - r.idToReplicasetMutex.Lock() - r.idToReplicaset = idToReplicasetNew - r.idToReplicasetMutex.Unlock() -} - func (r *Router) Topology() TopologyController { return r } @@ -62,7 +48,7 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI Opts: r.cfg.PoolOpts, } - idToReplicasetRef := r.getIDToReplicaset() + idToReplicasetRef, _ := r.concurrentData.getRefs() rs := idToReplicasetRef[rsID] if rs == nil { @@ -75,7 +61,7 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI func (r *Router) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error { r.log().Debugf(ctx, "Trying to remove instance %s from router topology in rs %s", instanceID, rsID) - idToReplicasetRef := r.getIDToReplicaset() + idToReplicasetRef, _ := r.concurrentData.getRefs() rs := idToReplicasetRef[rsID] if rs == nil { @@ -88,7 +74,7 @@ func (r *Router) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error { r.log().Debugf(ctx, "Trying to add replicaset %s to router topology", rsInfo) - idToReplicasetOld := r.getIDToReplicaset() + idToReplicasetOld, _ := r.concurrentData.getRefs() if _, ok := idToReplicasetOld[rsInfo.UUID]; ok { return ErrReplicasetExists @@ -138,7 +124,7 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta replicaset.conn = conn // Create an entirely new map object - idToReplicasetNew := make(map[uuid.UUID]*Replicaset) + idToReplicasetNew := make(UUIDToReplicasetMap) for k, v := range idToReplicasetOld { idToReplicasetNew[k] = v } @@ -148,7 +134,7 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta // by comparing references to r.idToReplicaset and idToReplicasetOld. // But it requires reflection which I prefer to avoid. // See: https://stackoverflow.com/questions/58636694/how-to-know-if-2-go-maps-reference-the-same-data. - r.setIDToReplicaset(idToReplicasetNew) + r.concurrentData.setIDToReplicaset(idToReplicasetNew) return nil } @@ -171,7 +157,7 @@ func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetI func (r *Router) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error { r.log().Debugf(ctx, "Trying to remove replicaset %s from router topology", rsID) - idToReplicasetOld := r.getIDToReplicaset() + idToReplicasetOld, _ := r.concurrentData.getRefs() rs := idToReplicasetOld[rsID] if rs == nil { @@ -179,13 +165,13 @@ func (r *Router) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error { } // Create an entirely new map object - idToReplicasetNew := make(map[uuid.UUID]*Replicaset) + idToReplicasetNew := make(UUIDToReplicasetMap) for k, v := range idToReplicasetOld { idToReplicasetNew[k] = v } delete(idToReplicasetNew, rsID) - r.setIDToReplicaset(idToReplicasetNew) + r.concurrentData.setIDToReplicaset(idToReplicasetNew) return rs.conn.CloseGraceful() } diff --git a/topology_test.go b/topology_test.go index 2b02a8e..e63ded8 100644 --- a/topology_test.go +++ b/topology_test.go @@ -21,7 +21,7 @@ func TestController_AddInstance(t *testing.T) { t.Run("no such replicaset", func(t *testing.T) { router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{}, + concurrentData: newRouterConcurrentData(1), cfg: Config{ Loggerf: emptyLogfProvider, }, @@ -36,7 +36,7 @@ func TestController_AddInstance(t *testing.T) { t.Run("invalid instance info", func(t *testing.T) { router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{}, + concurrentData: newRouterConcurrentData(1), cfg: Config{ Loggerf: emptyLogfProvider, }, @@ -52,7 +52,7 @@ func TestController_RemoveInstance(t *testing.T) { t.Run("no such replicaset", func(t *testing.T) { router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{}, + concurrentData: newRouterConcurrentData(1), cfg: Config{ Loggerf: emptyLogfProvider, }, @@ -71,8 +71,10 @@ func TestController_RemoveReplicaset(t *testing.T) { mPool.On("CloseGraceful").Return(nil) router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{ - uuidToRemove: {conn: mPool}, + concurrentData: &routerConcurrentDataImpl{ + idToReplicaset: UUIDToReplicasetMap{ + uuidToRemove: {conn: mPool}, + }, }, cfg: Config{ Loggerf: emptyLogfProvider, @@ -97,8 +99,10 @@ func TestRouter_AddReplicaset_AlreadyExists(t *testing.T) { alreadyExistingRsUUID := uuid.New() router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{ - alreadyExistingRsUUID: {}, + concurrentData: &routerConcurrentDataImpl{ + idToReplicaset: UUIDToReplicasetMap{ + alreadyExistingRsUUID: {}, + }, }, cfg: Config{ Loggerf: emptyLogfProvider, diff --git a/vshard.go b/vshard.go index 86b557e..84209b3 100644 --- a/vshard.go +++ b/vshard.go @@ -3,7 +3,6 @@ package vshard_router //nolint:revive import ( "context" "fmt" - "sync" "sync/atomic" "time" @@ -42,18 +41,10 @@ type consistentView struct { type Router struct { cfg Config - // idToReplicasetMutex guards not the map itself, but the variable idToReplicaset. - // idToReplicaset is an immutable object by our convention. - // Whenever we add or remove a replicaset, we create a new map object. - // idToReplicaset can be modified only by TopologyController methods. - // Assuming that we rarely add or remove some replicaset, - // it should be the simplest and most efficient way of handling concurrent access. - // Additionally, we can safely iterate over a map because it never changes. - idToReplicasetMutex sync.RWMutex - idToReplicaset map[uuid.UUID]*Replicaset - - viewMutex sync.RWMutex - view *consistentView + // concurrentData is a router's data that can be accessed concurrently. + // It is hidden under interface to prevent misusage by developers. + // It's interface enforces developers use it correctly (I hope so). + concurrentData routerConcurrentData // ----------------------- Map-Reduce ----------------------- // Storage Ref ID. It must be unique for each ref request @@ -71,20 +62,6 @@ func (r *Router) log() LogfProvider { return r.cfg.Loggerf } -func (r *Router) getConsistentView() *consistentView { - r.viewMutex.RLock() - view := r.view - r.viewMutex.RUnlock() - - return view -} - -func (r *Router) setConsistentView(view *consistentView) { - r.viewMutex.Lock() - r.view = view - r.viewMutex.Unlock() -} - type Config struct { // Providers // Loggerf injects a custom logger. By default there is no logger is used. @@ -205,11 +182,9 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) { } router := &Router{ - cfg: cfg, - idToReplicaset: make(map[uuid.UUID]*Replicaset), - view: &consistentView{ - routeMap: make([]atomic.Pointer[Replicaset], cfg.TotalBucketCount+1), - }, + cfg: cfg, + + concurrentData: newRouterConcurrentData(cfg.TotalBucketCount), } err = cfg.TopologyProvider.Init(router.Topology()) @@ -240,15 +215,13 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) { // BucketSet Set a bucket to a replicaset. func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error) { - idToReplicasetRef := r.getIDToReplicaset() + idToReplicasetRef, view := r.concurrentData.getRefs() rs := idToReplicasetRef[rsID] if rs == nil { return nil, newVShardErrorNoRouteToBucket(bucketID) } - view := r.getConsistentView() - if oldRs := view.routeMap[bucketID].Swap(rs); oldRs == nil { view.knownBucketCount.Add(1) } @@ -257,7 +230,7 @@ func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error) } func (r *Router) BucketReset(bucketID uint64) { - view := r.getConsistentView() + _, view := r.concurrentData.getRefs() if bucketID > r.cfg.TotalBucketCount { return @@ -273,7 +246,7 @@ func (r *Router) RouteMapClean() { routeMap: make([]atomic.Pointer[Replicaset], r.cfg.TotalBucketCount+1), } - r.setConsistentView(newView) + r.concurrentData.setConsistentView(newView) } func prepareCfg(ctx context.Context, cfg Config) (Config, error) { @@ -384,10 +357,12 @@ func (r *Router) RouterBucketCount() uint64 { // error will result in an immediate return, ensuring that the operation either // succeeds fully or fails fast. func (r *Router) ClusterBootstrap(ctx context.Context, ifNotBootstrapped bool) error { - rssToBootstrap := make([]Replicaset, 0, len(r.idToReplicaset)) + idToReplicaset, _ := r.concurrentData.getRefs() + + rssToBootstrap := make([]Replicaset, 0, len(idToReplicaset)) var lastErr error - for _, rs := range r.idToReplicaset { + for _, rs := range idToReplicaset { rssToBootstrap = append(rssToBootstrap, *rs) } diff --git a/vshard_test.go b/vshard_test.go index 6963322..fe5aa13 100644 --- a/vshard_test.go +++ b/vshard_test.go @@ -1,7 +1,6 @@ package vshard_router //nolint:revive import ( - "sync/atomic" "testing" "github.com/stretchr/testify/require" @@ -32,10 +31,8 @@ func TestRouter_RouterBucketCount(t *testing.T) { func TestRouter_RouteMapClean(t *testing.T) { r := Router{ - cfg: Config{TotalBucketCount: 10}, - view: &consistentView{ - routeMap: make([]atomic.Pointer[Replicaset], 10), - }, + cfg: Config{TotalBucketCount: 10}, + concurrentData: newRouterConcurrentData(10), } require.NotPanics(t, func() {