diff --git a/CHANGELOG.md b/CHANGELOG.md index 2204454..5997b8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ BUG FIXES: * BucketStat: always returns non-nil err, fixed * DiscoveryAllBuckets returns nil even if errGr.Wait() returns err, fixed * DiscoveryHandleBuckets: misusage of atomics, fixed +* race when accessing to idToReplicaset, fixed: idToReplicaset is immutable object now FEATURES: @@ -20,6 +21,7 @@ REFACTOR: * Several linters are enabled because they are usefull * Ignore .tmp files * Refactored provider creation test caused by golang-ci lint (#33) +* Router implements directly TopologyController, no proxy object is used now diff --git a/api.go b/api.go index a2930cf..61f8ded 100644 --- a/api.go +++ b/api.go @@ -229,11 +229,11 @@ func (r *Router) RouterCallImpl(ctx context.Context, } // call function "storage_unref" if map_callrw is failed or successed -func (r *Router) callStorageUnref(refID int64) { +func (r *Router) callStorageUnref(idToReplicasetRef map[uuid.UUID]*Replicaset, refID int64) { req := tarantool.NewCallRequest("vshard.storage._call") req = req.Args([]interface{}{"storage_unref", refID}) - for _, replicaset := range r.idToReplicaset { + for _, replicaset := range idToReplicasetRef { conn := replicaset.conn future := conn.Do(req, pool.RW) @@ -265,7 +265,9 @@ func (r *Router) RouterMapCallRWImpl( refID := r.refID.Load() r.refID.Add(1) - defer r.callStorageUnref(refID) + idToReplicasetRef := r.getIDToReplicaset() + + defer r.callStorageUnref(idToReplicasetRef, refID) mapCallCtx, cancel := context.WithTimeout(ctx, timeout) @@ -286,7 +288,7 @@ func (r *Router) RouterMapCallRWImpl( g.Go(func() error { defer close(rsFutures) - for id, replicaset := range r.idToReplicaset { + for id, replicaset := range idToReplicasetRef { conn := replicaset.conn future := conn.Do(req, pool.RW) @@ -372,7 +374,7 @@ func (r *Router) RouterMapCallRWImpl( g.Go(func() error { defer close(rsFutures) - for id, replicaset := range r.idToReplicaset { + for id, replicaset := range idToReplicasetRef { conn := replicaset.conn future := conn.Do(req, pool.RW) @@ -479,5 +481,14 @@ func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset, // RouterRouteAll return map of all replicasets. func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset { - return r.idToReplicaset + idToReplicasetRef := r.getIDToReplicaset() + + // Do not expose the original map to prevent unauthorized modification. + idToReplicasetCopy := make(map[uuid.UUID]*Replicaset) + + for k, v := range idToReplicasetRef { + idToReplicasetCopy[k] = v + } + + return idToReplicasetCopy } diff --git a/discovery.go b/discovery.go index 5e19163..cf79b53 100644 --- a/discovery.go +++ b/discovery.go @@ -65,13 +65,15 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica r.cfg.Logger.Info(ctx, fmt.Sprintf("Discovering bucket %d", bucketID)) + idToReplicasetRef := r.getIDToReplicaset() + wg := sync.WaitGroup{} - wg.Add(len(r.idToReplicaset)) + wg.Add(len(idToReplicasetRef)) var err error var resultRs *Replicaset - for rsID, rs := range r.idToReplicaset { + for rsID, rs := range idToReplicasetRef { rsID := rsID go func(_rs *Replicaset) { defer wg.Done() @@ -164,7 +166,9 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error { errGr, ctx := errgroup.WithContext(ctx) - for _, rs := range r.idToReplicaset { + idToReplicasetRef := r.getIDToReplicaset() + + for _, rs := range idToReplicasetRef { rs := rs errGr.Go(func() error { diff --git a/topology.go b/topology.go index a2ae9d6..3cbab92 100644 --- a/topology.go +++ b/topology.go @@ -10,8 +10,15 @@ import ( "github.com/tarantool/go-tarantool/v2/pool" ) -var ErrReplicasetNotExists = fmt.Errorf("replicaset not exists") +var ( + ErrReplicasetExists = fmt.Errorf("replicaset exists") + ErrReplicasetNotExists = fmt.Errorf("replicaset not exists") +) +// TopologyController is an entity that allows you to interact with the topology. +// TopologyController is not concurrent safe. +// This decision is made intentionally because there is no point in providing concurrence safety for this case. +// In any case, a caller can use his own external synchronization primitive to handle concurrent access. type TopologyController interface { AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error @@ -20,16 +27,25 @@ type TopologyController interface { AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error } -// TopologyController is an entity that allows you to interact with the topology -type controller struct { - r *Router +func (r *Router) getIDToReplicaset() map[uuid.UUID]*Replicaset { + r.idToReplicasetMutex.RLock() + idToReplicasetRef := r.idToReplicaset + r.idToReplicasetMutex.RUnlock() + + return idToReplicasetRef +} + +func (r *Router) setIDToReplicaset(idToReplicasetNew map[uuid.UUID]*Replicaset) { + r.idToReplicasetMutex.Lock() + r.idToReplicaset = idToReplicasetNew + r.idToReplicasetMutex.Unlock() } func (r *Router) Topology() TopologyController { - return &controller{r: r} + return r } -func (c *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error { +func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error { err := info.Validate() if err != nil { return err @@ -39,12 +55,14 @@ func (c *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info Insta Name: info.UUID.String(), Dialer: tarantool.NetDialer{ Address: info.Addr, - User: c.r.cfg.User, - Password: c.r.cfg.Password, + User: r.cfg.User, + Password: r.cfg.Password, }, } - rs := c.r.idToReplicaset[rsID] + idToReplicasetRef := r.getIDToReplicaset() + + rs := idToReplicasetRef[rsID] if rs == nil { return ErrReplicasetNotExists } @@ -52,8 +70,10 @@ func (c *controller) AddInstance(ctx context.Context, rsID uuid.UUID, info Insta return rs.conn.Add(ctx, instance) } -func (c *controller) RemoveInstance(_ context.Context, rsID, instanceID uuid.UUID) error { - rs := c.r.idToReplicaset[rsID] +func (r *Router) RemoveInstance(_ context.Context, rsID, instanceID uuid.UUID) error { + idToReplicasetRef := r.getIDToReplicaset() + + rs := idToReplicasetRef[rsID] if rs == nil { return ErrReplicasetNotExists } @@ -61,35 +81,33 @@ func (c *controller) RemoveInstance(_ context.Context, rsID, instanceID uuid.UUI return rs.conn.Remove(instanceID.String()) } -func (c *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error { - router := c.r - cfg := router.cfg +func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error { + idToReplicasetOld := r.getIDToReplicaset() + + if _, ok := idToReplicasetOld[rsInfo.UUID]; ok { + return ErrReplicasetExists + } replicaset := &Replicaset{ info: ReplicasetInfo{ Name: rsInfo.Name, UUID: rsInfo.UUID, }, + // according to the documentation, it will be initialized by zero, see: https://pkg.go.dev/sync/atomic#Int32 bucketCount: atomic.Int32{}, } - replicaset.bucketCount.Store(0) - - rsInstances := make([]pool.Instance, len(instances)) - - for i, instance := range instances { - dialer := tarantool.NetDialer{ - Address: instance.Addr, - User: cfg.User, - Password: cfg.Password, - } - inst := pool.Instance{ - Name: instance.UUID.String(), - Dialer: dialer, - Opts: router.cfg.PoolOpts, - } - - rsInstances[i] = inst + rsInstances := make([]pool.Instance, 0, len(instances)) + for _, instance := range instances { + rsInstances = append(rsInstances, pool.Instance{ + Name: instance.UUID.String(), + Dialer: tarantool.NetDialer{ + Address: instance.Addr, + User: r.cfg.User, + Password: r.cfg.Password, + }, + Opts: r.cfg.PoolOpts, + }) } conn, err := pool.Connect(ctx, rsInstances) @@ -107,14 +125,30 @@ func (c *controller) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, i } replicaset.conn = conn - router.idToReplicaset[rsInfo.UUID] = replicaset // add when conn is ready + + // Create an entirely new map object + idToReplicasetNew := make(map[uuid.UUID]*Replicaset) + for k, v := range idToReplicasetOld { + idToReplicasetNew[k] = v + } + idToReplicasetNew[rsInfo.UUID] = replicaset // add when conn is ready + + // We could detect concurrent access to the TopologyController interface + // by comparing references to r.idToReplicaset and idToReplicasetOld. + // But it requires reflection which I prefer to avoid. + // See: https://stackoverflow.com/questions/58636694/how-to-know-if-2-go-maps-reference-the-same-data. + r.setIDToReplicaset(idToReplicasetNew) return nil } -func (c *controller) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error { +func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error { for rsInfo, rsInstances := range replicasets { - err := c.AddReplicaset(ctx, rsInfo, rsInstances) + // We assume that AddReplicasets is called only once during initialization. + // We also expect that cluster configuration changes very rarely, + // so we prefer more simple code rather than the efficiency of this part of logic. + // Even if there are 1000 replicasets, it is still cheap. + err := r.AddReplicaset(ctx, rsInfo, rsInstances) if err != nil { return err } @@ -123,16 +157,22 @@ func (c *controller) AddReplicasets(ctx context.Context, replicasets map[Replica return nil } -func (c *controller) RemoveReplicaset(_ context.Context, rsID uuid.UUID) []error { - r := c.r +func (r *Router) RemoveReplicaset(_ context.Context, rsID uuid.UUID) []error { + idToReplicasetOld := r.getIDToReplicaset() - rs := r.idToReplicaset[rsID] + rs := idToReplicasetOld[rsID] if rs == nil { return []error{ErrReplicasetNotExists} } - errors := rs.conn.CloseGraceful() - delete(r.idToReplicaset, rsID) + // Create an entirely new map object + idToReplicasetNew := make(map[uuid.UUID]*Replicaset) + for k, v := range idToReplicasetOld { + idToReplicasetNew[k] = v + } + delete(idToReplicasetNew, rsID) + + r.setIDToReplicaset(idToReplicasetNew) - return errors + return rs.conn.CloseGraceful() } diff --git a/vshard.go b/vshard.go index 8ffc1bd..01d4f04 100644 --- a/vshard.go +++ b/vshard.go @@ -21,9 +21,18 @@ var ( type Router struct { cfg Config - idToReplicaset map[uuid.UUID]*Replicaset - routeMap []*Replicaset - searchLock searchLock + // idToReplicasetMutex guards not the map itself, but the variable idToReplicaset. + // idToReplicaset is an immutable object by our convention. + // Whenever we add or remove a replicaset, we create a new map object. + // idToReplicaset can be modified only by TopologyController methods. + // Assuming that we rarely add or remove some replicaset, + // it should be the simplest and most efficient way of handling concurrent access. + // Additionally, we can safely iterate over a map because it never changes. + idToReplicasetMutex sync.RWMutex + idToReplicaset map[uuid.UUID]*Replicaset + + routeMap []*Replicaset + searchLock searchLock knownBucketCount atomic.Int32 @@ -144,7 +153,9 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) { // BucketSet Set a bucket to a replicaset. func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error) { - rs := r.idToReplicaset[rsID] + idToReplicasetRef := r.getIDToReplicaset() + + rs := idToReplicasetRef[rsID] if rs == nil { return nil, Errors[9] // NO_ROUTE_TO_BUCKET } @@ -176,10 +187,12 @@ func (r *Router) BucketReset(bucketID uint64) { } func (r *Router) RouteMapClean() { + idToReplicasetRef := r.getIDToReplicaset() + r.routeMap = make([]*Replicaset, r.cfg.TotalBucketCount+1) r.knownBucketCount.Store(0) - for _, rs := range r.idToReplicaset { + for _, rs := range idToReplicasetRef { rs.bucketCount.Store(0) } }