Skip to content

Commit

Permalink
fix backport conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Dec 9, 2024
1 parent 4afc5b3 commit 18f53ea
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 58 deletions.
14 changes: 3 additions & 11 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,14 @@ func OpenTabletDiscovery() <-chan time.Time {
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
}
<<<<<<< HEAD
return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker
=======
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to initialize topo information: %+v", err)
}
return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
>>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129))
return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
Expand All @@ -91,14 +87,10 @@ func refreshAllTablets(ctx context.Context) error {
}, false /* forceRefresh */)
}

<<<<<<< HEAD
func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {
if !IsLeaderOrActive() {
return
return nil
}
=======
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {
>>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129))
if len(clustersToWatch) == 0 { // all known clusters
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
Expand Down
34 changes: 2 additions & 32 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
package logic

import (
<<<<<<< HEAD
"context"
"os"
"os/signal"
=======
"context"
>>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129))
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -420,33 +417,7 @@ func ContinuousDiscovery() {
}
}()
case <-tabletTopoTick:
<<<<<<< HEAD
// Create a wait group
var wg sync.WaitGroup

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()

// Wait for both the refreshes to complete
wg.Wait()
// We have completed one discovery cycle in the entirety of it. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}
}
}
=======
ctx, cancel := context.WithTimeout(context.Background(), config.GetTopoInformationRefreshDuration())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(config.Config.TopoInformationRefreshSeconds))
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to refresh topo information: %+v", err)
}
Expand Down Expand Up @@ -477,4 +448,3 @@ func refreshAllInformation(ctx context.Context) error {
}
return err
}
>>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129))
20 changes: 13 additions & 7 deletions go/vt/vtorc/logic/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
}

func TestRefreshAllInformation(t *testing.T) {
defer process.ResetLastHealthCheckCache()

// Store the old flags and restore on test completion
oldTs := ts
defer func() {
Expand All @@ -74,27 +76,31 @@ func TestRefreshAllInformation(t *testing.T) {
}()

// Verify in the beginning, we have the first DiscoveredOnce field false.
_, discoveredOnce := process.HealthTest()
require.False(t, discoveredOnce)
_, err := process.HealthTest()
require.NoError(t, err)

// Create a memory topo-server and create the keyspace and shard records
ts = memorytopo.NewServer(context.Background(), cell1)
_, err := ts.GetOrCreateShard(context.Background(), keyspace, shard)
_, err = ts.GetOrCreateShard(context.Background(), keyspace, shard)
require.NoError(t, err)

// Test error
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel context to simulate timeout
require.Error(t, refreshAllInformation(ctx))
require.False(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.False(t, discoveredOnce)
health, err := process.HealthTest()
require.NoError(t, err)
require.False(t, health.DiscoveredOnce)
process.ResetLastHealthCheckCache()

// Test success
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
require.NoError(t, refreshAllInformation(ctx2))
require.True(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.True(t, discoveredOnce)
health, err = process.HealthTest()
require.NoError(t, err)
require.True(t, health.DiscoveredOnce)
process.ResetLastHealthCheckCache()
}
11 changes: 3 additions & 8 deletions go/vt/vtorc/process/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var FirstDiscoveryCycleComplete atomic.Bool

var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second)

func ResetLastHealthCheckCache() { lastHealthCheckCache.Flush() }

type NodeHealth struct {
Hostname string
Token string
Expand Down Expand Up @@ -105,18 +107,11 @@ func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) {
}

// HealthTest attempts to write to the backend database and get a result
<<<<<<< HEAD
func HealthTest() (health *HealthStatus, err error) {
cacheKey := util.ProcessToken.Hash
if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found {
return healthStatus.(*HealthStatus), nil
}
=======
func HealthTest() (health *NodeHealth, discoveredOnce bool) {
ThisNodeHealth.LastReported = time.Now()
discoveredOnce = FirstDiscoveryCycleComplete.Load()
ThisNodeHealth.Healthy = discoveredOnce && writeHealthToDatabase()
>>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129))

health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash}
defer lastHealthCheckCache.Set(cacheKey, health, cache.DefaultExpiration)
Expand All @@ -127,8 +122,8 @@ func HealthTest() (health *NodeHealth, discoveredOnce bool) {
log.Error(err)
return health, err
}
health.Healthy = healthy
health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load()
health.Healthy = healthy && health.DiscoveredOnce

if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil {
health.Error = err
Expand Down

0 comments on commit 18f53ea

Please sign in to comment.