Skip to content

Commit

Permalink
feat: use the new watch to remove refresh of keyspace and shards on t…
Browse files Browse the repository at this point in the history
…ime tick

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Jan 22, 2025
1 parent e53647b commit 78e60e2
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 108 deletions.
124 changes: 64 additions & 60 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,56 +18,82 @@ package logic

import (
"context"
"sync"

"golang.org/x/exp/maps"
"time"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtorc/inst"
)

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
var keyspaces []string
if len(shardsToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
// Get all the keyspaces
keyspaces, err = ts.GetKeyspaces(ctx)
if err != nil {
return err
// setupKeyspaceAndShardRecordsWatch sets up a watch on all keyspace and shard records.
//
// It starts a background goroutine and returns immediately. The goroutine
// keeps (re-)establishing the watch through ts and feeds every initial record
// and every subsequent update to processKeyspacePrefixWatchUpdate. It only
// exits once ctx reports an error (i.e. it has been cancelled or has expired).
func setupKeyspaceAndShardRecordsWatch(ctx context.Context, ts *topo.Server) {
go func() {
for {
// Stop retrying once the caller's context is done.
if ctx.Err() != nil {
return
}
initialRecs, updateChan, err := ts.WatchAllKeyspaceAndShardRecords(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
// Back off for a while and then try setting up the watch again.
// NOTE(review): this sleep does not observe ctx, so a cancellation
// is only noticed after the full 10 seconds have elapsed.
time.Sleep(10 * time.Second)
continue
}
// Store the records that existed at the time the watch was created.
for _, rec := range initialRecs {
err = processKeyspacePrefixWatchUpdate(rec)
if err != nil {
log.Errorf("failed to process initial keyspace/shard record: %+v", err)
break
}
}
// A failure above restarts the whole watch setup.
// NOTE(review): unlike the setup-error path, this retry has no back-off.
if err != nil {
continue
}

// Consume updates until the channel is closed or an update fails to be
// processed; in both cases the outer loop sets up the watch again.
for data := range updateChan {
err = processKeyspacePrefixWatchUpdate(data)
if err != nil {
log.Errorf("failed to process keyspace/shard record update: %+v", err)
break
}
}
}
} else {
// Get keyspaces to watch from the list of known keyspaces.
keyspaces = maps.Keys(shardsToWatch)
}()
}

// processKeyspacePrefixWatchUpdate processes a keyspace prefix watch update.
//
// An update carries either an error, a keyspace record or a shard record.
// Errors are deliberately swallowed here, keyspace records are handed to
// processKeyspaceUpdate and shard records to processShardUpdate. The returned
// error is whatever the selected handler returns; an update that carries none
// of the three yields nil.
func processKeyspacePrefixWatchUpdate(wd *topo.WatchKeyspacePrefixData) error {
	switch {
	case wd.Err != nil:
		// We ignore the error in the watch data.
		// If there is an error that closes the watch, then
		// we will open it again.
		return nil
	case wd.KeyspaceInfo != nil:
		return processKeyspaceUpdate(wd)
	case wd.ShardInfo != nil:
		return processShardUpdate(wd)
	default:
		// Neither a keyspace nor a shard record is attached, so there is
		// nothing to store. wd.Err is known to be nil at this point, so
		// return nil explicitly instead of the always-nil wd.Err.
		return nil
	}
}

refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for idx, keyspace := range keyspaces {
// Check if the current keyspace name is the same as the last one.
// If it is, then we know we have already refreshed its information.
// We do not need to do it again.
if idx != 0 && keyspace == keyspaces[idx-1] {
continue
}
wg.Add(2)
go func(keyspace string) {
defer wg.Done()
_ = refreshKeyspaceHelper(refreshCtx, keyspace)
}(keyspace)
go func(keyspace string) {
defer wg.Done()
_ = refreshAllShards(refreshCtx, keyspace)
}(keyspace)
// processShardUpdate processes a shard update.
//
// The shard record in wd is saved through inst.SaveShard, but only when its
// keyspace and key range pass the shardPartOfWatch filter; otherwise the
// update is dropped and nil is returned. wd.ShardInfo is dereferenced without
// a nil check, so callers must only pass updates that carry a shard record.
func processShardUpdate(wd *topo.WatchKeyspacePrefixData) error {
// Skip shards that are not covered by the watch list.
if !shardPartOfWatch(wd.ShardInfo.Keyspace(), wd.ShardInfo.GetKeyRange()) {
return nil
}
wg.Wait()
return inst.SaveShard(wd.ShardInfo)
}

return nil
// processKeyspaceUpdate handles a watch update that carries a keyspace record.
//
// The record is saved through inst.SaveKeyspace when its keyspace passes the
// keyspacePartOfWatch filter; any other keyspace is skipped and nil is
// returned. wd.KeyspaceInfo must be non-nil.
func processKeyspaceUpdate(wd *topo.WatchKeyspacePrefixData) error {
	ksInfo := wd.KeyspaceInfo
	if keyspacePartOfWatch(ksInfo.KeyspaceName()) {
		return inst.SaveKeyspace(ksInfo)
	}
	// Not a keyspace we are asked to watch: nothing to store.
	return nil
}

// RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.
Expand Down Expand Up @@ -107,28 +133,6 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {
return err
}

// refreshAllShards refreshes all the shard records in the given keyspace.
func refreshAllShards(ctx context.Context, keyspaceName string) error {
shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
// Fetch shard records concurrently to speed up discovery. A typical
// Vitess cluster will have 1-3 vtorc instances deployed, so there is
// little risk of a thundering herd.
Concurrency: 8,
})
if err != nil {
log.Error(err)
return err
}
for _, shardInfo := range shardInfos {
err = inst.SaveShard(shardInfo)
if err != nil {
log.Error(err)
return err
}
}
return nil
}

// refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard.
func refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error {
shardInfo, err := ts.GetShard(ctx, keyspaceName, shardName)
Expand Down
117 changes: 114 additions & 3 deletions go/vt/vtorc/logic/keyspace_shard_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -52,7 +54,86 @@ var (
}
)

func TestRefreshAllKeyspaces(t *testing.T) {
// TestSetupKeyspaceAndShardRecordsWatch tests that the watch is set up correctly for keyspace and shard records.
// It first checks that records already present in the topo server are loaded
// when the watch starts, and then that later topo changes (a shard update, a
// shard deletion and a new keyspace) are reflected in vtorc's db.
func TestSetupKeyspaceAndShardRecordsWatch(t *testing.T) {
// Store the old values of the package-level variables and restore them on test completion
oldTs := ts
oldClustersToWatch := clustersToWatch
defer func() {
ts = oldTs
clustersToWatch = oldClustersToWatch
}()

// Start from, and leave behind, an empty vtorc database.
db.ClearVTOrcDatabase()
defer func() {
db.ClearVTOrcDatabase()
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts = memorytopo.NewServer(ctx, "zone1")

// Seed the topo with ks1 and ks2, each with shards -80 and 80-, and give
// every shard a primary alias of the form zone_<keyspace>-<100+idx>.
for _, ks := range []string{"ks1", "ks2"} {
err := ts.CreateKeyspace(ctx, ks, keyspaceDurabilityNone)
require.NoError(t, err)
for idx, sh := range []string{"-80", "80-"} {
err = ts.CreateShard(ctx, ks, sh)
require.NoError(t, err)
_, err = ts.UpdateShardFields(ctx, ks, sh, func(si *topo.ShardInfo) error {
si.PrimaryAlias = &topodatapb.TabletAlias{
Cell: fmt.Sprintf("zone_%v", ks),
Uid: uint32(100 + idx),
}
return nil
})
require.NoError(t, err)
}
}

// Set up the keyspace and shard watch.
setupKeyspaceAndShardRecordsWatch(ctx, ts)
waitForKeyspaceCount(t, 2)
// Verify that we only have ks1 and ks2 in vtorc's db.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
verifyPrimaryAlias(t, "ks1", "-80", "zone_ks1-0000000100", "")
verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "")
verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "")
verifyKeyspaceInfo(t, "ks3", nil, "keyspace not found")
verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found")
verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found")

// Update primary on the shard.
_, err := ts.UpdateShardFields(ctx, "ks1", "-80", func(si *topo.ShardInfo) error {
si.PrimaryAlias.Cell = "updated_new_cell"
return nil
})
require.NoError(t, err)

// Delete a shard.
// We will verify that we don't delete a shard info in VTOrc.
// We ignore delete updates for now.
err = ts.DeleteShard(ctx, "ks2", "80-")
require.NoError(t, err)

// Create a new keyspace record.
err = ts.CreateKeyspace(ctx, "ks3", keyspaceDurabilitySemiSync)
require.NoError(t, err)

// Check that the watch sees these updates.
waitForKeyspaceCount(t, 3)
// Verify that ks1, ks2 and ks3 are now in vtorc's db: ks1/-80 has the updated
// primary, the deleted ks2/80- shard is still present, and ks3 has no shards.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
verifyPrimaryAlias(t, "ks1", "-80", "updated_new_cell-0000000100", "")
verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "")
verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "")
verifyKeyspaceInfo(t, "ks3", keyspaceDurabilitySemiSync, "")
verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found")
verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found")
}

