From f460444e8a0a48e0f945272b327d0781877ad4f9 Mon Sep 17 00:00:00 2001 From: Maksim Konovalov Date: Mon, 9 Sep 2024 13:50:43 +0300 Subject: [PATCH] feature/suguar: implement go-tarantool like faces for sharded tarantool request --- CHANGELOG.md | 1 + suguar.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++ vshard.go | 10 ++++- 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 suguar.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d73e36..5298616 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ FEATURES: * Support new Sprintf-like logging interface (#48) +* Added call interfaces backwards compatible with go-tarantool (#31). REFACTOR: diff --git a/suguar.go b/suguar.go new file mode 100644 index 0000000..b407f06 --- /dev/null +++ b/suguar.go @@ -0,0 +1,104 @@ +package vshard_router //nolint:revive + +import ( + "context" + "fmt" + + "github.com/tarantool/go-tarantool/v2/pool" +) + +// CallRequest helps you to create a call request object for execution +// by a Connection. +type CallRequest struct { + ctx context.Context + fnc string + args interface{} + bucketID uint64 +} + +// CallResponse is a backwards-compatible structure with go-tarantool for easier replacement. +type CallResponse struct { + rawResp interface{} + getTypedFnc StorageResultTypedFunc + err error +} + +// NewCallRequest returns a new empty CallRequest. +func (r *Router) NewCallRequest(function string) *CallRequest { + req := new(CallRequest) + req.fnc = function + return req +} + +// Do perform a request asynchronously on the connection. +func (r *Router) Do(req *CallRequest, userMode pool.Mode) *CallResponse { + ctx := req.ctx + bucketID := req.bucketID + resp := new(CallResponse) + + if req.bucketID == 0 { + if r.cfg.BucketGetter == nil { + resp.err = fmt.Errorf("bucket id for request is not set") + return resp + } + + bucketID = r.cfg.BucketGetter(ctx) + } + + vshardMode := ReadMode + + if userMode == pool.RW { + vshardMode = WriteMode + } + + resp.rawResp, resp.getTypedFnc, resp.err = r.RouterCallImpl(ctx, + bucketID, + CallOpts{ + Timeout: r.cfg.RequestTimeout, + PoolMode: userMode, + VshardMode: vshardMode, + }, + req.fnc, + req.args) + + return resp +} + +// Args sets the args for the eval request. +// Note: default value is empty. +func (req *CallRequest) Args(args interface{}) *CallRequest { + req.args = args + return req +} + +// Context sets a passed context to the request. +func (req *CallRequest) Context(ctx context.Context) *CallRequest { + req.ctx = ctx + return req +} + +// BucketID method that sets the bucketID for your request. +// You can ignore this parameter if you have a bucketGetter. +// However, this method has a higher priority. +func (req *CallRequest) BucketID(bucketID uint64) *CallRequest { + req.bucketID = bucketID + return req +} + +// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens. +func (resp *CallResponse) GetTyped(result interface{}) error { + if resp.err != nil { + return resp.err + } + + return resp.getTypedFnc(result) +} + +// Get waits for Future to be filled and returns the data of the Response and error. +func (resp *CallResponse) Get() ([]interface{}, error) { + if resp.err != nil { + return nil, resp.err + } + + return []interface{}{resp.rawResp}, nil +} diff --git a/vshard.go b/vshard.go index a1f1a0f..b36a440 100644 --- a/vshard.go +++ b/vshard.go @@ -92,13 +92,21 @@ type Config struct { TopologyProvider TopologyProvider // TopologyProvider is required provider // Discovery - DiscoveryTimeout time.Duration // DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout + // DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout. + DiscoveryTimeout time.Duration DiscoveryMode DiscoveryMode TotalBucketCount uint64 User string Password string PoolOpts tarantool.Opts + + // BucketGetter is an optional argument. + // You can specify a function that will receive the bucket id from the context. + // This is useful if you use middleware that inserts the calculated bucket id into the request context. + BucketGetter func(ctx context.Context) uint64 + // RequestTimeout timeout for requests to Tarantool. + RequestTimeout time.Duration } type BucketStatInfo struct {