Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
more replicaset and replicaset's methods to replicaset.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Talkytitan5127 committed Mar 13, 2024
1 parent 79c9437 commit 0a01e4e
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 74 deletions.
88 changes: 88 additions & 0 deletions replicaset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package vshard_router

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
)

type ReplicasetInfo struct {
Name string
UUID uuid.UUID
}

type Replicaset struct {
conn *pool.ConnectionPool
info ReplicasetInfo

bucketCount atomic.Int32
}

type ReplicasetCallOpts struct {
PoolMode pool.Mode
Timeout time.Duration
}

// ReplicasetCallImpl perform function on remote storage
func (rs *Replicaset) ReplicasetCallImpl(
ctx context.Context,
opts ReplicasetCallOpts,
fnc string,
args interface{},
) (interface{}, StorageResultTypedFunc, error) {
if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
}

timeout := opts.Timeout
timeStart := time.Now()

req := tarantool.NewCallRequest(fnc)
req = req.Context(ctx)
req = req.Args(args)

var (
respData []interface{}
err error
)

for {
if since := time.Since(timeStart); since > timeout {
return nil, nil, err
}

future := rs.conn.Do(req, opts.PoolMode)

respData, err = future.Get()
if err != nil {
continue
}

if len(respData) != 2 {
err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
continue
}

if respData[1] != nil {
assertErr := &StorageCallAssertError{}

err = mapstructure.Decode(respData[1], assertErr)
if err != nil {
continue
}

err = assertErr
continue
}

return respData[0], func(result interface{}) error {
return future.GetTyped(&[]interface{}{&result})
}, nil
}
}
74 changes: 0 additions & 74 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,6 @@ type Config struct {
PoolOpts tarantool.Opts
}

type ReplicasetInfo struct {
Name string
UUID uuid.UUID
}

type Replicaset struct {
conn *pool.ConnectionPool
info ReplicasetInfo

bucketCount atomic.Int32
}

type BucketStatInfo struct {
BucketID uint64 `mapstructure:"id"`
Status string `mapstructure:"status"`
Expand Down Expand Up @@ -113,68 +101,6 @@ func (rs *Replicaset) bucketStat(ctx context.Context, bucketID uint64) (BucketSt
return *bsInfo, bsError
}

type ReplicasetCallOpts struct {
PoolMode pool.Mode
Timeout time.Duration
}

func (rs *Replicaset) ReplicasetCallImpl(
ctx context.Context,
opts ReplicasetCallOpts,
fnc string,
args interface{},
) (interface{}, StorageResultTypedFunc, error) {
if opts.Timeout == 0 {
opts.Timeout = CallTimeoutMin
}

timeout := opts.Timeout
timeStart := time.Now()

req := tarantool.NewCallRequest(fnc)
req = req.Context(ctx)
req = req.Args(args)

var (
respData []interface{}
err error
)

for {
if since := time.Since(timeStart); since > timeout {
return nil, nil, err
}

future := rs.conn.Do(req, opts.PoolMode)

respData, err = future.Get()
if err != nil {
continue
}

if len(respData) != 2 {
err = fmt.Errorf("invalid length of response data: must be = 2, current: %d", len(respData))
continue
}

if respData[1] != nil {
assertErr := &StorageCallAssertError{}

err = mapstructure.Decode(respData[1], assertErr)
if err != nil {
continue
}

err = assertErr
continue
}

return respData[0], func(result interface{}) error {
return future.GetTyped(&[]interface{}{&result})
}, nil
}
}

type InstanceInfo struct {
Addr string
UUID uuid.UUID
Expand Down

0 comments on commit 0a01e4e

Please sign in to comment.