Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
bug/discovery lock (#20)
Browse files Browse the repository at this point in the history
* fix comment of ReplicaCall
* rename rec pointer name
* fix discovery range
* add more tests & use search lock struct instead
---------

Co-authored-by: maxim-konovalov <maksim.konovalov@vk.team>
  • Loading branch information
KaymeKaydex and maxim-konovalov authored May 6, 2024
1 parent 9c5be22 commit 9e72b8c
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 33 deletions.
39 changes: 30 additions & 9 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,43 @@ const (
DiscoveryModeOnce
)

type searchLock struct {
mu sync.RWMutex
perBucket []chan struct{}
}

func (s *searchLock) WaitOnSearch(bucketID uint64) {
ch := s.perBucket[bucketID]
if ch == nil {
return
}

<-ch
}

func (s *searchLock) StartSearch(bucketID uint64) chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()

ch := make(chan struct{})
s.perBucket[bucketID] = ch

return ch
}

// BucketDiscovery search bucket in whole cluster
func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error) {
r.searchLock.mu.Lock() // локаем чтобы понять можно ли начать ли поиск и не пытается ли узнать другой бакет что искать и записать свой лок канал
<-r.searchLock.perBucket[bucketID] // проверяем что этот бакет ранее не вошел в поиск
r.searchLock.WaitOnSearch(bucketID)

rs := r.routeMap[bucketID]
if rs != nil {
r.searchLock.mu.Unlock()

return rs, nil
}

lockCh := make(chan struct{})
r.searchLock.perBucket[bucketID] = lockCh
r.searchLock.mu.Unlock()

defer close(lockCh)
// it`s ok if in the same time we have few active searches
// mu per bucket is expansive
stopSearchCh := r.searchLock.StartSearch(bucketID)
defer close(stopSearchCh)

r.cfg.Logger.Info(ctx, fmt.Sprintf("Discovering bucket %d", bucketID))

Expand All @@ -52,6 +72,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
var resultRs *Replicaset

for rsID, rs := range r.idToReplicaset {
rsID := rsID
go func(_rs *Replicaset) {
defer wg.Done()
_, errStat := _rs.bucketStat(ctx, bucketID)
Expand Down
35 changes: 35 additions & 0 deletions discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package vshard_router

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestSearchLock_WaitOnSearch(t *testing.T) {
lock := searchLock{
mu: sync.RWMutex{},
perBucket: make([]chan struct{}, 10),
}

noLockStart := time.Now()
lock.WaitOnSearch(5)
require.True(t, time.Since(noLockStart) < time.Millisecond)

lockStart := time.Now()
chStopSearch := lock.StartSearch(3)
go func() {
time.Sleep(time.Millisecond * 10)
close(chStopSearch)
}()

noLockStart = time.Now()
lock.WaitOnSearch(5)
require.True(t, time.Since(noLockStart) < time.Millisecond)

lock.WaitOnSearch(3)

require.True(t, time.Since(lockStart) < 12*time.Millisecond && time.Since(lockStart) > 9*time.Millisecond)
}
6 changes: 5 additions & 1 deletion replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Replicaset struct {
bucketCount atomic.Int32
}

func (rs *Replicaset) String() string {
return fmt.Sprintf("%s:%s", rs.info.Name, rs.info.UUID.String())
}

func (rs *Replicaset) bucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error) {
bsInfo := &BucketStatInfo{}
bsError := &BucketStatError{}
Expand Down Expand Up @@ -62,7 +66,7 @@ type ReplicasetCallOpts struct {
Timeout time.Duration
}

// ReplicasetCallImpl perform function on remote storage
// ReplicaCall perform function on remote storage
// link https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L661
func (rs *Replicaset) ReplicaCall(
ctx context.Context,
Expand Down
24 changes: 12 additions & 12 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,24 @@ func (r *Router) Topology() TopologyController {
return &controller{r: r}
}

func (t *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error {
func (c *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error {
instance := pool.Instance{
Name: info.UUID.String(),
Dialer: tarantool.NetDialer{
Address: info.Addr,
User: t.r.cfg.User,
Password: t.r.cfg.Password,
User: c.r.cfg.User,
Password: c.r.cfg.Password,
},
}
return t.r.idToReplicaset[rsID].conn.Add(ctx, instance)
return c.r.idToReplicaset[rsID].conn.Add(ctx, instance)
}

func (t *controller) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error {
return t.r.idToReplicaset[rsID].conn.Remove(instanceID.String())
func (c *controller) RemoveInstance(ctx context.Context, rsID, instanceID uuid.UUID) error {
return c.r.idToReplicaset[rsID].conn.Remove(instanceID.String())
}

func (t *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error {
router := t.r
func (c *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error {
router := c.r
cfg := router.cfg

replicaset := &Replicaset{
Expand Down Expand Up @@ -94,9 +94,9 @@ func (t *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, i
return nil
}

func (t *controller) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error {
func (c *controller) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error {
for rsInfo, rsInstances := range replicasets {
err := t.AddReplicaset(ctx, rsInfo, rsInstances)
err := c.AddReplicaset(ctx, rsInfo, rsInstances)
if err != nil {
return err
}
Expand All @@ -105,8 +105,8 @@ func (t *controller) AddReplicasets(ctx context.Context, replicasets map[Replica
return nil
}

func (t *controller) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error {
r := t.r
func (c *controller) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error {
r := c.r

errors := r.idToReplicaset[rsID].conn.CloseGraceful()
delete(r.idToReplicaset, rsID)
Expand Down
16 changes: 5 additions & 11 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ type Router struct {

idToReplicaset map[uuid.UUID]*Replicaset
routeMap []*Replicaset
searchLock struct {
mu sync.Mutex // запись для per bucket
perBucket []chan struct{}
}
searchLock searchLock

knownBucketCount atomic.Int32

Expand Down Expand Up @@ -86,13 +83,10 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
}

router := &Router{
cfg: cfg,
idToReplicaset: make(map[uuid.UUID]*Replicaset),
routeMap: make([]*Replicaset, cfg.TotalBucketCount+1),
searchLock: struct {
mu sync.Mutex
perBucket []chan struct{}
}{mu: sync.Mutex{}, perBucket: make([]chan struct{}, cfg.TotalBucketCount+1)},
cfg: cfg,
idToReplicaset: make(map[uuid.UUID]*Replicaset),
routeMap: make([]*Replicaset, cfg.TotalBucketCount+1),
searchLock: searchLock{mu: sync.RWMutex{}, perBucket: make([]chan struct{}, cfg.TotalBucketCount+1)},
knownBucketCount: atomic.Int32{},
}

Expand Down

0 comments on commit 9e72b8c

Please sign in to comment.