Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
review fixes for #105
Browse files Browse the repository at this point in the history
* ClusterBootstrap: eliminate direct access to r.idToReplicaset
* replace github links to master branch with permalinks
* hide router's concurrent data under interface 'routerConcurrentData' to prevent misuse by developers
  • Loading branch information
nurzhan-saktaganov committed Dec 17, 2024
1 parent 0a8677f commit 8783459
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 103 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

BUG FIXES:
* Fix decoding fields for StorageCallVShardError (MasterUUID, ReplicasetUUID).
* ClusterBootstrap: eliminate direct access to r.idToReplicaset.

CHANGES:
* Add comment why and how we handle "NON_MASTER" vshard error.
Expand All @@ -12,6 +13,7 @@ CHANGES:
* Decode 'vshard.storage.call' response manually into struct vshardStorageCallResponseProto using DecodeMsgpack interface to reduce allocations (partially #61, #100).
* Remove `mapstructure` tag from StorageCallVShardError.
* Update benchmarks in README files.
* Replace github links to master branch with permalinks.

FEATURES:

Expand All @@ -26,6 +28,7 @@ REFACTOR:
* Remove bucketStatError type, use StorageCallVShardError type instead.
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).
* Hide router's concurrent data under interface 'routerConcurrentData' to prevent misuse by developers.

TESTS:
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.
Expand Down
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ test: prepare-tnt
cover: test ## Generate and open the HTML report for test coverage.
$(GO_CMD) tool cover -html=coverage.out

test/integration:
@$(MAKE) -C ./tests/integration test

generate/mocks:
mockery --name=Pool --case=underscore --output=mocks/pool --outpkg=mockpool # need fix it later
mockery --name=TopologyController --case=underscore --output=mocks/topology --outpkg=mocktopology
Expand Down
12 changes: 6 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,

