From a75eea6e152428c47e62c4c9752b2180d9384e66 Mon Sep 17 00:00:00 2001
From: Gorbunov Pavel
Date: Fri, 1 Mar 2024 15:05:57 +0300
Subject: [PATCH 1/4] map_callrw implementation

---
 api.go    | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 vshard.go |   1 +
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/api.go b/api.go
index 5c7548b..6139053 100644
--- a/api.go
+++ b/api.go
@@ -200,7 +200,156 @@ func (r *Router) RouterCallImpl(ctx context.Context,
 	}
 }
 
-// todo: router_map_callrw
+func (r *Router) callStorageUnref(futures map[uuid.UUID]*tarantool.Future, refID int64) {
+	req := tarantool.NewCallRequest("vshard.storage._call")
+	req = req.Args([]interface{}{"storage_unref", refID})
+
+	for id, future := range futures {
+		future.SetError(nil)
+
+		conn := r.idToReplicaset[id].conn
+
+		future = conn.Do(req, pool.RW)
+		future.SetError(nil)
+	}
+}
+
+// RouterMapCallRWImpl calls the given function on all masters in the cluster
+// with a guarantee that, in case of success, it was executed with all
+// buckets accessible for reads and writes.
+func (r *Router) RouterMapCallRWImpl(
+	ctx context.Context,
+	fnc string,
+	args interface{},
+	opts CallOpts,
+) (map[uuid.UUID]interface{}, error) {
+	if opts.Timeout == 0 {
+		opts.Timeout = CallTimeoutMin
+	}
+
+	timeout := opts.Timeout
+	timeStart := time.Now()
+
+	refID := r.refID.Add(1) - 1
+
+	futures := make(map[uuid.UUID]*tarantool.Future, 0)
+
+	defer r.callStorageUnref(futures, refID)
+
+	req := tarantool.NewCallRequest("vshard.storage._call")
+	req = req.Context(ctx)
+
+	// ref stage: send
+
+	req = req.Args([]interface{}{
+		"storage_ref",
+		refID,
+		timeout,
+	})
+
+	for id, replicaset := range r.idToReplicaset {
+		conn := replicaset.conn
+
+		future := conn.Do(req, pool.RW)
+		futures[id] = future
+	}
+
+	// ref stage: collect
+
+	totalBucketCount := uint16(0)
+
+	for _, future := range futures {
+		respData, err := future.Get()
+		if err != nil {
+			return nil, err
+		}
+
+		if respData[0] == nil {
+			vshardErr := &StorageCallAssertError{}
+
+			err = mapstructure.Decode(respData[1], vshardErr)
+			if err != nil {
+				return nil, err
+			}
+
+			return nil, vshardErr
+		}
+
+		var bucketCount uint16
+		err = future.GetTyped(&[]interface{}{&bucketCount})
+		if err != nil {
+			return nil, err
+		}
+
+		totalBucketCount += bucketCount
+	}
+
+	if int32(totalBucketCount) != r.knownBucketCount.Load() {
+		return nil, fmt.Errorf("unknown bucket count: %d", totalBucketCount)
+	}
+
+	// map stage: send
+
+	req = req.Args([]interface{}{"storage_map", refID, fnc, args})
+
+	for id, replicaset := range r.idToReplicaset {
+		conn := replicaset.conn
+
+		future := conn.Do(req, pool.RW)
+		futures[id] = future
+	}
+
+	// map stage: collect
+
+	idToResult := make(map[uuid.UUID]interface{})
+
+	for id, future := range futures {
+		respData, err := future.Get()
+		if err != nil {
+			return nil, err
+		}
+
+		if len(respData) != 2 {
+			err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
+
+			return nil, err
+		}
+
+		if respData[0] == nil {
+			vshardErr := &StorageCallAssertError{}
+
+			err = mapstructure.Decode(respData[1], vshardErr)
+			if err != nil {
+				return nil, err
+			}
+
+			return nil, vshardErr
+		}
+
+		isVShardRespOk := false
+		err = future.GetTyped(&[]interface{}{&isVShardRespOk})
+		if err != nil {
+			return nil, err
+		}
+
+		if !isVShardRespOk { // error
+			errorResp := &StorageCallAssertError{}
+
+			err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
+			if err != nil {
+				err = fmt.Errorf("can't get typed vshard error: %s", err)
+			}
+
+			return nil, err
+		}
+
+		idToResult[id] = respData[1]
+	}
+
+	r.metrics().RequestDuration(time.Since(timeStart), true)
+
+	return idToResult, nil
+}
 
 // RouterRoute get replicaset object by bucket identifier.
 // alias to BucketResolve
diff --git a/vshard.go b/vshard.go
index 9222d8c..6c900f4 100644
--- a/vshard.go
+++ b/vshard.go
@@ -28,6 +28,7 @@ type Router struct {
 	}
 
 	knownBucketCount atomic.Int32
+	refID            atomic.Int64
 
 	cancelDiscovery func()
 }
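Before the errgroup rework in the next patch, here is a minimal usage sketch of the RouterMapCallRWImpl API introduced above. The stored procedure name "api.do_work" and its arguments are hypothetical; Router, CallOpts, and the module path come from this repository.

package example

import (
	"context"
	"fmt"
	"time"

	vshard_router "github.com/KaymeKaydex/go-vshard-router"
)

// mapCallExample fans one RW call out to every master and collects the
// per-replicaset results keyed by replicaset UUID.
func mapCallExample(ctx context.Context, router *vshard_router.Router) error {
	results, err := router.RouterMapCallRWImpl(
		ctx,
		"api.do_work",            // hypothetical Lua stored procedure
		[]interface{}{"arg", 42}, // its arguments
		vshard_router.CallOpts{Timeout: 3 * time.Second},
	)
	if err != nil {
		return err // any replicaset failure aborts the whole map-reduce round
	}

	for rsUUID, res := range results {
		fmt.Printf("replicaset %s returned: %v\n", rsUUID, res)
	}

	return nil
}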
From 1df3592ef5f89c4a1cbcaa930c9c702ab19b84ba Mon Sep 17 00:00:00 2001
From: Gorbunov Pavel
Date: Thu, 14 Mar 2024 01:04:14 +0300
Subject: [PATCH 2/4] map_callrw with errgroup

---
 api.go       | 241 +++++++++++++++++++++++++++++++++++----------------
 providers.go |   4 +-
 vshard.go    |  10 ++++
 3 files changed, 181 insertions(+), 74 deletions(-)

diff --git a/api.go b/api.go
index 6139053..d2847f3 100644
--- a/api.go
+++ b/api.go
@@ -3,10 +3,13 @@ package vshard_router
 import (
 	"context"
 	"fmt"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
 	"github.com/mitchellh/mapstructure"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/tarantool/go-tarantool/v2"
 	"github.com/tarantool/go-tarantool/v2/pool"
@@ -96,7 +98,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
 
 	for {
 		if since := time.Since(timeStart); since > timeout {
-			r.metrics().RequestDuration(since, false)
+			r.metrics().RequestDuration(since, false, false)
 
 			r.log().Debug(ctx, fmt.Sprintf("return result on timeout; since %s of timeout %s", since, timeout))
 			if err == nil {
@@ -188,7 +190,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
 			err = errorResp
 		}
 
-		r.metrics().RequestDuration(time.Since(timeStart), true)
+		r.metrics().RequestDuration(time.Since(timeStart), true, false)
 
 		r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))
 
@@ -200,20 +202,25 @@ func (r *Router) RouterCallImpl(ctx context.Context,
 	}
 }
 
-func (r *Router) callStorageUnref(futures map[uuid.UUID]*tarantool.Future, refID int64) {
+// callStorageUnref calls "storage_unref" on every master, regardless of
+// whether map_callrw succeeded or failed.
+func (r *Router) callStorageUnref(refID int64) {
 	req := tarantool.NewCallRequest("vshard.storage._call")
 	req = req.Args([]interface{}{"storage_unref", refID})
 
-	for id, future := range futures {
-		future.SetError(nil)
-
-		conn := r.idToReplicaset[id].conn
+	for _, replicaset := range r.idToReplicaset {
+		conn := replicaset.conn
 
-		future = conn.Do(req, pool.RW)
+		future := conn.Do(req, pool.RW)
 		future.SetError(nil)
 	}
 }
 
+type replicasetFuture struct {
+	id     uuid.UUID
+	future *tarantool.Future
+}
+
 // RouterMapCallRWImpl calls the given function on all masters in the cluster
 // with a guarantee that, in case of success, it was executed with all
 // buckets accessible for reads and writes.
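The hunks below rework both the ref and the map stage around one concurrency shape: a single producer goroutine pushes replicasetFuture values into a channel, nWorkers consumer goroutines drain it, and the errgroup cancels everything on the first failure. A standalone sketch of that shape, with a plain int standing in for replicasetFuture and a negative value standing in for a failed future:

package example

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// process shows the producer/worker shape: the producer stops early when
// gctx is canceled, and the first worker error cancels the whole group.
func process(ctx context.Context, tasks []int, nWorkers int) error {
	g, gctx := errgroup.WithContext(ctx)
	ch := make(chan int)

	// producer: feed the channel, bail out if the group was canceled
	g.Go(func() error {
		defer close(ch)

		for _, t := range tasks {
			select {
			case <-gctx.Done():
				return gctx.Err()
			case ch <- t:
			}
		}

		return nil
	})

	// consumers: a fixed pool of workers drains the channel
	for i := 0; i < nWorkers; i++ {
		g.Go(func() error {
			for t := range ch {
				if t < 0 {
					return fmt.Errorf("bad task %d", t) // cancels gctx for everyone
				}
			}

			return nil
		})
	}

	return g.Wait()
}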
@@ -233,11 +239,12 @@ func (r *Router) RouterMapCallRWImpl(
 	timeout := opts.Timeout
 	timeStart := time.Now()
 
 	refID := r.refID.Add(1) - 1
 
-	futures := make(map[uuid.UUID]*tarantool.Future, 0)
+	defer r.callStorageUnref(refID)
 
-	defer r.callStorageUnref(futures, refID)
+	mapCallCtx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
 
 	req := tarantool.NewCallRequest("vshard.storage._call")
 	req = req.Context(ctx)
@@ -248,106 +254,196 @@ func (r *Router) RouterMapCallRWImpl(
 		timeout,
 	})
 
-	for id, replicaset := range r.idToReplicaset {
-		conn := replicaset.conn
-
-		future := conn.Do(req, pool.RW)
-		futures[id] = future
-	}
-
-	// ref stage: collect
-
-	totalBucketCount := uint16(0)
-
-	for _, future := range futures {
-		respData, err := future.Get()
-		if err != nil {
-			return nil, err
-		}
-
-		if respData[0] == nil {
-			vshardErr := &StorageCallAssertError{}
-
-			err = mapstructure.Decode(respData[1], vshardErr)
-			if err != nil {
-				return nil, err
-			}
-
-			return nil, vshardErr
-		}
-
-		var bucketCount uint16
-		err = future.GetTyped(&[]interface{}{&bucketCount})
-		if err != nil {
-			return nil, err
-		}
-
-		totalBucketCount += bucketCount
-	}
-
-	if int32(totalBucketCount) != r.knownBucketCount.Load() {
+	g, gctx := errgroup.WithContext(mapCallCtx)
+	rsFutures := make(chan replicasetFuture)
+
+	g.Go(func() error {
+		defer close(rsFutures)
+
+		for id, replicaset := range r.idToReplicaset {
+			conn := replicaset.conn
+
+			future := conn.Do(req, pool.RW)
+			if err := future.Err(); err != nil {
+				cancel()
+
+				return fmt.Errorf("rs {%s} storage_ref err: %s", id.String(), err.Error())
+			}
+
+			select {
+			case <-gctx.Done():
+				return gctx.Err()
+			case rsFutures <- replicasetFuture{
+				id:     id,
+				future: future,
+			}:
+			}
+		}
+
+		return nil
+	})
+
+	// ref stage: collect
+
+	totalBucketCount := int32(0)
+
+	for i := 0; i < int(r.nWorkers); i++ {
+		g.Go(func() error {
+			for rsFuture := range rsFutures {
+				future := rsFuture.future
+
+				respData, err := future.Get()
+				if err != nil {
+					cancel()
+
+					return err
+				}
+
+				if respData[0] == nil {
+					vshardErr := &StorageCallAssertError{}
+
+					err = mapstructure.Decode(respData[1], vshardErr)
+					if err != nil {
+						cancel()
+
+						return err
+					}
+
+					cancel()
+
+					return vshardErr
+				}
+
+				var bucketCount uint16
+				err = future.GetTyped(&[]interface{}{&bucketCount})
+				if err != nil {
+					cancel()
+
+					return err
+				}
+
+				atomic.AddInt32(&totalBucketCount, int32(bucketCount))
+			}
+
+			return nil
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+
+	if totalBucketCount != r.knownBucketCount.Load() {
 		return nil, fmt.Errorf("unknown bucket count: %d", totalBucketCount)
 	}
 
 	// map stage: send
 
+	g, gctx = errgroup.WithContext(mapCallCtx)
+	rsFutures = make(chan replicasetFuture)
 	req = req.Args([]interface{}{"storage_map", refID, fnc, args})
 
-	for id, replicaset := range r.idToReplicaset {
-		conn := replicaset.conn
-
-		future := conn.Do(req, pool.RW)
-		futures[id] = future
-	}
+	g.Go(func() error {
+		defer close(rsFutures)
+
+		for id, replicaset := range r.idToReplicaset {
+			conn := replicaset.conn
+
+			future := conn.Do(req, pool.RW)
+			if err := future.Err(); err != nil {
+				cancel()
+
+				return fmt.Errorf("rs {%s} storage_map err: %s", id.String(), err.Error())
+			}
+
+			select {
+			case <-gctx.Done():
+				return gctx.Err()
+			case rsFutures <- replicasetFuture{
+				id:     id,
+				future: future,
+			}:
+			}
+		}
+
+		return nil
+	})
 
 	// map stage: collect
 
 	idToResult := make(map[uuid.UUID]interface{})
+	// a mutex is required here: nWorkers goroutines write into idToResult
+	var idToResultMutex sync.Mutex
 
-	for id, future := range futures {
-		respData, err := future.Get()
-		if err != nil {
-			return nil, err
-		}
-
-		if len(respData) != 2 {
-			err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
-
-			return nil, err
-		}
-
-		if respData[0] == nil {
-			vshardErr := &StorageCallAssertError{}
-
-			err = mapstructure.Decode(respData[1], vshardErr)
-			if err != nil {
-				return nil, err
-			}
-
-			return nil, vshardErr
-		}
-
-		isVShardRespOk := false
-		err = future.GetTyped(&[]interface{}{&isVShardRespOk})
-		if err != nil {
-			return nil, err
-		}
-
-		if !isVShardRespOk { // error
-			errorResp := &StorageCallAssertError{}
-
-			err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
-			if err != nil {
-				err = fmt.Errorf("can't get typed vshard error: %s", err)
-			}
-
-			return nil, err
-		}
-
-		idToResult[id] = respData[1]
+	for i := 0; i < int(r.nWorkers); i++ {
+		g.Go(func() error {
+			for rsFuture := range rsFutures {
+				future := rsFuture.future
+
+				respData, err := future.Get()
+				if err != nil {
+					cancel()
+
+					return err
+				}
+
+				if len(respData) != 2 {
+					err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
+					cancel()
+
+					return err
+				}
+
+				if respData[0] == nil {
+					vshardErr := &StorageCallAssertError{}
+
+					err = mapstructure.Decode(respData[1], vshardErr)
+					if err != nil {
+						cancel()
+
+						return err
+					}
+
+					cancel()
+
+					return vshardErr
+				}
+
+				isVShardRespOk := false
+				err = future.GetTyped(&[]interface{}{&isVShardRespOk})
+				if err != nil {
+					cancel()
+
+					return err
+				}
+
+				if !isVShardRespOk { // error
+					errorResp := &StorageCallAssertError{}
+
+					err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp})
+					if err != nil {
+						err = fmt.Errorf("can't get typed vshard error: %s", err)
+					}
+
+					cancel()
+
+					return err
+				}
+
+				idToResultMutex.Lock()
+				idToResult[rsFuture.id] = respData[1]
+				idToResultMutex.Unlock()
+			}
+
+			return nil
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		return nil, err
 	}
 
-	r.metrics().RequestDuration(time.Since(timeStart), true)
+	r.metrics().RequestDuration(time.Since(timeStart), true, true)
 
 	return idToResult, nil
 }
diff --git a/providers.go b/providers.go
index ce94b66..f200c7a 100644
--- a/providers.go
+++ b/providers.go
@@ -53,7 +53,7 @@ func (e *StdoutLogger) Warn(ctx context.Context, msg string) {
 type MetricsProvider interface {
 	CronDiscoveryEvent(ok bool, duration time.Duration, reason string)
 	RetryOnCall(reason string)
-	RequestDuration(duration time.Duration, ok bool)
+	RequestDuration(duration time.Duration, ok bool, mapReduce bool)
 }
 
 // EmptyMetrics is default empty metrics provider
@@ -62,7 +62,7 @@ type EmptyMetrics struct{}
 
 func (e *EmptyMetrics) CronDiscoveryEvent(ok bool, duration time.Duration, reason string) {}
 func (e *EmptyMetrics) RetryOnCall(reason string)                                         {}
-func (e *EmptyMetrics) RequestDuration(duration time.Duration, ok bool)                   {}
+func (e *EmptyMetrics) RequestDuration(duration time.Duration, ok bool, mapReduce bool)   {}
 
 type TopologyProvider struct {
 	r *Router
diff --git a/vshard.go b/vshard.go
index 6c900f4..a07e647 100644
--- a/vshard.go
+++ b/vshard.go
@@ -29,6 +29,7 @@ type Router struct {
 
 	knownBucketCount atomic.Int32
 	refID            atomic.Int64
+	nWorkers         int32
 
 	cancelDiscovery func()
 }
@@ -62,6 +63,8 @@ type Config struct {
 	User     string
 	Password string
 	PoolOpts tarantool.Opts
+
+	NWorkers int32
 }
 
 type BucketStatInfo struct {
@@ -157,6 +160,13 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
 		router.cancelDiscovery = cancelFunc
 	}
 
+	nWorkers := int32(2)
+	if cfg.NWorkers != 0 {
+		nWorkers = cfg.NWorkers
+	}
+
+	router.nWorkers = nWorkers
+
 	return router, err
 }
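Because RequestDuration now carries a mapReduce flag, any custom MetricsProvider implementation has to be updated alongside this patch. A minimal conforming sketch; the log-based reporting is illustrative only and not part of the library:

package example

import (
	"log"
	"time"
)

// LogMetrics satisfies the updated MetricsProvider interface.
type LogMetrics struct{}

func (m *LogMetrics) CronDiscoveryEvent(ok bool, duration time.Duration, reason string) {
	log.Printf("discovery ok=%t took=%s reason=%s", ok, duration, reason)
}

func (m *LogMetrics) RetryOnCall(reason string) {
	log.Printf("retry on call: %s", reason)
}

func (m *LogMetrics) RequestDuration(duration time.Duration, ok bool, mapReduce bool) {
	kind := "call"
	if mapReduce {
		// map-reduce rounds are reported separately from plain router calls
		kind = "map_callrw"
	}

	log.Printf("%s ok=%t took=%s", kind, ok, duration)
}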
From c8fb3340b75ad312b3957e322c10b1c9491f36ad Mon Sep 17 00:00:00 2001
From: Gorbunov Pavel
Date: Thu, 14 Mar 2024 01:33:23 +0300
Subject: [PATCH 3/4] add comment to map-reduce vars

---
 vshard.go | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/vshard.go b/vshard.go
index a07e647..71afb3e 100644
--- a/vshard.go
+++ b/vshard.go
@@ -28,8 +28,14 @@ type Router struct {
 	}
 
 	knownBucketCount atomic.Int32
-	refID            atomic.Int64
-	nWorkers         int32
+
+	// ----------------------- Map-Reduce -----------------------
+	// Storage ref ID. It must be unique for each ref request
+	// and therefore is global and monotonically growing.
+	refID atomic.Int64
+
+	// number of workers that drain the channel of replicaset futures
+	nWorkers int32
 
 	cancelDiscovery func()
 }
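The nWorkers field documented above is driven by Config.NWorkers from the previous patch (a zero value falls back to the default of 2 in NewRouter). A configuration sketch; the credentials are placeholders and the Config literal is deliberately partial, showing only the fields visible in this series:

package example

import (
	"context"
	"log"

	vshard_router "github.com/KaymeKaydex/go-vshard-router"
)

func newRouterExample(ctx context.Context) *vshard_router.Router {
	router, err := vshard_router.NewRouter(ctx, vshard_router.Config{
		User:     "storage", // placeholder credentials
		Password: "storage",
		// NWorkers bounds the goroutines that collect ref/map futures in
		// RouterMapCallRWImpl; zero falls back to the default of 2.
		NWorkers: 4,
	})
	if err != nil {
		log.Fatal(err)
	}

	return router
}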
From 6326ace4fca35195f76f4900ea836f3991df5c94 Mon Sep 17 00:00:00 2001
From: Gorbunov Pavel
Date: Tue, 19 Mar 2024 11:48:44 +0300
Subject: [PATCH 4/4] correct method name

---
 tests/integration/integration_test.go | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go
index c1d6785..3211f6f 100644
--- a/tests/integration/integration_test.go
+++ b/tests/integration/integration_test.go
@@ -2,12 +2,13 @@ package integration
 
 import (
 	"context"
-	"github.com/KaymeKaydex/go-vshard-router"
 	"log"
 	"os"
 	"testing"
 	"time"
 
+	vshard_router "github.com/KaymeKaydex/go-vshard-router"
+
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/tarantool/go-tarantool/v2"
@@ -48,7 +49,7 @@ func TestReplicasetCall(t *testing.T) {
 	var isReady bool
 
 	for _, rs := range router.RouterRouteAll() {
-		_, getTypedResp, err := rs.ReplicasetCallImpl(
+		_, getTypedResp, err := rs.ReplicaCall(
 			ctx,
 			vshard_router.ReplicasetCallOpts{
 				PoolMode: pool.RW,
@@ -75,7 +76,7 @@ func TestReplicasetCall(t *testing.T) {
 	)
 
 	for _, rs := range router.RouterRouteAll() {
-		_, getTypedResp, err := rs.ReplicasetCallImpl(
+		_, getTypedResp, err := rs.ReplicaCall(
 			ctx,
 			vshard_router.ReplicasetCallOpts{
 				PoolMode: pool.RW,
@@ -96,7 +97,7 @@ func TestReplicasetCall(t *testing.T) {
 	}
 
 	for _, rs := range router.RouterRouteAll() {
-		_, _, err := rs.ReplicasetCallImpl(
+		_, _, err := rs.ReplicaCall(
 			ctx,
 			vshard_router.ReplicasetCallOpts{
 				PoolMode: pool.RW,