Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
resolve issue #48 (#51)
Browse files Browse the repository at this point in the history
* support new Sprintf-like logging interface
* use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs
* do not export EmptyLogger anymore, because it is the default behavior
  • Loading branch information
nurzhan-saktaganov authored Sep 5, 2024
1 parent 3441423 commit 6de86ab
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 38 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
## Unreleased

FEATURES:

* Support new Sprintf-like logging interface (#48)

REFACTOR:

* resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if
* resolve issue #46: drastically simplify RouterMapCallRWImpl and added tests with real tnt
* Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs

## 0.0.12

Expand Down
18 changes: 9 additions & 9 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
if since := time.Since(timeStart); since > timeout {
r.metrics().RequestDuration(since, false, false)

r.log().Debug(ctx, fmt.Sprintf("return result on timeout; since %s of timeout %s", since, timeout))
r.log().Debugf(ctx, "return result on timeout; since %s of timeout %s", since, timeout)
if err == nil {
err = fmt.Errorf("cant get call cause call impl timeout")
}
Expand All @@ -114,34 +114,34 @@ func (r *Router) RouterCallImpl(ctx context.Context,

rs, err = r.BucketResolve(ctx, bucketID)
if err != nil {
r.log().Debug(ctx, fmt.Sprintf("cant resolve bucket %d with error: %s", bucketID, err.Error()))
r.log().Debugf(ctx, "cant resolve bucket %d with error: %s", bucketID, err.Error())

r.metrics().RetryOnCall("bucket_resolve_error")
continue
}

r.log().Info(ctx, fmt.Sprintf("try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID))
r.log().Infof(ctx, "try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)

future := rs.conn.Do(req, opts.PoolMode)

var respData []interface{}
respData, err = future.Get()
if err != nil {
r.log().Error(ctx, fmt.Sprintf("got future error: %s", err))
r.log().Errorf(ctx, "got future error: %s", err)
r.metrics().RetryOnCall("future_get_error")

continue
}

r.log().Debug(ctx, fmt.Sprintf("got call result response data %s", respData))
r.log().Debugf(ctx, "got call result response data %s", respData)

if len(respData) < 1 {
// vshard.storage.call(func) returns up to two values:
// - true/false
// - func result, omitted if func does not return anything
err = fmt.Errorf("invalid length of response data: must be >= 1, current: %d", len(respData))

r.log().Error(ctx, err.Error())
r.log().Errorf(ctx, "%s", err.Error())

r.metrics().RetryOnCall("resp_data_error")
continue
Expand All @@ -161,13 +161,13 @@ func (r *Router) RouterCallImpl(ctx context.Context,

err = fmt.Errorf("cant decode vhsard err by trarantool with err: %s; continue try", err)

r.log().Error(ctx, err.Error())
r.log().Errorf(ctx, "%s", err.Error())
continue
}

err = vshardErr

r.log().Error(ctx, fmt.Sprintf("got vshard storage call error: %s", err))
r.log().Errorf(ctx, "got vshard storage call error: %s", err)

if vshardErr.Name == "WRONG_BUCKET" ||
vshardErr.Name == "BUCKET_IS_LOCKED" ||
Expand All @@ -184,7 +184,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
isVShardRespOk := false
err = future.GetTyped(&[]interface{}{&isVShardRespOk})
if err != nil {
r.log().Debug(ctx, fmt.Sprintf("cant get typed with err: %s", err))
r.log().Debugf(ctx, "cant get typed with err: %s", err)

continue
}
Expand Down
8 changes: 4 additions & 4 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
var emptyRouter = &Router{
cfg: Config{
TotalBucketCount: uint64(10),
Logger: &EmptyLogger{},
Metrics: &EmptyMetrics{},
Loggerf: emptyLogfProvider,
Metrics: emptyMetricsProvider,
},
}

Expand Down Expand Up @@ -48,8 +48,8 @@ func TestRouter_RouterCallImpl(t *testing.T) {
r := &Router{
cfg: Config{
TotalBucketCount: uint64(10),
Logger: &EmptyLogger{},
Metrics: &EmptyMetrics{},
Loggerf: emptyLogfProvider,
Metrics: emptyMetricsProvider,
},
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], 11),
Expand Down
16 changes: 8 additions & 8 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica

// it`s ok if in the same time we have few active searches
// mu per bucket is expansive
r.cfg.Logger.Info(ctx, fmt.Sprintf("Discovering bucket %d", bucketID))
r.log().Infof(ctx, "Discovering bucket %d", bucketID)

idToReplicasetRef := r.getIDToReplicaset()

Expand Down Expand Up @@ -137,20 +137,20 @@ func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buc
}

if count != rs.bucketCount.Load() {
r.cfg.Logger.Info(ctx, fmt.Sprintf("Updated %s buckets: was %d, became %d", rs.info.Name, rs.bucketCount.Load(), count))
r.log().Infof(ctx, "Updated %s buckets: was %d, became %d", rs.info.Name, rs.bucketCount.Load(), count)
}

rs.bucketCount.Store(count)

for rs, oldBucketCount := range affected {
r.log().Info(ctx, fmt.Sprintf("Affected buckets of %s: was %d, became %d", rs.info.Name, oldBucketCount, rs.bucketCount.Load()))
r.log().Infof(ctx, "Affected buckets of %s: was %d, became %d", rs.info.Name, oldBucketCount, rs.bucketCount.Load())
}
}

func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
t := time.Now()

r.log().Info(ctx, "start discovery all buckets")
r.log().Infof(ctx, "start discovery all buckets")

errGr, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -208,7 +208,7 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
if err != nil {
return fmt.Errorf("errGr.Wait() err: %w", err)
}
r.log().Info(ctx, fmt.Sprintf("discovery done since: %s", time.Since(t)))
r.log().Infof(ctx, "discovery done since: %s", time.Since(t))

return nil
}
Expand All @@ -222,19 +222,19 @@ func (r *Router) startCronDiscovery(ctx context.Context) error {

return ctx.Err()
case <-time.After(r.cfg.DiscoveryTimeout):
r.log().Debug(ctx, "started new cron discovery")
r.log().Debugf(ctx, "started new cron discovery")

tStartDiscovery := time.Now()

defer func() {
r.log().Info(ctx, fmt.Sprintf("discovery done since %s", time.Since(tStartDiscovery)))
r.log().Infof(ctx, "discovery done since %s", time.Since(tStartDiscovery))
}()

err := r.DiscoveryAllBuckets(ctx)
if err != nil {
r.metrics().CronDiscoveryEvent(false, time.Since(tStartDiscovery), "discovery-error")

r.log().Error(ctx, fmt.Sprintf("cant do cron discovery with error: %s", err))
r.log().Errorf(ctx, "cant do cron discovery with error: %s", err)
}

r.metrics().CronDiscoveryEvent(true, time.Since(tStartDiscovery), "ok")
Expand Down
2 changes: 1 addition & 1 deletion discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestRouter_BucketResolve_InvalidBucketID(t *testing.T) {
r := Router{
cfg: Config{
TotalBucketCount: uint64(10),
Logger: &EmptyLogger{},
Loggerf: emptyLogfProvider,
},
view: &consistentView{
routeMap: make([]atomic.Pointer[Replicaset], 11),
Expand Down
47 changes: 39 additions & 8 deletions providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,60 @@ package vshard_router //nolint:revive

import (
"context"
"fmt"
"log"
"time"
)

var (
_ MetricsProvider = (*EmptyMetrics)(nil)
_ LogProvider = (*EmptyLogger)(nil)
_ LogProvider = (*StdoutLogger)(nil)
emptyMetricsProvider MetricsProvider = (*EmptyMetrics)(nil)

emptyLogfProvider LogfProvider = (*emptyLogger)(nil)
_ LogProvider = (*StdoutLogger)(nil)
)

// A legacy interface for backward compatibility
type LogProvider interface {
Info(context.Context, string)
Debug(context.Context, string)
Error(context.Context, string)
Warn(context.Context, string)
}

type EmptyLogger struct{}
type LogfProvider interface {
Infof(ctx context.Context, format string, v ...any)
Debugf(ctx context.Context, format string, v ...any)
Errorf(ctx context.Context, format string, v ...any)
Warnf(ctx context.Context, format string, v ...any)
}

// We use this type to support legacy logger api
type legacyLoggerProxy struct {
l LogProvider
}

func (p *legacyLoggerProxy) Infof(ctx context.Context, format string, v ...any) {
p.l.Info(ctx, fmt.Sprintf(format, v...))
}

func (p *legacyLoggerProxy) Debugf(ctx context.Context, format string, v ...any) {
p.l.Debug(ctx, fmt.Sprintf(format, v...))
}

func (p *legacyLoggerProxy) Errorf(ctx context.Context, format string, v ...any) {
p.l.Error(ctx, fmt.Sprintf(format, v...))
}

func (p *legacyLoggerProxy) Warnf(ctx context.Context, format string, v ...any) {
p.l.Warn(ctx, fmt.Sprintf(format, v...))
}

type emptyLogger struct{}

func (e *EmptyLogger) Info(_ context.Context, _ string) {}
func (e *EmptyLogger) Debug(_ context.Context, _ string) {}
func (e *EmptyLogger) Error(_ context.Context, _ string) {}
func (e *EmptyLogger) Warn(_ context.Context, _ string) {}
func (e *emptyLogger) Infof(_ context.Context, _ string, _ ...any) {}
func (e *emptyLogger) Debugf(_ context.Context, _ string, _ ...any) {}
func (e *emptyLogger) Errorf(_ context.Context, _ string, _ ...any) {}
func (e *emptyLogger) Warnf(_ context.Context, _ string, _ ...any) {}

type StdoutLogger struct{}

Expand Down
22 changes: 14 additions & 8 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type Router struct {
func (r *Router) metrics() MetricsProvider {
return r.cfg.Metrics
}
func (r *Router) log() LogProvider {
return r.cfg.Logger

func (r *Router) log() LogfProvider {
return r.cfg.Loggerf
}

func (r *Router) getConsistentView() *consistentView {
Expand All @@ -85,7 +86,8 @@ func (r *Router) setConsistentView(view *consistentView) {

type Config struct {
// Providers
Logger LogProvider // Logger is not required
Logger LogProvider // Logger is not required, legacy interface
Loggerf LogfProvider // Loggerf is not required, new interface
Metrics MetricsProvider // Metrics is not required
TopologyProvider TopologyProvider // TopologyProvider is required provider

Expand Down Expand Up @@ -147,7 +149,7 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {

err = cfg.TopologyProvider.Init(router.Topology())
if err != nil {
router.log().Error(ctx, fmt.Sprintf("cant create new topology provider with err: %s", err))
router.log().Errorf(ctx, "cant create new topology provider with err: %s", err)

return nil, fmt.Errorf("%w; cant init topology with err: %w", ErrTopologyProvider, err)
}
Expand All @@ -163,7 +165,7 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
go func() {
discoveryErr := router.startCronDiscovery(discoveryCronCtx)
if discoveryErr != nil {
router.log().Error(ctx, fmt.Sprintf("error when run cron discovery: %s", discoveryErr))
router.log().Errorf(ctx, "error when run cron discovery: %s", discoveryErr)
}
}()

Expand Down Expand Up @@ -230,12 +232,16 @@ func prepareCfg(cfg Config) (Config, error) {
return Config{}, fmt.Errorf("%v: %v", ErrInvalidConfig, err)
}

if cfg.Logger == nil {
cfg.Logger = &EmptyLogger{}
if cfg.Loggerf == nil {
if cfg.Logger != nil {
cfg.Loggerf = &legacyLoggerProxy{l: cfg.Logger}
} else {
cfg.Loggerf = emptyLogfProvider
}
}

if cfg.Metrics == nil {
cfg.Metrics = &EmptyMetrics{}
cfg.Metrics = emptyMetricsProvider
}

return cfg, nil
Expand Down

0 comments on commit 6de86ab

Please sign in to comment.