var loggedOnce bool
for {
idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()
if _, ok := idToReplicasetRef[destinationUUID]; ok {
_, err := r.BucketSet(bucketID, destinationUUID)
if err == nil {
Expand Down Expand Up @@ -355,9 +355,9 @@ func (r *Router) RouterMapCallRWImpl(
}

timeStart := time.Now()
refID := r.refID.Add(1)
refID := r.concurrentData.nextRefID()

idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

defer func() {
// call function "storage_unref" regardless of whether map_callrw failed or succeeded
Expand Down Expand Up @@ -509,11 +509,11 @@ func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset,
}

// RouterRouteAll return map of all replicasets.
func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset {
idToReplicasetRef := r.getIDToReplicaset()
func (r *Router) RouterRouteAll() UUIDToReplicasetMap {
idToReplicasetRef, _ := r.concurrentData.getRefs()

// Do not expose the original map to prevent unauthorized modification.
idToReplicasetCopy := make(map[uuid.UUID]*Replicaset)
idToReplicasetCopy := make(UUIDToReplicasetMap)

for k, v := range idToReplicasetRef {
idToReplicasetCopy[k] = v
Expand Down
7 changes: 2 additions & 5 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package vshard_router // nolint: revive
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -51,9 +50,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
Loggerf: emptyLogfProvider,
Metrics: emptyMetricsProvider,
},
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], 11),
},
concurrentData: newRouterConcurrentData(10),
}

futureError := fmt.Errorf("testErr")
Expand All @@ -63,7 +60,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
mPool := mockpool.NewPool(t)
mPool.On("Do", mock.Anything, mock.Anything).Return(errFuture)

r.view.routeMap[5].Store(&Replicaset{
r.concurrentData.(*routerConcurrentDataImpl).view.routeMap[5].Store(&Replicaset{
conn: mPool,
})

Expand Down
71 changes: 71 additions & 0 deletions concurrent_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package vshard_router //nolint:revive

import (
"sync"
"sync/atomic"

"github.com/google/uuid"
)

// UUIDToReplicasetMap maps a replicaset UUID to its *Replicaset.
type UUIDToReplicasetMap map[uuid.UUID]*Replicaset

// routerConcurrentData is the set of operations through which the router
// works with its concurrently accessed data: the storage ref ID counter,
// the replicaset map and the consistent view.
type routerConcurrentData interface {
// nextRefID returns the next storage ref ID.
nextRefID() int64
// getRefs returns the current replicaset map and the current consistent view.
getRefs() (UUIDToReplicasetMap, *consistentView)
// setConsistentView replaces the current consistent view with view.
setConsistentView(view *consistentView)
// setIDToReplicaset replaces the current replicaset map with idToReplicasetNew.
setIDToReplicaset(idToReplicasetNew UUIDToReplicasetMap)
}

// routerConcurrentDataImpl is the router's data that can be accessed concurrently.
type routerConcurrentDataImpl struct {
// mutex is held while the idToReplicaset and view variables are read or replaced.
mutex sync.RWMutex

// mutex guards not the map itself, but the variable idToReplicaset.
// idToReplicaset is an immutable object by our convention:
// whenever we add or remove a replicaset, we create a new map object.
// The idToReplicaset variable can be modified only by the setIDToReplicaset method.
// Assuming that we rarely change idToReplicaset,
// this should be the simplest and most efficient way of handling concurrent access.
// Additionally, we can safely iterate over the map because it never changes.
idToReplicaset UUIDToReplicasetMap
// See comment for type consistentView.
// The view variable is replaced only by the setConsistentView method.
view *consistentView

// ----------------------- Map-Reduce -----------------------
// Storage Ref ID. It must be unique for each ref request
// and therefore is global and monotonically growing.
// It is advanced atomically by the nextRefID method.
refID atomic.Int64
}

// newRouterConcurrentData creates the router's concurrent data with an empty
// replicaset map and a consistent view whose route map has
// totalBucketCount+1 slots.
func newRouterConcurrentData(totalBucketCount uint64) routerConcurrentData {
	// NOTE(review): the extra slot presumably exists because bucket IDs are
	// 1-based and are used directly as indexes into the route map — confirm.
	routeMap := make([]atomic.Pointer[Replicaset], totalBucketCount+1)

	data := new(routerConcurrentDataImpl)
	data.idToReplicaset = UUIDToReplicasetMap{}
	data.view = &consistentView{routeMap: routeMap}

	return data
}

// nextRefID atomically increments the storage ref ID counter by one
// and returns the new value.
func (d *routerConcurrentDataImpl) nextRefID() int64 {
	next := d.refID.Add(1)

	return next
}

// getRefs returns the current replicaset map and the current consistent view.
// Both references are read under the same read lock.
func (d *routerConcurrentDataImpl) getRefs() (UUIDToReplicasetMap, *consistentView) {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	return d.idToReplicaset, d.view
}

// setConsistentView replaces the stored consistent view with view
// while holding the write lock.
func (d *routerConcurrentDataImpl) setConsistentView(view *consistentView) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	d.view = view
}

// setIDToReplicaset replaces the stored replicaset map with idToReplicasetNew
// while holding the write lock. The map object itself is not copied.
func (d *routerConcurrentDataImpl) setIDToReplicaset(idToReplicasetNew UUIDToReplicasetMap) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	d.idToReplicaset = idToReplicasetNew
}
12 changes: 5 additions & 7 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

view := r.getConsistentView()
_, view := r.concurrentData.getRefs()

rs := view.routeMap[bucketID].Load()
if rs != nil {
Expand All @@ -70,7 +70,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
}

func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) {
idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

type rsFuture struct {
rsID uuid.UUID
Expand Down Expand Up @@ -127,8 +127,7 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37
func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) {
idToReplicasetRef := r.getIDToReplicaset()
view := r.getConsistentView()
idToReplicasetRef, view := r.concurrentData.getRefs()

type rsFuture struct {
rs *Replicaset
Expand Down Expand Up @@ -190,7 +189,7 @@ func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicase

// DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64) {
view := r.getConsistentView()
_, view := r.concurrentData.getRefs()
removedFrom := make(map[*Replicaset]int)

for _, bucketID := range buckets {
Expand Down Expand Up @@ -230,8 +229,7 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {

errGr, ctx := errgroup.WithContext(ctx)

view := r.getConsistentView()
idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, view := r.concurrentData.getRefs()

for _, rs := range idToReplicasetRef {
rs := rs
Expand Down
5 changes: 1 addition & 4 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package vshard_router //nolint:revive

import (
"context"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -16,9 +15,7 @@ func TestRouter_BucketResolve_InvalidBucketID(t *testing.T) {
TotalBucketCount: uint64(10),
Loggerf: emptyLogfProvider,
},
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], 11),
},
concurrentData: newRouterConcurrentData(10),
}

_, err := r.BucketResolve(ctx, 20)
Expand Down
4 changes: 2 additions & 2 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
}

// ReplicaCall performs a function on a remote storage.
// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661
// link https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L661
// This method is deprecated because its interface appears to be somewhat broken.
func (rs *Replicaset) ReplicaCall(
ctx context.Context,
Expand Down Expand Up @@ -216,7 +216,7 @@ func (rs *Replicaset) bucketsDiscovery(ctx context.Context, from uint64) (bucket
// At each iteration, the algorithm either concludes or disregards at least
// one new overloaded replicaset. Therefore, its time complexity is O(N^2),
// where N is the number of replicasets.
// based on https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L1358
// based on https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L1358
func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error {
isBalanceFound := false
weightSum := 0.0
Expand Down
4 changes: 1 addition & 3 deletions tests/tnt/replicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,7 @@ func TestReplicasetBucketsCount(t *testing.T) {

require.NoError(t, err, "NewRouter finished successfully")
for _, rs := range router.RouterRouteAll() {
count := uint64(0)

count, err = rs.BucketsCount(ctx)
count, err := rs.BucketsCount(ctx)
require.NoError(t, err)
require.NotEqual(t, count, 0)
}
Expand Down
30 changes: 8 additions & 22 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,6 @@ type TopologyController interface {
AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}

func (r *Router) getIDToReplicaset() map[uuid.UUID]*Replicaset {
r.idToReplicasetMutex.RLock()
idToReplicasetRef := r.idToReplicaset
r.idToReplicasetMutex.RUnlock()

return idToReplicasetRef
}

func (r *Router) setIDToReplicaset(idToReplicasetNew map[uuid.UUID]*Replicaset) {
r.idToReplicasetMutex.Lock()
r.idToReplicaset = idToReplicasetNew
r.idToReplicasetMutex.Unlock()
}

func (r *Router) Topology() TopologyController {
return r
}
Expand All @@ -62,7 +48,7 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI
Opts: r.cfg.PoolOpts,
}

idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

rs := idToReplicasetRef[rsID]
if rs == nil {
Expand All @@ -75,7 +61,7 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI
func (r *Router) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error {
r.log().Debugf(ctx, "Trying to remove instance %s from router topology in rs %s", instanceID, rsID)

idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

rs := idToReplicasetRef[rsID]
if rs == nil {
Expand All @@ -88,7 +74,7 @@ func (r *Router) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID)
func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error {
r.log().Debugf(ctx, "Trying to add replicaset %s to router topology", rsInfo)

idToReplicasetOld := r.getIDToReplicaset()
idToReplicasetOld, _ := r.concurrentData.getRefs()

if _, ok := idToReplicasetOld[rsInfo.UUID]; ok {
return ErrReplicasetExists
Expand Down Expand Up @@ -138,7 +124,7 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
replicaset.conn = conn

// Create an entirely new map object
idToReplicasetNew := make(map[uuid.UUID]*Replicaset)
idToReplicasetNew := make(UUIDToReplicasetMap)
for k, v := range idToReplicasetOld {
idToReplicasetNew[k] = v
}
Expand All @@ -148,7 +134,7 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
// by comparing references to r.idToReplicaset and idToReplicasetOld.
// But it requires reflection which I prefer to avoid.
// See: https://stackoverflow.com/questions/58636694/how-to-know-if-2-go-maps-reference-the-same-data.
r.setIDToReplicaset(idToReplicasetNew)
r.concurrentData.setIDToReplicaset(idToReplicasetNew)

return nil
}
Expand All @@ -171,21 +157,21 @@ func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetI
func (r *Router) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error {
r.log().Debugf(ctx, "Trying to remove replicaset %s from router topology", rsID)

idToReplicasetOld := r.getIDToReplicaset()
idToReplicasetOld, _ := r.concurrentData.getRefs()

rs := idToReplicasetOld[rsID]
if rs == nil {
return []error{ErrReplicasetNotExists}
}

// Create an entirely new map object
idToReplicasetNew := make(map[uuid.UUID]*Replicaset)
idToReplicasetNew := make(UUIDToReplicasetMap)
for k, v := range idToReplicasetOld {
idToReplicasetNew[k] = v
}
delete(idToReplicasetNew, rsID)

r.setIDToReplicaset(idToReplicasetNew)
r.concurrentData.setIDToReplicaset(idToReplicasetNew)

return rs.conn.CloseGraceful()
}
Loading

0 comments on commit 8783459

Please sign in to comment.