Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

resolve issue #60 #64

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ BUG FIXES:
FEATURES:

* Support new Sprintf-like logging interface (#48)
* DiscoveryTimeout by default is 1 minute (zero DiscoveryTimeout is not allowed #60)
* All discovering logs has new prefix [DISCOVERY]

KaymeKaydex marked this conversation as resolved.
Show resolved Hide resolved
REFACTOR:

* resolve issue #38: simplify DiscoveryAllBuckets and remove suspicious if
* resolve issue #46: drastically simplify RouterMapCallRWImpl and added tests with real tnt
* Use typed nil pointers instead of memory allocation for EmptyMetrics and emptyLogger structs
* resolve issue #44: remove bucketCount field from struct Replicaset
* rename startCronDiscovery to cronDiscovery and make it panic-tolerant

TESTS:

Expand Down
45 changes: 32 additions & 13 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package vshard_router //nolint:revive
import (
"context"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -209,31 +210,49 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
return nil
}

// startCronDiscovery is discovery_service_f analog with goroutines instead fibers
func (r *Router) startCronDiscovery(ctx context.Context) error {
// cronDiscovery is discovery_service_f analog with goroutines instead fibers
func (r *Router) cronDiscovery(ctx context.Context) {
var iterationCount uint64

for {
select {
case <-ctx.Done():
r.metrics().CronDiscoveryEvent(false, 0, "ctx-cancel")

return ctx.Err()
r.log().Infof(ctx, "[DISCOVERY] cron discovery has been stopped after %d iterations", iterationCount)
return
case <-time.After(r.cfg.DiscoveryTimeout):
r.log().Debugf(ctx, "started new cron discovery")

tStartDiscovery := time.Now()
iterationCount++
}

// Since the current for loop should not stop until ctx->Done() event fires,
// we should be able to continue execution even a panic occures.
// Therefore, we should wrap everyting into anonymous function that recovers after panic.
// (Similar to pcall in lua/tarantool)
func() {
defer func() {
r.log().Infof(ctx, "discovery done since %s", time.Since(tStartDiscovery))
if recovered := recover(); recovered != nil {
// Another one panic may happen due to log function below (e.g. bug in log().Errorf), in this case we have two options:
// 1. recover again and log nothing: panic will be muted and lost
// 2. don't try to recover, we hope that the second panic will be logged somehow by go runtime
// So, we choose the second behavior
r.log().Errorf(ctx, "[DISCOVERY] something unexpected has happened in cronDiscovery(%d): panic %v, stackstrace: %s",
iterationCount, recovered, string(debug.Stack()))
}
}()

err := r.DiscoveryAllBuckets(ctx)
if err != nil {
r.metrics().CronDiscoveryEvent(false, time.Since(tStartDiscovery), "discovery-error")
r.log().Infof(ctx, "[DISCOVERY] started cron discovery iteration %d", iterationCount)

tStartDiscovery := time.Now()

r.log().Errorf(ctx, "cant do cron discovery with error: %s", err)
if err := r.DiscoveryAllBuckets(ctx); err != nil {
r.metrics().CronDiscoveryEvent(false, time.Since(tStartDiscovery), "discovery-error")
r.log().Errorf(ctx, "[DISCOVERY] cant do cron discovery iteration %d with error: %s", iterationCount, err)
return
}

r.log().Infof(ctx, "[DISCOVERY] finished cron discovery iteration %d", iterationCount)

r.metrics().CronDiscoveryEvent(true, time.Since(tStartDiscovery), "ok")
}
}()
}
}
16 changes: 10 additions & 6 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,10 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
if cfg.DiscoveryMode == DiscoveryModeOn {
discoveryCronCtx, cancelFunc := context.WithCancel(context.Background())

go func() {
discoveryErr := router.startCronDiscovery(discoveryCronCtx)
if discoveryErr != nil {
router.log().Errorf(ctx, "error when run cron discovery: %s", discoveryErr)
}
}()
// run background cron discovery loop
// suppress linter warning: Non-inherited new context, use function like `context.WithXXX` instead (contextcheck)
//nolint:contextcheck
go router.cronDiscovery(discoveryCronCtx)

router.cancelDiscovery = cancelFunc
}
Expand Down Expand Up @@ -214,11 +212,17 @@ func (r *Router) RouteMapClean() {
}

func prepareCfg(cfg Config) (Config, error) {
const discoveryTimeoutDefault = 1 * time.Minute

err := validateCfg(cfg)
if err != nil {
return Config{}, fmt.Errorf("%v: %v", ErrInvalidConfig, err)
}

if cfg.DiscoveryTimeout == 0 {
cfg.DiscoveryTimeout = discoveryTimeoutDefault
}

if cfg.Loggerf == nil {
if cfg.Logger != nil {
cfg.Loggerf = &legacyLoggerProxy{l: cfg.Logger}
Expand Down
Loading