Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2 from KaymeKaydex/feature/map_callrw
Browse files Browse the repository at this point in the history
#2 map_callrw implementation
  • Loading branch information
KaymeKaydex authored Mar 28, 2024
2 parents b73aaa1 + 6326ace commit a48e8a0
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 9 deletions.
248 changes: 245 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package vshard_router
import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"github.com/mitchellh/mapstructure"
	"golang.org/x/sync/errgroup"

	"github.com/tarantool/go-tarantool/v2"
	"github.com/tarantool/go-tarantool/v2/pool"
Expand Down Expand Up @@ -96,7 +98,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
for {

if since := time.Since(timeStart); since > timeout {
r.metrics().RequestDuration(since, false)
r.metrics().RequestDuration(since, false, false)

r.log().Debug(ctx, fmt.Sprintf("return result on timeout; since %s of timeout %s", since, timeout))
if err == nil {
Expand Down Expand Up @@ -188,7 +190,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
err = errorResp
}

r.metrics().RequestDuration(time.Since(timeStart), true)
r.metrics().RequestDuration(time.Since(timeStart), true, false)

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))

Expand All @@ -200,7 +202,247 @@ func (r *Router) RouterCallImpl(ctx context.Context,
}
}

// todo: router_map_callrw
// callStorageUnref calls "storage_unref" on every replicaset master to
// release the bucket refs taken by a map_callrw. It must run whether the
// map call failed or succeeded, so buckets are not left pinned until the
// ref's own timeout expires.
//
// The unref is dispatched best-effort and fire-and-forget: errors are
// ignored because the storages will drop a stale ref on timeout anyway.
func (r *Router) callStorageUnref(refID int64) {
	req := tarantool.NewCallRequest("vshard.storage._call")
	req = req.Args([]interface{}{"storage_unref", refID})

	for _, replicaset := range r.idToReplicaset {
		// Do enqueues the request; the connection's own goroutine resolves
		// the future. We must not call SetError on a connection-owned
		// future — that races with the connection resolving it. Simply
		// dropping the future is the correct fire-and-forget form.
		_ = replicaset.conn.Do(req, pool.RW)
	}
}

// replicasetFuture pairs a replicaset UUID with the in-flight request
// issued against that replicaset, so the collector workers reading the
// futures channel know which replicaset each response belongs to.
type replicasetFuture struct {
	// id of the replicaset the request was sent to.
	id uuid.UUID
	// future resolving to the storage_ref / storage_map response.
	future *tarantool.Future
}

