diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 8115e614418..5beb3c8eb3b 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -18,9 +18,7 @@ package logic import ( "context" - "sync" - - "golang.org/x/exp/maps" + "time" "vitess.io/vitess/go/vt/log" @@ -28,46 +26,74 @@ import ( "vitess.io/vitess/go/vt/vtorc/inst" ) -// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. -func RefreshAllKeyspacesAndShards(ctx context.Context) error { - var keyspaces []string - if len(shardsToWatch) == 0 { // all known keyspaces - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - var err error - // Get all the keyspaces - keyspaces, err = ts.GetKeyspaces(ctx) - if err != nil { - return err +// setupKeyspaceAndShardRecordsWatch sets up a watch on all keyspace and shard records. +func setupKeyspaceAndShardRecordsWatch(ctx context.Context, ts *topo.Server) { + go func() { + for { + if ctx.Err() != nil { + return + } + initialRecs, updateChan, err := ts.WatchAllKeyspaceAndShardRecords(ctx) + if err != nil { + if ctx.Err() != nil { + return + } + // Back for a while and then try setting up the watch again. + time.Sleep(10 * time.Second) + continue + } + for _, rec := range initialRecs { + err = processKeyspacePrefixWatchUpdate(rec) + if err != nil { + log.Errorf("failed to process initial keyspace/shard record: %+v", err) + break + } + } + if err != nil { + continue + } + + for data := range updateChan { + err = processKeyspacePrefixWatchUpdate(data) + if err != nil { + log.Errorf("failed to process keyspace/shard record update: %+v", err) + break + } + } } - } else { - // Get keyspaces to watch from the list of known keyspaces. - keyspaces = maps.Keys(shardsToWatch) + }() +} + +// processKeyspacePrefixWatchUpdate processes a keyspace prefix watch update. +func processKeyspacePrefixWatchUpdate(wd *topo.WatchKeyspacePrefixData) error { + // We ignore the error in the watch data. + // If there is an error that closes the watch, then + // we will open it again. + if wd.Err != nil { + return nil } + if wd.KeyspaceInfo != nil { + return processKeyspaceUpdate(wd) + } else if wd.ShardInfo != nil { + return processShardUpdate(wd) + } + return wd.Err +} - refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer refreshCancel() - var wg sync.WaitGroup - for idx, keyspace := range keyspaces { - // Check if the current keyspace name is the same as the last one. - // If it is, then we know we have already refreshed its information. - // We do not need to do it again. - if idx != 0 && keyspace == keyspaces[idx-1] { - continue - } - wg.Add(2) - go func(keyspace string) { - defer wg.Done() - _ = refreshKeyspaceHelper(refreshCtx, keyspace) - }(keyspace) - go func(keyspace string) { - defer wg.Done() - _ = refreshAllShards(refreshCtx, keyspace) - }(keyspace) +// processShardUpdate processes a shard update. +func processShardUpdate(wd *topo.WatchKeyspacePrefixData) error { + if !shardPartOfWatch(wd.ShardInfo.Keyspace(), wd.ShardInfo.GetKeyRange()) { + return nil } - wg.Wait() + return inst.SaveShard(wd.ShardInfo) +} - return nil +// processKeyspaceUpdate processes a keyspace update. +func processKeyspaceUpdate(wd *topo.WatchKeyspacePrefixData) error { + if !keyspacePartOfWatch(wd.KeyspaceInfo.KeyspaceName()) { + return nil + } + return inst.SaveKeyspace(wd.KeyspaceInfo) } // RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard. @@ -107,28 +133,6 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { return err } -// refreshAllShards refreshes all the shard records in the given keyspace. -func refreshAllShards(ctx context.Context, keyspaceName string) error { - shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{ - // Fetch shard records concurrently to speed up discovery. A typical - // Vitess cluster will have 1-3 vtorc instances deployed, so there is - // little risk of a thundering herd. - Concurrency: 8, - }) - if err != nil { - log.Error(err) - return err - } - for _, shardInfo := range shardInfos { - err = inst.SaveShard(shardInfo) - if err != nil { - log.Error(err) - return err - } - } - return nil -} - // refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard. func refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error { shardInfo, err := ts.GetShard(ctx, keyspaceName, shardName) diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 5d21bfc4d04..a1a80b5138c 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -20,10 +20,12 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/external/golib/sqlutils" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -52,7 +54,86 @@ var ( } ) -func TestRefreshAllKeyspaces(t *testing.T) { +// TestSetupKeyspaceAndShardRecordsWatch tests that the watch is setup correctly for keyspace and shard records. +func TestSetupKeyspaceAndShardRecordsWatch(t *testing.T) { + // Store the old flags and restore on test completion + oldTs := ts + oldClustersToWatch := clustersToWatch + defer func() { + ts = oldTs + clustersToWatch = oldClustersToWatch + }() + + db.ClearVTOrcDatabase() + defer func() { + db.ClearVTOrcDatabase() + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts = memorytopo.NewServer(ctx, "zone1") + + for _, ks := range []string{"ks1", "ks2"} { + err := ts.CreateKeyspace(ctx, ks, keyspaceDurabilityNone) + require.NoError(t, err) + for idx, sh := range []string{"-80", "80-"} { + err = ts.CreateShard(ctx, ks, sh) + require.NoError(t, err) + _, err = ts.UpdateShardFields(ctx, ks, sh, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodatapb.TabletAlias{ + Cell: fmt.Sprintf("zone_%v", ks), + Uid: uint32(100 + idx), + } + return nil + }) + require.NoError(t, err) + } + } + + // Set up the keyspace and shard watch. + setupKeyspaceAndShardRecordsWatch(ctx, ts) + waitForKeyspaceCount(t, 2) + // Verify that we only have ks1 and ks2 in vtorc's db. + verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "") + verifyPrimaryAlias(t, "ks1", "-80", "zone_ks1-0000000100", "") + verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "") + verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "") + verifyKeyspaceInfo(t, "ks3", nil, "keyspace not found") + verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found") + verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found") + + // Update primary on the shard. + _, err := ts.UpdateShardFields(ctx, "ks1", "-80", func(si *topo.ShardInfo) error { + si.PrimaryAlias.Cell = "updated_new_cell" + return nil + }) + require.NoError(t, err) + + // Delete a shard. + // We will verify that we don't delete a shard info in VTOrc. + // We ignore delete updates for now. + err = ts.DeleteShard(ctx, "ks2", "80-") + require.NoError(t, err) + + // Create a new keyspace record. + err = ts.CreateKeyspace(ctx, "ks3", keyspaceDurabilitySemiSync) + require.NoError(t, err) + + // Check that the watch sees these updates. + waitForKeyspaceCount(t, 3) + // Verify that we only have ks1 and ks2 in vtorc's db. + verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "") + verifyPrimaryAlias(t, "ks1", "-80", "updated_new_cell-0000000100", "") + verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "") + verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "") + verifyKeyspaceInfo(t, "ks3", keyspaceDurabilitySemiSync, "") + verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found") + verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found") +} + +// TestInitialSetupOfWatch tests that the initial setup of the watch for shards +// and keyspaces loads the latest information from the topo server. +func TestInitialSetupOfWatch(t *testing.T) { // Store the old flags and restore on test completion oldTs := ts oldClustersToWatch := clustersToWatch @@ -94,7 +175,10 @@ func TestRefreshAllKeyspaces(t *testing.T) { onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"} clustersToWatch = onlyKs1and3 initializeShardsToWatch() - require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) + watchCtx, watchCancel := context.WithCancel(context.Background()) + setupKeyspaceAndShardRecordsWatch(watchCtx, ts) + waitForKeyspaceCount(t, 2) + watchCancel() // Verify that we only have ks1 and ks3 in vtorc's db. verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "") @@ -110,7 +194,10 @@ func TestRefreshAllKeyspaces(t *testing.T) { initializeShardsToWatch() // Change the durability policy of ks1 reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync) - require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) + watchCtx, watchCancel = context.WithCancel(context.Background()) + setupKeyspaceAndShardRecordsWatch(watchCtx, ts) + waitForKeyspaceCount(t, 4) + watchCancel() // Verify that all the keyspaces are correctly reloaded verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "") @@ -123,6 +210,30 @@ func TestRefreshAllKeyspaces(t *testing.T) { verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "") } +// waitForKeyspaceCount waits for the keyspace count to match the expected value. +func waitForKeyspaceCount(t *testing.T, count int) { + t.Helper() + timeout := time.After(10 * time.Second) + for { + select { + case <-timeout: + t.Errorf("timed out waiting for keyspace count") + return + default: + } + var curCount = 0 + err := db.QueryVTOrcRowsMap("select count(*) as c from vitess_keyspace", func(row sqlutils.RowMap) error { + curCount = row.GetInt("c") + return nil + }) + require.NoError(t, err) + if curCount == count { + return + } + time.Sleep(100 * time.Millisecond) + } +} + func TestRefreshKeyspace(t *testing.T) { // Store the old flags and restore on test completion oldTs := ts diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 704317f1b68..b06177d1b1a 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -103,32 +103,41 @@ func initializeShardsToWatch() { } } -// tabletPartOfWatch checks if the given tablet is part of the watch list. -func tabletPartOfWatch(tablet *topodatapb.Tablet) bool { +// keyspacePartOfWatch checks if the given keyspace is part of the watch list. +func keyspacePartOfWatch(keyspace string) bool { + // If we are watching all keyspaces, then we want to watch this keyspace too. + if len(shardsToWatch) == 0 { + return true + } + _, shouldWatch := shardsToWatch[keyspace] + return shouldWatch +} + +// shardPartOfWatch checks if the given tablet is part of the watch list. +func shardPartOfWatch(keyspace string, keyRange *topodatapb.KeyRange) bool { // If we are watching all keyspaces, then we want to watch this tablet too. if len(shardsToWatch) == 0 { return true } - shardRanges, ok := shardsToWatch[tablet.GetKeyspace()] + shardRanges, ok := shardsToWatch[keyspace] // If we don't have the keyspace in our map, then this tablet // doesn't need to be watched. if !ok { return false } - // Get the tablet's key range, and check if - // it is part of the shard ranges we are watching. - kr := tablet.GetKeyRange() + + // Check if the key range is part of the shard ranges we are watching. for _, shardRange := range shardRanges { - if key.KeyRangeContainsKeyRange(shardRange, kr) { + if key.KeyRangeContainsKeyRange(shardRange, keyRange) { return true } } return false } -// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker +// OpenDiscoveryFromTopo opens the vitess topo if enables and returns a ticker // channel for polling. -func OpenTabletDiscovery() { +func OpenDiscoveryFromTopo() { ts = topo.Open() tmc = inst.InitializeTMC() // Clear existing cache and perform a new refresh. @@ -137,6 +146,7 @@ func OpenTabletDiscovery() { } // Parse --clusters_to_watch into a filter. initializeShardsToWatch() + setupKeyspaceAndShardRecordsWatch(context.Background(), ts) // We refresh all information from the topo once before we start the ticks to do // it on a timer. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) @@ -198,7 +208,7 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f matchedTablets := make([]*topo.TabletInfo, 0, len(tablets)) func() { for _, t := range tablets { - if tabletPartOfWatch(t.Tablet) { + if shardPartOfWatch(t.Tablet.GetKeyspace(), t.Tablet.GetKeyRange()) { matchedTablets = append(matchedTablets, t) } } diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index b528624a746..90cd684e125 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -103,7 +103,7 @@ var ( } ) -func TestTabletsPartOfWatch(t *testing.T) { +func TestPartOfWatch(t *testing.T) { oldClustersToWatch := clustersToWatch defer func() { clustersToWatch = oldClustersToWatch @@ -111,9 +111,10 @@ func TestTabletsPartOfWatch(t *testing.T) { }() testCases := []struct { - in []string - tablet *topodatapb.Tablet - expectedPartOfWatch bool + in []string + tablet *topodatapb.Tablet + expectedShardPartOfWatch bool + expectedKsPartOfWatch bool }{ { in: []string{}, @@ -121,7 +122,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: keyspace, Shard: shard, }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{keyspace}, @@ -129,7 +131,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: keyspace, Shard: shard, }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{keyspace + "/-"}, @@ -137,7 +140,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: keyspace, Shard: shard, }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{keyspace + "/" + shard}, @@ -145,7 +149,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: keyspace, Shard: shard, }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{"ks/-70", "ks/70-"}, @@ -153,7 +158,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}), }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{"ks/-70", "ks/70-"}, @@ -161,7 +167,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x40}, []byte{0x50}), }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{"ks/-70", "ks/70-"}, @@ -169,7 +176,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x70}, []byte{0x90}), }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{"ks/-70", "ks/70-"}, @@ -177,7 +185,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x90}), }, - expectedPartOfWatch: false, + expectedShardPartOfWatch: false, + expectedKsPartOfWatch: true, }, { in: []string{"ks/50-70"}, @@ -185,7 +194,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}), }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, @@ -193,7 +203,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x80}), }, - expectedPartOfWatch: true, + expectedShardPartOfWatch: true, + expectedKsPartOfWatch: true, }, { in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, @@ -201,7 +212,8 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x80}, []byte{0x90}), }, - expectedPartOfWatch: false, + expectedShardPartOfWatch: false, + expectedKsPartOfWatch: true, }, { in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, @@ -209,7 +221,17 @@ func TestTabletsPartOfWatch(t *testing.T) { Keyspace: "ks", KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}), }, - expectedPartOfWatch: false, + expectedShardPartOfWatch: false, + expectedKsPartOfWatch: true, + }, + { + in: []string{"ks2/-70", "ks2/70-", "ks/-80"}, + tablet: &topodatapb.Tablet{ + Keyspace: "unknownKs", + KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}), + }, + expectedShardPartOfWatch: false, + expectedKsPartOfWatch: false, }, } @@ -217,7 +239,8 @@ func TestTabletsPartOfWatch(t *testing.T) { t.Run(fmt.Sprintf("%v-Tablet-%v-%v", strings.Join(tt.in, ","), tt.tablet.GetKeyspace(), tt.tablet.GetShard()), func(t *testing.T) { clustersToWatch = tt.in initializeShardsToWatch() - assert.Equal(t, tt.expectedPartOfWatch, tabletPartOfWatch(tt.tablet)) + assert.Equal(t, tt.expectedShardPartOfWatch, shardPartOfWatch(tt.tablet.GetKeyspace(), tt.tablet.GetKeyRange())) + assert.Equal(t, tt.expectedKsPartOfWatch, keyspacePartOfWatch(tt.tablet.GetKeyspace())) }) } } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 26dd46aa4a1..5ed1e605015 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -24,7 +24,6 @@ import ( "github.com/patrickmn/go-cache" "github.com/sjmudd/stopwatch" - "golang.org/x/sync/errgroup" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -255,7 +254,7 @@ func ContinuousDiscovery() { go handleDiscoveryRequests() - OpenTabletDiscovery() + OpenDiscoveryFromTopo() healthTick := time.Tick(config.HealthPollSeconds * time.Second) caretakingTick := time.Tick(time.Minute) recoveryTick := time.Tick(config.GetRecoveryPollDuration()) @@ -317,23 +316,9 @@ func ContinuousDiscovery() { } } -// refreshTopoTick refreshes information from the topo server on a time tick. +// refreshTopoTick refreshes tablet information from the topo server on a time tick. func refreshTopoTick(ctx context.Context) error { - // Create an errgroup - eg, ctx := errgroup.WithContext(ctx) - - // Refresh all keyspace information. - eg.Go(func() error { - return RefreshAllKeyspacesAndShards(ctx) - }) - - // Refresh all tablets. - eg.Go(func() error { - return refreshAllTablets(ctx) - }) - - // Wait for both the refreshes to complete - err := eg.Wait() + err := refreshAllTablets(ctx) if err == nil { process.FirstDiscoveryCycleComplete.Store(true) }