From edc5eb51a6eaaf93e9d540708577c66d78ee0533 Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Mon, 16 Sep 2024 23:08:36 +0300 Subject: [PATCH] resolve issue #70 * BucketStat: split into bucketStatAsync and bucketStatWait parts * BucketDiscovery: do not spawn goroutines, just use futures in the single goroutine * BucketResolve: make it alias for BucketDiscovery --- CHANGELOG.md | 3 ++ discovery.go | 76 ++++++++++++++++++++------------------------------- replicaset.go | 25 +++++++++++++---- 3 files changed, 51 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6505fd..358fd76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,9 @@ REFACTOR: * resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if * resolve issue #46: drastically simplify RouterMapCallRWImpl and added tests with real tnt * Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs +* BucketStat: split into bucketStatAsync and bucketStatWait parts +* BucketDiscovery: do not spawn goroutines, just use futures in the single goroutine +* BucketResolve: make it alias for BucketDiscovery TESTS: diff --git a/discovery.go b/discovery.go index 60d4b64..f212f21 100644 --- a/discovery.go +++ b/discovery.go @@ -3,8 +3,6 @@ package vshard_router //nolint:revive import ( "context" "fmt" - "sync" - "sync/atomic" "time" "golang.org/x/sync/errgroup" @@ -28,6 +26,10 @@ const ( // BucketDiscovery search bucket in whole cluster func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error) { + if bucketID < 1 || r.cfg.TotalBucketCount < bucketID { + return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) + } + view := r.getConsistentView() rs := view.routeMap[bucketID].Load() @@ -36,42 +38,39 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica } // it`s ok if in the same time we have few active searches - // mu per bucket 
is expansive r.log().Infof(ctx, "Discovering bucket %d", bucketID) idToReplicasetRef := r.getIDToReplicaset() - wg := sync.WaitGroup{} - wg.Add(len(idToReplicasetRef)) - - type result struct { - err error - rs *Replicaset + type rsFuture struct { + rsID uuid.UUID + future Future } - // This works only for go 1.19 or higher. To support older versions - // we can use mutex + conditional compilation that checks go version. - // Example for conditional compilation: https://www.youtube.com/watch?v=5eQBKqVlNQg - var resultAtomic = atomic.Pointer[result]{} - + var rsFutures = make([]rsFuture, 0, len(idToReplicasetRef)) + // Send a bunch of parallel requests for rsID, rs := range idToReplicasetRef { - go func(rs *Replicaset, rsID uuid.UUID) { - defer wg.Done() - if _, err := rs.BucketStat(ctx, bucketID); err == nil { - // It's ok if several replicasets return ok to bucket_stat command for the same bucketID, - // just pick any of them. - var res result - res.rs, res.err = r.BucketSet(bucketID, rsID) - resultAtomic.Store(&res) - } - }(rs, rsID) + rsFutures = append(rsFutures, rsFuture{ + rsID: rsID, + future: rs.bucketStatAsync(ctx, bucketID), + }) } - wg.Wait() + for _, rsFuture := range rsFutures { + if _, err := bucketStatWait(rsFuture.future); err != nil { + // just skip, the bucket does not seem to belong to this replicaset + continue + } + + // It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them. + rs, err := r.BucketSet(bucketID, rsFuture.rsID) + if err != nil { + r.log().Errorf(ctx, "BucketDiscovery: can't set rsID %v for bucketID %d: %v", rsFuture.rsID, bucketID, err) + return nil, Errors[9] // NO_ROUTE_TO_BUCKET + } - res := resultAtomic.Load() - if res == nil || res.err != nil || res.rs == nil { - return nil, Errors[9] // NO_ROUTE_TO_BUCKET + // TODO: should we release resources for unhandled futures? 
+ return rs, nil } /* @@ -83,29 +82,12 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica -- discovery). */ - return res.rs, nil + return nil, Errors[9] // NO_ROUTE_TO_BUCKET } // BucketResolve resolve bucket id to replicaset func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error) { - if bucketID < 1 || r.cfg.TotalBucketCount < bucketID { - return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount) - } - - view := r.getConsistentView() - - rs := view.routeMap[bucketID].Load() - if rs != nil { - return rs, nil - } - - // Replicaset removed from cluster, perform discovery - rs, err := r.BucketDiscovery(ctx, bucketID) - if err != nil { - return nil, err - } - - return rs, nil + return r.BucketDiscovery(ctx, bucketID) } // DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset. diff --git a/replicaset.go b/replicaset.go index 578507c..a3ed658 100644 --- a/replicaset.go +++ b/replicaset.go @@ -38,28 +38,41 @@ func (rs *Replicaset) String() string { } func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error) { - const bucketStatFnc = "vshard.storage.bucket_stat" + future := rs.bucketStatAsync(ctx, bucketID) - var bsInfo BucketStatInfo + return bucketStatWait(future) +} + +func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) Future { + const bucketStatFnc = "vshard.storage.bucket_stat" req := tarantool.NewCallRequest(bucketStatFnc). Args([]interface{}{bucketID}). 
Context(ctx) - future := rs.conn.Do(req, pool.RO) - respData, err := future.Get() + var future = Future{ + tntFut: rs.conn.Do(req, pool.RO), + } + + return future +} + +func bucketStatWait(future Future) (BucketStatInfo, error) { + var bsInfo BucketStatInfo + + respData, err := future.tntFut.Get() if err != nil { return bsInfo, err } if len(respData) < 1 { - return bsInfo, fmt.Errorf("respData len is 0 for %s; unsupported or broken proto", bucketStatFnc) + return bsInfo, fmt.Errorf("respData len is 0 for bucketStatWait; unsupported or broken proto") } if respData[0] == nil { if len(respData) < 2 { - return bsInfo, fmt.Errorf("respData len < 2 when respData[0] is nil for %s", bucketStatFnc) + return bsInfo, fmt.Errorf("respData len < 2 when respData[0] is nil for bucketStatWait") } var tmp interface{} // todo: fix non-panic crutch