Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

review fixes for #105 #116

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

BUG FIXES:
* Fix decoding fields for StorageCallVShardError (MasterUUID, ReplicasetUUID).
* ClusterBootstrap: eliminate direct access to r.idToReplicaset.

CHANGES:
* Add comment why and how we handle "NON_MASTER" vshard error.
Expand All @@ -12,6 +13,7 @@ CHANGES:
* Decode 'vshard.storage.call' response manually into struct vshardStorageCallResponseProto using DecodeMsgpack interface to reduce allocations (partially #61, #100).
* Remove `mapstructure` tag from StorageCallVShardError.
* Update benchmarks in README files.
* Replace github links to master branch with permalinks.

FEATURES:

Expand All @@ -26,6 +28,7 @@ REFACTOR:
* Remove bucketStatError type, use StorageCallVShardError type instead.
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).
* Hide router's concurrent data under interface 'routerConcurrentData' to prevent misusage by developers.

TESTS:
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.
Expand Down
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ test: prepare-tnt
cover: test ## Generate and open the HTML report for test coverage.
$(GO_CMD) tool cover -html=coverage.out

test/integration:
@$(MAKE) -C ./tests/integration test

generate/mocks:
mockery --name=Pool --case=underscore --output=mocks/pool --outpkg=mockpool # need fix it later
mockery --name=TopologyController --case=underscore --output=mocks/topology --outpkg=mocktopology
Expand Down
12 changes: 6 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,

var loggedOnce bool
for {
idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()
if _, ok := idToReplicasetRef[destinationUUID]; ok {
_, err := r.BucketSet(bucketID, destinationUUID)
if err == nil {
Expand Down Expand Up @@ -355,9 +355,9 @@ func (r *Router) RouterMapCallRWImpl(
}

timeStart := time.Now()
refID := r.refID.Add(1)
refID := r.concurrentData.nextRefID()

idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

defer func() {
// call function "storage_unref" if map_callrw is failed or successed
Expand Down Expand Up @@ -509,11 +509,11 @@ func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset,
}

// RouterRouteAll return map of all replicasets.
func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset {
idToReplicasetRef := r.getIDToReplicaset()
func (r *Router) RouterRouteAll() UUIDToReplicasetMap {
idToReplicasetRef, _ := r.concurrentData.getRefs()

// Do not expose the original map to prevent unauthorized modification.
idToReplicasetCopy := make(map[uuid.UUID]*Replicaset)
idToReplicasetCopy := make(UUIDToReplicasetMap)

for k, v := range idToReplicasetRef {
idToReplicasetCopy[k] = v
Expand Down
8 changes: 3 additions & 5 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package vshard_router // nolint: revive
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

Expand All @@ -19,6 +18,7 @@ var emptyRouter = &Router{
Loggerf: emptyLogfProvider,
Metrics: emptyMetricsProvider,
},
concurrentData: newRouterConcurrentData(10),
}

func TestVshardMode_String_NotEmpty(t *testing.T) {
Expand Down Expand Up @@ -51,9 +51,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
Loggerf: emptyLogfProvider,
Metrics: emptyMetricsProvider,
},
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], 11),
},
concurrentData: newRouterConcurrentData(10),
}

futureError := fmt.Errorf("testErr")
Expand All @@ -63,7 +61,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
mPool := mockpool.NewPool(t)
mPool.On("Do", mock.Anything, mock.Anything).Return(errFuture)

r.view.routeMap[5].Store(&Replicaset{
r.concurrentData.(*routerConcurrentDataImpl).view.routeMap[5].Store(&Replicaset{
conn: mPool,
})

Expand Down
71 changes: 71 additions & 0 deletions concurrent_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package vshard_router //nolint:revive

import (
"sync"
"sync/atomic"

"github.com/google/uuid"
)

type UUIDToReplicasetMap map[uuid.UUID]*Replicaset

type routerConcurrentData interface {
nextRefID() int64
getRefs() (UUIDToReplicasetMap, *consistentView)
setConsistentView(view *consistentView)
setIDToReplicaset(idToReplicasetNew UUIDToReplicasetMap)
}

// routerConcurrentDataImpl is the router's data that can be accessed concurrently.
type routerConcurrentDataImpl struct {
mutex sync.RWMutex

// mutex guards not the map itself, but the variable idToReplicaset.
// idToReplicaset is an immutable object by our convention.
// Whenever we add or remove a replicaset, we create a new map object.
// idToReplicaset variable can be modified only by setIDToReplicaset method.
// Assuming that we rarely change idToReplicaset.
// it should be the simplest and most efficient way of handling concurrent access.
// Additionally, we can safely iterate over a map because it never changes.
idToReplicaset UUIDToReplicasetMap
// See comment for type consistentView.
view *consistentView

// ----------------------- Map-Reduce -----------------------
// Storage Ref ID. It must be unique for each ref request
// and therefore is global and monotonically growing.
refID atomic.Int64
}

func newRouterConcurrentData(totalBucketCount uint64) routerConcurrentData {
return &routerConcurrentDataImpl{
idToReplicaset: make(UUIDToReplicasetMap),
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], totalBucketCount+1),
},
}
}

