Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

resolve issue #70 #72

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ REFACTOR:
* Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs
* resolve issue #44: remove bucketCount field from struct Replicaset
* rename startCronDiscovery to cronDiscovery and make it panic-tolerant
* BucketStat: split into bucketStatAsync and bucketStatWait parts
* BucketDiscovery: do not spawn goroutines; just use futures within a single goroutine
* BucketResolve: make it an alias for BucketDiscovery

TESTS:

Expand Down
76 changes: 29 additions & 47 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
Expand All @@ -29,6 +27,10 @@ const (

// BucketDiscovery searches for the bucket across the whole cluster.
func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replicaset, error) {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

view := r.getConsistentView()

rs := view.routeMap[bucketID].Load()
Expand All @@ -37,42 +39,39 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
}

// it's ok if several searches are active at the same time;
// a mutex per bucket would be expensive
r.log().Infof(ctx, "Discovering bucket %d", bucketID)

idToReplicasetRef := r.getIDToReplicaset()

wg := sync.WaitGroup{}
wg.Add(len(idToReplicasetRef))

type result struct {
err error
rs *Replicaset
type rsFuture struct {
rsID uuid.UUID
future *tarantool.Future
}

// This works only for go 1.19 or higher. To support older versions
// we can use mutex + conditional compilation that checks go version.
// Example for conditional compilation: https://www.youtube.com/watch?v=5eQBKqVlNQg
var resultAtomic = atomic.Pointer[result]{}

var rsFutures = make([]rsFuture, 0, len(idToReplicasetRef))
// Send a bunch of parallel requests
for rsID, rs := range idToReplicasetRef {
go func(rs *Replicaset, rsID uuid.UUID) {
defer wg.Done()
if _, err := rs.BucketStat(ctx, bucketID); err == nil {
// It's ok if several replicasets return ok to bucket_stat command for the same bucketID,
// just pick any of them.
var res result
res.rs, res.err = r.BucketSet(bucketID, rsID)
resultAtomic.Store(&res)
}
}(rs, rsID)
rsFutures = append(rsFutures, rsFuture{
rsID: rsID,
future: rs.bucketStatAsync(ctx, bucketID),
})
}

wg.Wait()
for _, rsFuture := range rsFutures {
if _, err := bucketStatWait(rsFuture.future); err != nil {
// just skip: the bucket does not seem to belong to this replicaset
continue
}

res := resultAtomic.Load()
if res == nil || res.err != nil || res.rs == nil {
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
// It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them.
rs, err := r.BucketSet(bucketID, rsFuture.rsID)
if err != nil {
r.log().Errorf(ctx, "BucketDiscovery: can't set rsID %v for bucketID %d: %v", rsFuture.rsID, bucketID, err)
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}

// TODO: should we release resources for unhandled futures?
return rs, nil
}

/*
Expand All @@ -84,29 +83,12 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
-- discovery).
*/

return res.rs, nil
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
}

// BucketResolve resolves a bucket id to its replicaset.
func (r *Router) BucketResolve(ctx context.Context, bucketID uint64) (*Replicaset, error) {
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
}

view := r.getConsistentView()

rs := view.routeMap[bucketID].Load()
if rs != nil {
return rs, nil
}

// Replicaset removed from cluster, perform discovery
rs, err := r.BucketDiscovery(ctx, bucketID)
if err != nil {
return nil, err
}

return rs, nil
return r.BucketDiscovery(ctx, bucketID)
}

// DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
Expand Down
19 changes: 15 additions & 4 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,39 @@ func (rs *Replicaset) String() string {
}

func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error) {
const bucketStatFnc = "vshard.storage.bucket_stat"
future := rs.bucketStatAsync(ctx, bucketID)

var bsInfo BucketStatInfo
return bucketStatWait(future)
}

// bucketStatAsync issues a "vshard.storage.bucket_stat" call for bucketID on
// the replicaset connection in pool.RO mode and returns the future without
// waiting for it. The request carries ctx. The caller is responsible for
// obtaining the result from the returned future.
func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tarantool.Future {
	// The call takes a single positional argument: the bucket id.
	args := []interface{}{bucketID}

	call := tarantool.NewCallRequest("vshard.storage.bucket_stat").Args(args).Context(ctx)

	return rs.conn.Do(call, pool.RO)
}

func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
var bsInfo BucketStatInfo

respData, err := future.Get()
if err != nil {
return bsInfo, err
}

if len(respData) < 1 {
return bsInfo, fmt.Errorf("respData len is 0 for %s; unsupported or broken proto", bucketStatFnc)
return bsInfo, fmt.Errorf("respData len is 0 for bucketStatWait; unsupported or broken proto")
}

if respData[0] == nil {

if len(respData) < 2 {
return bsInfo, fmt.Errorf("respData len < 2 when respData[0] is nil for %s", bucketStatFnc)
return bsInfo, fmt.Errorf("respData len < 2 when respData[0] is nil for bucketStatWait")
}

var tmp interface{} // todo: fix non-panic crutch
Expand Down
Loading