// TestInitialSetupOfWatch tests that the initial setup of the watch for shards
// and keyspaces loads the latest information from the topo server.
func TestInitialSetupOfWatch(t *testing.T) {
// Store the old flags and restore on test completion
oldTs := ts
oldClustersToWatch := clustersToWatch
Expand Down Expand Up @@ -94,7 +175,10 @@ func TestRefreshAllKeyspaces(t *testing.T) {
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
initializeShardsToWatch()
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
watchCtx, watchCancel := context.WithCancel(context.Background())
setupKeyspaceAndShardRecordsWatch(watchCtx, ts)
waitForKeyspaceCount(t, 2)
watchCancel()

// Verify that we only have ks1 and ks3 in vtorc's db.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
Expand All @@ -110,7 +194,10 @@ func TestRefreshAllKeyspaces(t *testing.T) {
initializeShardsToWatch()
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync)
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
watchCtx, watchCancel = context.WithCancel(context.Background())
setupKeyspaceAndShardRecordsWatch(watchCtx, ts)
waitForKeyspaceCount(t, 4)
watchCancel()

// Verify that all the keyspaces are correctly reloaded
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "")
Expand All @@ -123,6 +210,30 @@ func TestRefreshAllKeyspaces(t *testing.T) {
verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "")
}

