Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #70 (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
nurzhan-saktaganov authored Sep 20, 2024
1 parent e4e989d commit bf100ff
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 51 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ REFACTOR:
* Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs
* resolve issue #44: remove bucketCount field from struct Replicaset
* rename startCronDiscovery to cronDiscovery and make it panic-tolerant
* BucketStat: split into bucketStatAsync and bucketStatWait parts
* BucketDiscovery: do not spawn goroutines, just use futures in the single goroutine
* BucketResolve: make it alias for BucketDiscovery

TESTS:

Expand Down
76 changes: 29 additions & 47 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
Expand All @@ -29,6 +27,10 @@ const (

// BucketDiscovery search bucket in whole cluster
func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error) {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

view := r.getConsistentView()

rs := view.routeMap[bucketID].Load()
Expand All @@ -37,42 +39,39 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
}

// it`s ok if in the same time we have few active searches
// mu per bucket is expansive
r.log().Infof(ctx, "Discovering bucket %d", bucketID)

idToReplicasetRef := r.getIDToReplicaset()

wg := sync.WaitGroup{}
wg.Add(len(idToReplicasetRef))

type result struct {
err error
rs *Replicaset
type rsFuture struct {
rsID uuid.UUID
future *tarantool.Future
}

// This works only for go 1.19 or higher. To support older versions
// we can use mutex + conditional compilation that checks go version.
// Example for conditional compilation: https://www.youtube.com/watch?v=5eQBKqVlNQg
var resultAtomic = atomic.Pointer[result]{}

var rsFutures = make([]rsFuture, 0, len(idToReplicasetRef))
// Send a bunch of parallel requests
for rsID, rs := range idToReplicasetRef {
go func(rs *Replicaset, rsID uuid.UUID) {
defer wg.Done()
if _, err := rs.BucketStat(ctx, bucketID); err == nil {
// It's ok if several replicasets return ok to bucket_stat command for the same bucketID,
// just pick any of them.
var res result
res.rs, res.err = r.BucketSet(bucketID, rsID)
resultAtomic.Store(&res)
}
}(rs, rsID)
rsFutures = append(rsFutures, rsFuture{
rsID: rsID,
future: rs.bucketStatAsync(ctx, bucketID),
})
}

wg.Wait()
for _, rsFuture := range rsFutures {
if _, err := bucketStatWait(rsFuture.future); err != nil {
// just skip, bucket seems do not belong to this replicaset
continue
}

res := resultAtomic.Load()
if res == nil || res.err != nil || res.rs == nil {
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
// It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them.
rs, err := r.BucketSet(bucketID, rsFuture.rsID)
if err != nil {
r.log().Errorf(ctx, "BucketDiscovery: can't set rsID %v for bucketID %d: %v", rsFuture.rsID, bucketID, err)
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}

// TODO: should we release resources for unhandled futures?
return rs, nil
}

/*
Expand All @@ -84,29 +83,12 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
-- discovery).
*/

return res.rs, nil
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}

// BucketResolve resolve bucket id to replicaset
func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error) {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

view := r.getConsistentView()

rs := view.routeMap[bucketID].Load()
if rs != nil {
return rs, nil
}

// Replicaset removed from cluster, perform discovery
rs, err := r.BucketDiscovery(ctx, bucketID)
if err != nil {
return nil, err
}

return rs, nil
return r.BucketDiscovery(ctx, bucketID)
}

// DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
Expand Down
19 changes: 15 additions & 4 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,39 @@ func (rs *Replicaset) String() string {
}

func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error) {
const bucketStatFnc = "vshard.storage.bucket_stat"
future := rs.bucketStatAsync(ctx, bucketID)

var bsInfo BucketStatInfo
return bucketStatWait(future)
}

func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tarantool.Future {
const bucketStatFnc = "vshard.storage.bucket_stat"

req := tarantool.NewCallRequest(bucketStatFnc).
Args([]interface{}{bucketID}).
Context(ctx)

future := rs.conn.Do(req, pool.RO)

return future
}

func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
var bsInfo BucketStatInfo

respData, err := future.Get()
if err != nil {
return bsInfo, err
}

if len(respData) < 1 {
return bsInfo, fmt.Errorf("respData len is 0 for %s; unsupported or broken proto", bucketStatFnc)
return bsInfo, fmt.Errorf("respData len is 0 for bucketStatWait; unsupported or broken proto")
}

if respData[0] == nil {

if len(respData) < 2 {
return bsInfo, fmt.Errorf("respData len < 2 when respData[0] is nil for %s", bucketStatFnc)
return bsInfo, fmt.Errorf("respData len < 2 when respData[0] is nil for bucketStatWait")
}

var tmp interface{} // todo: fix non-panic crutch
Expand Down

0 comments on commit bf100ff

Please sign in to comment.