// RouterMapCallRWImpl performs a call on all masters in the cluster
// with a guarantee that in case of success it was executed with all
// buckets being accessible for reads and writes.
//
// It implements the vshard map-reduce protocol in three stages:
//  1. ref:   pin all buckets on every master (storage_ref);
//  2. map:   execute fnc with args on every master (storage_map);
//  3. unref: release the pins (storage_unref, via defer).
//
// On success it returns the per-replicaset results keyed by replicaset UUID.
func (r *Router) RouterMapCallRWImpl(
	ctx context.Context,
	fnc string,
	args interface{},
	opts CallOpts,
) (map[uuid.UUID]interface{}, error) {
	if opts.Timeout == 0 {
		opts.Timeout = CallTimeoutMin
	}

	timeout := opts.Timeout
	timeStart := time.Now()

	// Reserve a unique ref ID atomically. A separate Load()+Add(1) pair
	// would let two concurrent map-calls observe the same ID.
	refID := r.refID.Add(1) - 1

	// Release the storage refs whether the map call fails or succeeds.
	defer r.callStorageUnref(refID)

	mapCallCtx, cancel := context.WithTimeout(ctx, timeout)
	// Release the timeout context on every return path (e.g. the
	// bucket-count-mismatch return below would otherwise leak it).
	defer cancel()

	req := tarantool.NewCallRequest("vshard.storage._call")
	req = req.Context(ctx)

	// ref stage: send

	req = req.Args([]interface{}{
		"storage_ref",
		refID,
		timeout,
	})

	g, gctx := errgroup.WithContext(mapCallCtx)
	rsFutures := make(chan replicasetFuture)

	// Single producer fans the storage_ref futures out to the workers.
	g.Go(func() error {
		defer close(rsFutures)

		for id, replicaset := range r.idToReplicaset {
			conn := replicaset.conn

			future := conn.Do(req, pool.RW)
			if err := future.Err(); err != nil {
				cancel()

				return fmt.Errorf("rs {%s} storage_ref err: %s", id.String(), err.Error())
			}

			select {
			case <-gctx.Done():
				return gctx.Err()
			case rsFutures <- replicasetFuture{id: id, future: future}:
			}
		}

		return nil
	})

	// ref stage: collect

	totalBucketCount := int32(0)

	for i := 0; i < int(r.nWorkers); i++ {
		g.Go(func() error {
			for rsFuture := range rsFutures {
				future := rsFuture.future

				respData, err := future.Get()
				if err != nil {
					cancel()

					return err
				}

				// respData[0] == nil means the storage returned
				// (nil, error) — decode and surface the vshard error.
				if respData[0] == nil {
					vshardErr := &StorageCallAssertError{}

					if err = mapstructure.Decode(respData[1], vshardErr); err != nil {
						cancel()

						return err
					}

					cancel()

					return vshardErr
				}

				// On success storage_ref returns the number of buckets
				// pinned on that replicaset.
				var bucketCount uint16
				if err = future.GetTyped(&[]interface{}{&bucketCount}); err != nil {
					cancel()

					return err
				}

				// Workers run concurrently; g.Wait() below provides the
				// happens-before edge for the plain read afterwards.
				atomic.AddInt32(&totalBucketCount, int32(bucketCount))
			}

			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	// If not every known bucket was pinned, part of the cluster is
	// unreachable or rebalancing; running the map stage would miss data.
	if totalBucketCount != r.knownBucketCount.Load() {
		return nil, fmt.Errorf("unknown bucket counts %d", totalBucketCount)
	}

	// map stage: send

	g, gctx = errgroup.WithContext(mapCallCtx)
	rsFutures = make(chan replicasetFuture)
	req = req.Args([]interface{}{"storage_map", refID, fnc, args})

	g.Go(func() error {
		defer close(rsFutures)

		for id, replicaset := range r.idToReplicaset {
			conn := replicaset.conn

			future := conn.Do(req, pool.RW)
			if err := future.Err(); err != nil {
				cancel()

				return fmt.Errorf("rs {%s} storage_map err: %s", id.String(), err.Error())
			}

			select {
			case <-gctx.Done():
				return gctx.Err()
			case rsFutures <- replicasetFuture{id: id, future: future}:
			}
		}

		return nil
	})

	// map stage: collect

	var (
		// idToResultMutex guards idToResult: nWorkers collector
		// goroutines write into it concurrently.
		idToResultMutex sync.Mutex
		idToResult      = make(map[uuid.UUID]interface{})
	)

	for i := 0; i < int(r.nWorkers); i++ {
		g.Go(func() error {
			for rsFuture := range rsFutures {
				future := rsFuture.future

				respData, err := future.Get()
				if err != nil {
					cancel()

					return err
				}

				if len(respData) != 2 {
					err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
					cancel()

					return err
				}

				if respData[0] == nil {
					vshardErr := &StorageCallAssertError{}

					if err = mapstructure.Decode(respData[1], vshardErr); err != nil {
						cancel()

						return err
					}

					cancel()

					return vshardErr
				}

				isVShardRespOk := false
				if err = future.GetTyped(&[]interface{}{&isVShardRespOk}); err != nil {
					cancel()

					return err
				}

				if !isVShardRespOk { // error
					errorResp := &StorageCallAssertError{}

					if err = future.GetTyped(&[]interface{}{&isVShardRespOk, errorResp}); err != nil {
						err = fmt.Errorf("cant get typed vshard err with err: %s", err)
					} else {
						// Propagate the decoded storage error; returning
						// a nil err here would silently drop it and hand
						// the caller partial results.
						err = errorResp
					}

					cancel()

					return err
				}

				idToResultMutex.Lock()
				idToResult[rsFuture.id] = respData[1]
				idToResultMutex.Unlock()
			}

			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	r.metrics().RequestDuration(time.Since(timeStart), true, true)

	return idToResult, nil
}

// RouterRoute get replicaset object by bucket identifier.
// alias to BucketResolve
Expand Down
4 changes: 2 additions & 2 deletions providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (e *StdoutLogger) Warn(ctx context.Context, msg string) {
// MetricsProvider is the set of hooks the router invokes to report
// operational metrics; implement it to plug in a metrics backend.
// (The stale pre-change RequestDuration signature left over from the
// diff is removed — an interface cannot declare the method twice.)
type MetricsProvider interface {
	// CronDiscoveryEvent reports the outcome of one periodic discovery run.
	CronDiscoveryEvent(ok bool, duration time.Duration, reason string)
	// RetryOnCall reports that a router call was retried and why.
	RetryOnCall(reason string)
	// RequestDuration reports the latency of a router request;
	// mapReduce is true for map_callrw (RouterMapCallRWImpl) requests.
	RequestDuration(duration time.Duration, ok bool, mapReduce bool)
}

// EmptyMetrics is default empty metrics provider
Expand All @@ -62,7 +62,7 @@ type EmptyMetrics struct{}

// The EmptyMetrics methods are deliberate no-ops: they satisfy
// MetricsProvider for users that do not need metrics. The stale
// pre-change RequestDuration line left over from the diff is removed —
// a type cannot define the same method twice.
func (e *EmptyMetrics) CronDiscoveryEvent(ok bool, duration time.Duration, reason string) {}
func (e *EmptyMetrics) RetryOnCall(reason string)                                         {}
func (e *EmptyMetrics) RequestDuration(duration time.Duration, ok bool, mapReduce bool)   {}

type TopologyProvider struct {
r *Router
Expand Down
9 changes: 5 additions & 4 deletions tests/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package integration

import (
"context"
"github.com/KaymeKaydex/go-vshard-router"
"log"
"os"
"testing"
"time"

vshard_router "github.com/KaymeKaydex/go-vshard-router"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/tarantool/go-tarantool/v2"
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestReplicasetCall(t *testing.T) {
var isReady bool

for _, rs := range router.RouterRouteAll() {
_, getTypedResp, err := rs.ReplicasetCallImpl(
_, getTypedResp, err := rs.ReplicaCall(
ctx,
vshard_router.ReplicasetCallOpts{
PoolMode: pool.RW,
Expand All @@ -75,7 +76,7 @@ func TestReplicasetCall(t *testing.T) {
)

for _, rs := range router.RouterRouteAll() {
_, getTypedResp, err := rs.ReplicasetCallImpl(
_, getTypedResp, err := rs.ReplicaCall(
ctx,
vshard_router.ReplicasetCallOpts{
PoolMode: pool.RW,
Expand All @@ -96,7 +97,7 @@ func TestReplicasetCall(t *testing.T) {
}

for _, rs := range router.RouterRouteAll() {
_, _, err := rs.ReplicasetCallImpl(
_, _, err := rs.ReplicaCall(
ctx,
vshard_router.ReplicasetCallOpts{
PoolMode: pool.RW,
Expand Down
17 changes: 17 additions & 0 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type Router struct {

knownBucketCount atomic.Int32

// ----------------------- Map-Reduce -----------------------
// Storage Ref ID. It must be unique for each ref request
// and therefore is global and monotonically growing.
refID atomic.Int64

// worker's count to proceed channel of replicaset's futures
nWorkers int32

cancelDiscovery func()
}

Expand Down Expand Up @@ -61,6 +69,8 @@ type Config struct {
User string
Password string
PoolOpts tarantool.Opts

NWorkers int32
}

type BucketStatInfo struct {
Expand Down Expand Up @@ -156,6 +166,13 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
router.cancelDiscovery = cancelFunc
}

nWorkers := int32(2)
if cfg.NWorkers != 0 {
nWorkers = cfg.NWorkers
}

router.nWorkers = nWorkers

return router, err
}

Expand Down

0 comments on commit a48e8a0

Please sign in to comment.