Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

feature/sugar: implement go-tarantool like faces for sharded tarantool #58

Merged
merged 10 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ FEATURES:
* All discovering logs has new prefix [DISCOVERY]
* Introduce Replicaset.CallAsync, it is usefull to send concurrent requests to replicasets;
additionally, CallAsync provides new interface to interact with replicaset without cons of interface of ReplicaCall
* Retry tarantool request only on some vshard errors (#66)
* Retry tarantool request only on some vshard errors (#66).
* Added call interfaces backwards compatible with go-tarantool (#31).

REFACTOR:

Expand Down
118 changes: 118 additions & 0 deletions sugar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package vshard_router //nolint:revive

import (
"context"
"fmt"

"github.com/tarantool/go-tarantool/v2/pool"
)

// CallRequest helps you to create a call request object for execution
// by a Connection.
type CallRequest struct {
ctx context.Context
fnc string
args interface{}
bucketID uint64
}

// CallResponse is a backwards-compatible structure with go-tarantool for easier replacement.
type CallResponse struct {
rawResp interface{}
getTypedFnc StorageResultTypedFunc
err error
}

// NewCallRequest returns a new empty CallRequest.
func (r *Router) NewCallRequest(function string) *CallRequest {
req := new(CallRequest)
req.fnc = function
return req
}

// Do perform a request synchronously on the connection.
// It is important that the logic of this method is different from go-tarantool.
func (r *Router) Do(req *CallRequest, userMode pool.Mode) *CallResponse {
ctx := req.ctx
bucketID := req.bucketID
resp := new(CallResponse)

if req.fnc == "" {
resp.err = fmt.Errorf("func name is empty")
return resp
}

if req.args == nil {
resp.err = fmt.Errorf("no request args")
return resp
}

if req.bucketID == 0 {
if r.cfg.BucketGetter == nil {
resp.err = fmt.Errorf("bucket id for request is not set")
return resp
}

bucketID = r.cfg.BucketGetter(ctx)
}

vshardMode := ReadMode

// If the user says he prefers to do it on the master,
// then he agrees that it will go to the replica, which means he will not write.
if userMode == pool.RW {
nurzhan-saktaganov marked this conversation as resolved.
Show resolved Hide resolved
vshardMode = WriteMode
}

resp.rawResp, resp.getTypedFnc, resp.err = r.RouterCallImpl(ctx,
bucketID,
CallOpts{
Timeout: r.cfg.RequestTimeout,
nurzhan-saktaganov marked this conversation as resolved.
Show resolved Hide resolved
PoolMode: userMode,
VshardMode: vshardMode,
},
req.fnc,
req.args)

return resp
}

// Args sets the args for the eval request.
// Note: default value is empty.
func (req *CallRequest) Args(args interface{}) *CallRequest {
req.args = args
return req
}

// Context sets a passed context to the request.
func (req *CallRequest) Context(ctx context.Context) *CallRequest {
req.ctx = ctx
return req
}

// BucketID method that sets the bucketID for your request.
// You can ignore this parameter if you have a bucketGetter.
// However, this method has a higher priority.
func (req *CallRequest) BucketID(bucketID uint64) *CallRequest {
req.bucketID = bucketID
return req
}

// GetTyped waits synchronously for response and calls msgpack.Decoder.Decode(result) if no error happens.
func (resp *CallResponse) GetTyped(result interface{}) error {
if resp.err != nil {
return resp.err
}

return resp.getTypedFnc(result)
}

// Get implementation now works synchronously for response.
// The interface was created purely for convenient migration to go-vshard-router from go-tarantool.
func (resp *CallResponse) Get() ([]interface{}, error) {
if resp.err != nil {
return nil, resp.err
}

return []interface{}{resp.rawResp}, nil
}
14 changes: 13 additions & 1 deletion vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,25 @@ type Config struct {
TopologyProvider TopologyProvider // TopologyProvider is required provider

// Discovery
DiscoveryTimeout time.Duration // DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout
// DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout.
DiscoveryTimeout time.Duration
DiscoveryMode DiscoveryMode

TotalBucketCount uint64
User string
Password string
PoolOpts tarantool.Opts

// BucketGetter is an optional argument.
// You can specify a function that will receive the bucket id from the context.
// This is useful if you use middleware that inserts the calculated bucket id into the request context.
BucketGetter func(ctx context.Context) uint64
// RequestTimeout timeout for requests to Tarantool.
// Don't rely on using this timeout.
// This is the difference between the timeout of the library itself
// that is, our retry timeout if the buckets, for example, move.
// Currently, it only works for sugar implementations .
RequestTimeout time.Duration
nurzhan-saktaganov marked this conversation as resolved.
Show resolved Hide resolved
}

type BucketStatInfo struct {
Expand Down
Loading