// waitForKeyspaceCount blocks until the number of rows in the vitess_keyspace
// table equals count, polling every 100 milliseconds.
//
// If the count has not been reached within 10 seconds the test is marked as
// failed (without aborting it) and the function returns. A query error aborts
// the test immediately.
func waitForKeyspaceCount(t *testing.T, count int) {
	t.Helper()
	deadline := time.Now().Add(10 * time.Second)
	for time.Now().Before(deadline) {
		found := 0
		queryErr := db.QueryVTOrcRowsMap("select count(*) as c from vitess_keyspace", func(row sqlutils.RowMap) error {
			found = row.GetInt("c")
			return nil
		})
		require.NoError(t, queryErr)
		if found == count {
			return
		}
		// Not there yet; give the watch some time before polling again.
		time.Sleep(100 * time.Millisecond)
	}
	t.Errorf("timed out waiting for keyspace count")
}

func TestRefreshKeyspace(t *testing.T) {
// Store the old flags and restore on test completion
oldTs := ts
Expand Down
30 changes: 20 additions & 10 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,41 @@ func initializeShardsToWatch() {
}
}

// tabletPartOfWatch checks if the given tablet is part of the watch list.
func tabletPartOfWatch(tablet *topodatapb.Tablet) bool {
// keyspacePartOfWatch reports whether the given keyspace should be watched.
//
// An empty shardsToWatch filter means that every keyspace is watched;
// otherwise the keyspace must have an entry in shardsToWatch.
func keyspacePartOfWatch(keyspace string) bool {
	watchEverything := len(shardsToWatch) == 0
	_, listed := shardsToWatch[keyspace]
	return watchEverything || listed
}

// shardPartOfWatch checks if the given shard, identified by its keyspace and
// key range, is part of the watch list. It returns true when no filter is
// configured (shardsToWatch is empty) or when one of the key ranges watched
// for the keyspace contains keyRange.
func shardPartOfWatch(keyspace string, keyRange *topodatapb.KeyRange) bool {
// If we are watching all keyspaces, then we want to watch this shard too.
if len(shardsToWatch) == 0 {
return true
}
shardRanges, ok := shardsToWatch[tablet.GetKeyspace()]
shardRanges, ok := shardsToWatch[keyspace]
// If we don't have the keyspace in our map, then this shard
// doesn't need to be watched.
if !ok {
return false
}
// Get the tablet's key range, and check if
// it is part of the shard ranges we are watching.
kr := tablet.GetKeyRange()

// Check if the key range is part of the shard ranges we are watching.
for _, shardRange := range shardRanges {
if key.KeyRangeContainsKeyRange(shardRange, kr) {
if key.KeyRangeContainsKeyRange(shardRange, keyRange) {
return true
}
}
return false
}

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
// OpenDiscoveryFromTopo opens the vitess topo if enabled and returns a ticker
// channel for polling.
func OpenTabletDiscovery() {
func OpenDiscoveryFromTopo() {
ts = topo.Open()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
Expand All @@ -137,6 +146,7 @@ func OpenTabletDiscovery() {
}
// Parse --clusters_to_watch into a filter.
initializeShardsToWatch()
setupKeyspaceAndShardRecordsWatch(context.Background(), ts)
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
Expand Down Expand Up @@ -198,7 +208,7 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
for _, t := range tablets {
if tabletPartOfWatch(t.Tablet) {
if shardPartOfWatch(t.Tablet.GetKeyspace(), t.Tablet.GetKeyRange()) {
matchedTablets = append(matchedTablets, t)
}
}
Expand Down
Loading

0 comments on commit 78e60e2

Please sign in to comment.