Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #36 (#40)
Browse files Browse the repository at this point in the history
* make the idToReplicaset map immutable by convention
	we create an entirely new object instead of updating the old one
* remove proxy object 'struct controller'
* update CHANGELOG.md
  • Loading branch information
nurzhan-saktaganov authored Aug 30, 2024
1 parent ec3d0cd commit cf02580
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 54 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ BUG FIXES:
* BucketStat: always returns non-nil err, fixed
* DiscoveryAllBuckets returns nil even if errGr.Wait() returns err, fixed
* DiscoveryHandleBuckets: misusage of atomics, fixed
* race when accessing to idToReplicaset, fixed: idToReplicaset is immutable object now

FEATURES:

Expand All @@ -20,6 +21,7 @@ REFACTOR:
* Several linters are enabled because they are usefull
* Ignore .tmp files
* Refactored provider creation test caused by golang-ci lint (#33)
* Router implements directly TopologyController, no proxy object is used now



Expand Down
23 changes: 17 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ func (r *Router) RouterCallImpl(ctx context.Context,
}

// call function "storage_unref" if map_callrw is failed or successed
func (r *Router) callStorageUnref(refID int64) {
func (r *Router) callStorageUnref(idToReplicasetRef map[uuid.UUID]*Replicaset, refID int64) {
req := tarantool.NewCallRequest("vshard.storage._call")
req = req.Args([]interface{}{"storage_unref", refID})

for _, replicaset := range r.idToReplicaset {
for _, replicaset := range idToReplicasetRef {
conn := replicaset.conn

future := conn.Do(req, pool.RW)
Expand Down Expand Up @@ -265,7 +265,9 @@ func (r *Router) RouterMapCallRWImpl(
refID := r.refID.Load()
r.refID.Add(1)

defer r.callStorageUnref(refID)
idToReplicasetRef := r.getIDToReplicaset()

defer r.callStorageUnref(idToReplicasetRef, refID)

mapCallCtx, cancel := context.WithTimeout(ctx, timeout)

Expand All @@ -286,7 +288,7 @@ func (r *Router) RouterMapCallRWImpl(
g.Go(func() error {
defer close(rsFutures)

for id, replicaset := range r.idToReplicaset {
for id, replicaset := range idToReplicasetRef {
conn := replicaset.conn

future := conn.Do(req, pool.RW)
Expand Down Expand Up @@ -372,7 +374,7 @@ func (r *Router) RouterMapCallRWImpl(
g.Go(func() error {
defer close(rsFutures)

for id, replicaset := range r.idToReplicaset {
for id, replicaset := range idToReplicasetRef {
conn := replicaset.conn

future := conn.Do(req, pool.RW)
Expand Down Expand Up @@ -479,5 +481,14 @@ func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset,

// RouterRouteAll return map of all replicasets.
func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset {
return r.idToReplicaset
idToReplicasetRef := r.getIDToReplicaset()

// Do not expose the original map to prevent unauthorized modification.
idToReplicasetCopy := make(map[uuid.UUID]*Replicaset)

for k, v := range idToReplicasetRef {
idToReplicasetCopy[k] = v
}

return idToReplicasetCopy
}
10 changes: 7 additions & 3 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica

r.cfg.Logger.Info(ctx, fmt.Sprintf("Discovering bucket %d", bucketID))

idToReplicasetRef := r.getIDToReplicaset()

wg := sync.WaitGroup{}
wg.Add(len(r.idToReplicaset))
wg.Add(len(idToReplicasetRef))

var err error
var resultRs *Replicaset

for rsID, rs := range r.idToReplicaset {
for rsID, rs := range idToReplicasetRef {
rsID := rsID
go func(_rs *Replicaset) {
defer wg.Done()
Expand Down Expand Up @@ -164,7 +166,9 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {

errGr, ctx := errgroup.WithContext(ctx)

for _, rs := range r.idToReplicaset {
idToReplicasetRef := r.getIDToReplicaset()

for _, rs := range idToReplicasetRef {
rs := rs

errGr.Go(func() error {
Expand Down
120 changes: 80 additions & 40 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ import (
"github.com/tarantool/go-tarantool/v2/pool"
)

var ErrReplicasetNotExists = fmt.Errorf("replicaset not exists")
var (
ErrReplicasetExists = fmt.Errorf("replicaset exists")
ErrReplicasetNotExists = fmt.Errorf("replicaset not exists")
)

// TopologyController is an entity that allows you to interact with the topology.
// TopologyController is not concurrent safe.
// This decision is made intentionally because there is no point in providing concurrence safety for this case.
// In any case, a caller can use his own external synchronization primitive to handle concurrent access.
type TopologyController interface {
AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error
RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error
Expand All @@ -20,16 +27,25 @@ type TopologyController interface {
AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}

// TopologyController is an entity that allows you to interact with the topology
type controller struct {
r *Router
func (r *Router) getIDToReplicaset() map[uuid.UUID]*Replicaset {
r.idToReplicasetMutex.RLock()
idToReplicasetRef := r.idToReplicaset
r.idToReplicasetMutex.RUnlock()

return idToReplicasetRef
}

func (r *Router) setIDToReplicaset(idToReplicasetNew map[uuid.UUID]*Replicaset) {
r.idToReplicasetMutex.Lock()
r.idToReplicaset = idToReplicasetNew
r.idToReplicasetMutex.Unlock()
}

func (r *Router) Topology() TopologyController {
return &controller{r: r}
return r
}

func (c *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error {
func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error {
err := info.Validate()
if err != nil {
return err
Expand All @@ -39,57 +55,59 @@ func (c *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info Insta
Name: info.UUID.String(),
Dialer: tarantool.NetDialer{
Address: info.Addr,
User: c.r.cfg.User,
Password: c.r.cfg.Password,
User: r.cfg.User,
Password: r.cfg.Password,
},
}

rs := c.r.idToReplicaset[rsID]
idToReplicasetRef := r.getIDToReplicaset()

rs := idToReplicasetRef[rsID]
if rs == nil {
return ErrReplicasetNotExists
}

return rs.conn.Add(ctx, instance)
}

func (c *controller) RemoveInstance(_ context.Context, rsID, instanceID uuid.UUID) error {
rs := c.r.idToReplicaset[rsID]
func (r *Router) RemoveInstance(_ context.Context, rsID, instanceID uuid.UUID) error {
idToReplicasetRef := r.getIDToReplicaset()

rs := idToReplicasetRef[rsID]
if rs == nil {
return ErrReplicasetNotExists
}

return rs.conn.Remove(instanceID.String())
}

func (c *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error {
router := c.r
cfg := router.cfg
func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error {
idToReplicasetOld := r.getIDToReplicaset()

if _, ok := idToReplicasetOld[rsInfo.UUID]; ok {
return ErrReplicasetExists
}

replicaset := &Replicaset{
info: ReplicasetInfo{
Name: rsInfo.Name,
UUID: rsInfo.UUID,
},
// according to the documentation, it will be initialized by zero, see: https://pkg.go.dev/sync/atomic#Int32
bucketCount: atomic.Int32{},
}

replicaset.bucketCount.Store(0)

rsInstances := make([]pool.Instance, len(instances))

for i, instance := range instances {
dialer := tarantool.NetDialer{
Address: instance.Addr,
User: cfg.User,
Password: cfg.Password,
}
inst := pool.Instance{
Name: instance.UUID.String(),
Dialer: dialer,
Opts: router.cfg.PoolOpts,
}

rsInstances[i] = inst
rsInstances := make([]pool.Instance, 0, len(instances))
for _, instance := range instances {
rsInstances = append(rsInstances, pool.Instance{
Name: instance.UUID.String(),
Dialer: tarantool.NetDialer{
Address: instance.Addr,
User: r.cfg.User,
Password: r.cfg.Password,
},
Opts: r.cfg.PoolOpts,
})
}

conn, err := pool.Connect(ctx, rsInstances)
Expand All @@ -107,14 +125,30 @@ func (c *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, i
}

replicaset.conn = conn
router.idToReplicaset[rsInfo.UUID] = replicaset // add when conn is ready

// Create an entirely new map object
idToReplicasetNew := make(map[uuid.UUID]*Replicaset)
for k, v := range idToReplicasetOld {
idToReplicasetNew[k] = v
}
idToReplicasetNew[rsInfo.UUID] = replicaset // add when conn is ready

// We could detect concurrent access to the TopologyController interface
// by comparing references to r.idToReplicaset and idToReplicasetOld.
// But it requires reflection which I prefer to avoid.
// See: https://stackoverflow.com/questions/58636694/how-to-know-if-2-go-maps-reference-the-same-data.
r.setIDToReplicaset(idToReplicasetNew)

return nil
}

func (c *controller) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error {
func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error {
for rsInfo, rsInstances := range replicasets {
err := c.AddReplicaset(ctx, rsInfo, rsInstances)
// We assume that AddReplicasets is called only once during initialization.
// We also expect that cluster configuration changes very rarely,
// so we prefer more simple code rather than the efficiency of this part of logic.
// Even if there are 1000 replicasets, it is still cheap.
err := r.AddReplicaset(ctx, rsInfo, rsInstances)
if err != nil {
return err
}
Expand All @@ -123,16 +157,22 @@ func (c *controller) AddReplicasets(ctx context.Context, replicasets map[Replica
return nil
}

func (c *controller) RemoveReplicaset(_ context.Context, rsID uuid.UUID) []error {
r := c.r
func (r *Router) RemoveReplicaset(_ context.Context, rsID uuid.UUID) []error {
idToReplicasetOld := r.getIDToReplicaset()

rs := r.idToReplicaset[rsID]
rs := idToReplicasetOld[rsID]
if rs == nil {
return []error{ErrReplicasetNotExists}
}

errors := rs.conn.CloseGraceful()
delete(r.idToReplicaset, rsID)
// Create an entirely new map object
idToReplicasetNew := make(map[uuid.UUID]*Replicaset)
for k, v := range idToReplicasetOld {
idToReplicasetNew[k] = v
}
delete(idToReplicasetNew, rsID)

r.setIDToReplicaset(idToReplicasetNew)

return errors
return rs.conn.CloseGraceful()
}
23 changes: 18 additions & 5 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,18 @@ var (
type Router struct {
cfg Config

idToReplicaset map[uuid.UUID]*Replicaset
routeMap []*Replicaset
searchLock searchLock
// idToReplicasetMutex guards not the map itself, but the variable idToReplicaset.
// idToReplicaset is an immutable object by our convention.
// Whenever we add or remove a replicaset, we create a new map object.
// idToReplicaset can be modified only by TopologyController methods.
// Assuming that we rarely add or remove some replicaset,
// it should be the simplest and most efficient way of handling concurrent access.
// Additionally, we can safely iterate over a map because it never changes.
idToReplicasetMutex sync.RWMutex
idToReplicaset map[uuid.UUID]*Replicaset

routeMap []*Replicaset
searchLock searchLock

knownBucketCount atomic.Int32

Expand Down Expand Up @@ -144,7 +153,9 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {

// BucketSet Set a bucket to a replicaset.
func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error) {
rs := r.idToReplicaset[rsID]
idToReplicasetRef := r.getIDToReplicaset()

rs := idToReplicasetRef[rsID]
if rs == nil {
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}
Expand Down Expand Up @@ -176,10 +187,12 @@ func (r *Router) BucketReset(bucketID uint64) {
}

func (r *Router) RouteMapClean() {
idToReplicasetRef := r.getIDToReplicaset()

r.routeMap = make([]*Replicaset, r.cfg.TotalBucketCount+1)
r.knownBucketCount.Store(0)

for _, rs := range r.idToReplicaset {
for _, rs := range idToReplicasetRef {
rs.bucketCount.Store(0)
}
}
Expand Down

0 comments on commit cf02580

Please sign in to comment.