func (d *routerConcurrentDataImpl) nextRefID() int64 {
return d.refID.Add(1)
}

func (d *routerConcurrentDataImpl) getRefs() (UUIDToReplicasetMap, *consistentView) {
d.mutex.RLock()
idToReplicasetRef, view := d.idToReplicaset, d.view
d.mutex.RUnlock()

return idToReplicasetRef, view
}

func (d *routerConcurrentDataImpl) setConsistentView(view *consistentView) {
d.mutex.Lock()
d.view = view
d.mutex.Unlock()
}

func (d *routerConcurrentDataImpl) setIDToReplicaset(idToReplicasetNew UUIDToReplicasetMap) {
d.mutex.Lock()
d.idToReplicaset = idToReplicasetNew
d.mutex.Unlock()
}
12 changes: 5 additions & 7 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

view := r.getConsistentView()
_, view := r.concurrentData.getRefs()

rs := view.routeMap[bucketID].Load()
if rs != nil {
Expand All @@ -70,7 +70,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
}

func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) {
idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

type rsFuture struct {
rsID uuid.UUID
Expand Down Expand Up @@ -127,8 +127,7 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37
func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) {
idToReplicasetRef := r.getIDToReplicaset()
view := r.getConsistentView()
idToReplicasetRef, view := r.concurrentData.getRefs()

type rsFuture struct {
rs *Replicaset
Expand Down Expand Up @@ -190,7 +189,7 @@ func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicase

// DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64) {
view := r.getConsistentView()
_, view := r.concurrentData.getRefs()
removedFrom := make(map[*Replicaset]int)

for _, bucketID := range buckets {
Expand Down Expand Up @@ -230,8 +229,7 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {

errGr, ctx := errgroup.WithContext(ctx)

view := r.getConsistentView()
idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, view := r.concurrentData.getRefs()

for _, rs := range idToReplicasetRef {
rs := rs
Expand Down
5 changes: 1 addition & 4 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package vshard_router //nolint:revive

import (
"context"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -16,9 +15,7 @@ func TestRouter_BucketResolve_InvalidBucketID(t *testing.T) {
TotalBucketCount: uint64(10),
Loggerf: emptyLogfProvider,
},
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], 11),
},
concurrentData: newRouterConcurrentData(10),
}

_, err := r.BucketResolve(ctx, 20)
Expand Down
4 changes: 2 additions & 2 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
}

// ReplicaCall perform function on remote storage
// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661
// link https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L661
// This method is deprecated, because looks like it has a little bit broken interface
func (rs *Replicaset) ReplicaCall(
ctx context.Context,
Expand Down Expand Up @@ -216,7 +216,7 @@ func (rs *Replicaset) bucketsDiscovery(ctx context.Context, from uint64) (bucket
// At each iteration, the algorithm either concludes or disregards at least
// one new overloaded replicaset. Therefore, its time complexity is O(N^2),
// where N is the number of replicasets.
// based on https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L1358
// based on https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L1358
func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error {
isBalanceFound := false
weightSum := 0.0
Expand Down
4 changes: 1 addition & 3 deletions tests/tnt/replicaset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,7 @@ func TestReplicasetBucketsCount(t *testing.T) {

require.NoError(t, err, "NewRouter finished successfully")
for _, rs := range router.RouterRouteAll() {
count := uint64(0)

count, err = rs.BucketsCount(ctx)
count, err := rs.BucketsCount(ctx)
require.NoError(t, err)
require.NotEqual(t, count, 0)
}
Expand Down
30 changes: 8 additions & 22 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,6 @@ type TopologyController interface {
AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}

func (r *Router) getIDToReplicaset() map[uuid.UUID]*Replicaset {
r.idToReplicasetMutex.RLock()
idToReplicasetRef := r.idToReplicaset
r.idToReplicasetMutex.RUnlock()

return idToReplicasetRef
}

func (r *Router) setIDToReplicaset(idToReplicasetNew map[uuid.UUID]*Replicaset) {
r.idToReplicasetMutex.Lock()
r.idToReplicaset = idToReplicasetNew
r.idToReplicasetMutex.Unlock()
}

func (r *Router) Topology() TopologyController {
return r
}
Expand All @@ -62,7 +48,7 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI
Opts: r.cfg.PoolOpts,
}

idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

rs := idToReplicasetRef[rsID]
if rs == nil {
Expand All @@ -75,7 +61,7 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI
func (r *Router) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error {
r.log().Debugf(ctx, "Trying to remove instance %s from router topology in rs %s", instanceID, rsID)

idToReplicasetRef := r.getIDToReplicaset()
idToReplicasetRef, _ := r.concurrentData.getRefs()

rs := idToReplicasetRef[rsID]
if rs == nil {
Expand All @@ -88,7 +74,7 @@ func (r *Router) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID)
func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error {
r.log().Debugf(ctx, "Trying to add replicaset %s to router topology", rsInfo)

idToReplicasetOld := r.getIDToReplicaset()
idToReplicasetOld, _ := r.concurrentData.getRefs()

if _, ok := idToReplicasetOld[rsInfo.UUID]; ok {
return ErrReplicasetExists
Expand Down Expand Up @@ -138,7 +124,7 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
replicaset.conn = conn

// Create an entirely new map object
idToReplicasetNew := make(map[uuid.UUID]*Replicaset)
idToReplicasetNew := make(UUIDToReplicasetMap)
for k, v := range idToReplicasetOld {
idToReplicasetNew[k] = v
}
Expand All @@ -148,7 +134,7 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
// by comparing references to r.idToReplicaset and idToReplicasetOld.
// But it requires reflection which I prefer to avoid.
// See: https://stackoverflow.com/questions/58636694/how-to-know-if-2-go-maps-reference-the-same-data.
r.setIDToReplicaset(idToReplicasetNew)
r.concurrentData.setIDToReplicaset(idToReplicasetNew)

return nil
}
Expand All @@ -171,21 +157,21 @@ func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetI
func (r *Router) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error {
r.log().Debugf(ctx, "Trying to remove replicaset %s from router topology", rsID)

idToReplicasetOld := r.getIDToReplicaset()
idToReplicasetOld, _ := r.concurrentData.getRefs()

rs := idToReplicasetOld[rsID]
if rs == nil {
return []error{ErrReplicasetNotExists}
}

// Create an entirely new map object
idToReplicasetNew := make(map[uuid.UUID]*Replicaset)
idToReplicasetNew := make(UUIDToReplicasetMap)
for k, v := range idToReplicasetOld {
idToReplicasetNew[k] = v
}
delete(idToReplicasetNew, rsID)

r.setIDToReplicaset(idToReplicasetNew)
r.concurrentData.setIDToReplicaset(idToReplicasetNew)

return rs.conn.CloseGraceful()
}
Loading
Loading