From eae50cb5ef94c5e8c2c5baa9bf559b10ea6cd8e1 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 17 Feb 2025 20:02:56 +0100 Subject: [PATCH 1/4] Add stats for shards watched by VTOrc Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/inst/shard_dao.go | 37 +++++++++++++ go/vt/vtorc/inst/shard_dao_test.go | 13 ++++- go/vt/vtorc/logic/keyspace_shard_discovery.go | 54 ++++++++++++++++++- 3 files changed, 102 insertions(+), 2 deletions(-) diff --git a/go/vt/vtorc/inst/shard_dao.go b/go/vt/vtorc/inst/shard_dao.go index a90eed0f509..5501ba5d7b9 100644 --- a/go/vt/vtorc/inst/shard_dao.go +++ b/go/vt/vtorc/inst/shard_dao.go @@ -29,6 +29,30 @@ import ( // ErrShardNotFound is a fixed error message used when a shard is not found in the database. var ErrShardNotFound = errors.New("shard not found") +// ReadShardNames reads the names of vitess shards for a single keyspace. +func ReadShardNames(keyspaceName string) (shardNames []string, err error) { + shardNames = make([]string, 0) + query := `select shard from vitess_shard where keyspace = ?` + args := sqlutils.Args(keyspaceName) + err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { + shardNames = append(shardNames, row.GetString("shard")) + return nil + }) + return shardNames, err +} + +// ReadAllShardNames reads the names of all vitess shards by keyspace. +func ReadAllShardNames() (shardNames map[string][]string, err error) { + shardNames = make(map[string][]string) + query := `select keyspace, shard from vitess_shard` + err = db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { + ks := row.GetString("keyspace") + shardNames[ks] = append(shardNames[ks], row.GetString("shard")) + return nil + }) + return shardNames, err +} + // ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp. func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp string, err error) { if err = topo.ValidateKeyspaceName(keyspaceName); err != nil { @@ -95,3 +119,16 @@ func getShardPrimaryTermStartTimeString(shard *topo.ShardInfo) string { } return protoutil.TimeFromProto(shard.PrimaryTermStartTime).UTC().String() } + +// DeleteShard deletes a shard using a keyspace and shard name. +func DeleteShard(keyspace, shard string) error { + _, err := db.ExecVTOrc(`DELETE FROM + vitess_shard + WHERE + keyspace = ? + AND shard = ?`, + keyspace, + shard, + ) + return err +} diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 84f6aef7a4a..0077b3d64af 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -28,7 +28,7 @@ import ( "vitess.io/vitess/go/vt/vtorc/db" ) -func TestSaveAndReadShard(t *testing.T) { +func TestSaveReadAndDeleteShard(t *testing.T) { // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. defer func() { db.ClearVTOrcDatabase() @@ -93,6 +93,7 @@ func TestSaveAndReadShard(t *testing.T) { require.NoError(t, err) } + // ReadShardPrimaryInformation shardPrimaryAlias, primaryTimestamp, err := ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName) if tt.err != "" { require.EqualError(t, err, tt.err) @@ -101,6 +102,16 @@ func TestSaveAndReadShard(t *testing.T) { require.NoError(t, err) require.EqualValues(t, tt.primaryAliasWanted, shardPrimaryAlias) require.EqualValues(t, tt.primaryTimestampWanted, primaryTimestamp) + + // ReadShardNames + shardNames, err := ReadShardNames(tt.keyspaceName) + require.NoError(t, err) + require.Equal(t, []string{tt.shardName}, shardNames) + + // DeleteShard + require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName)) + _, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName) + require.EqualError(t, err, ErrShardNotFound.Error()) }) } } diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 8115e614418..77f3930be1e 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -22,14 +22,43 @@ import ( "golang.org/x/exp/maps" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/inst" ) +var statsShardsWatched = stats.NewGaugesFuncWithMultiLabels("ShardsWatched", + "Keyspace/shards currently watched", + []string{"Keyspace", "Shard"}, + getShardsWatchedStats) + +// getShardsWatchedStats returns the keyspace/shards watched in a format for stats. +func getShardsWatchedStats() map[string]int64 { + shardsWatched := make(map[string]int64) + allShardNames, err := inst.ReadAllShardNames() + if err != nil { + log.Errorf("Failed to read all shard names: %+v", err) + return shardsWatched + } + for ks, shards := range allShardNames { + for _, shard := range shards { + shardsWatched[ks+"."+shard] = 1 + } + } + return shardsWatched +} + +// refreshAllKeyspacesAndShardsMu ensures RefreshAllKeyspacesAndShards +// is not executed concurrently. +var refreshAllKeyspacesAndShardsMu sync.Mutex + // RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. func RefreshAllKeyspacesAndShards(ctx context.Context) error { + refreshAllKeyspacesAndShardsMu.Lock() + defer refreshAllKeyspacesAndShardsMu.Unlock() + var keyspaces []string if len(shardsToWatch) == 0 { // all known keyspaces ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) @@ -109,6 +138,7 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { // refreshAllShards refreshes all the shard records in the given keyspace. func refreshAllShards(ctx context.Context, keyspaceName string) error { + // get all shards for keyspace name. shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{ // Fetch shard records concurrently to speed up discovery. A typical // Vitess cluster will have 1-3 vtorc instances deployed, so there is @@ -119,13 +149,35 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { log.Error(err) return err } + savedShards := make(map[string]bool, len(shardInfos)) for _, shardInfo := range shardInfos { err = inst.SaveShard(shardInfo) if err != nil { log.Error(err) return err } + savedShards[shardInfo.ShardName()] = true } + + // delete shards that were not returned by ts.FindAllShardsInKeyspace(...), + // indicating they are stale. + shards, err := inst.ReadShardNames(keyspaceName) + if err != nil { + return err + } + for _, shard := range shards { + if savedShards[shard] { + continue + } + shardName := topoproto.KeyspaceShardString(keyspaceName, shard) + log.Infof("Forgetting shard: %s", shardName) + err = inst.DeleteShard(keyspaceName, shard) + if err != nil { + log.Errorf("Failed to delete shard %s: %+v", shardName, err) + return err + } + } + return nil } From a314c0336971590d398b9e11d751fba6fa23106e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 18 Feb 2025 00:37:50 +0100 Subject: [PATCH 2/4] add more tests Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/inst/shard_dao_test.go | 6 ++++ .../logic/keyspace_shard_discovery_test.go | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 0077b3d64af..3e4f71efcae 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -108,6 +108,12 @@ func TestSaveReadAndDeleteShard(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{tt.shardName}, shardNames) + // ReadAllShardNames + allShardNames, err := ReadAllShardNames() + ksShards, found := allShardNames[tt.keyspaceName] + require.True(t, found) + require.Equal(t, []string{tt.shardName}, ksShards) + // DeleteShard require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName)) _, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName) diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index f05295416d0..bfafc200b31 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -311,3 +311,33 @@ func verifyPrimaryAlias(t *testing.T, keyspaceName, shardName string, primaryAli require.NoError(t, err) require.Equal(t, primaryAliasWanted, primaryAlias) } + +func TestRefreshAllShards(t *testing.T) { + // Store the old flags and restore on test completion + oldTs := ts + defer func() { + ts = oldTs + db.ClearVTOrcDatabase() + }() + + ctx := context.Background() + ts = memorytopo.NewServer(ctx, "zone1") + require.NoError(t, ts.CreateKeyspace(ctx, "ks1", &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: policy.DurabilityNone, + })) + shards := []string{"-80", "80-"} + for _, shard := range shards { + require.NoError(t, ts.CreateShard(ctx, "ks1", shard)) + } + require.NoError(t, refreshAllShards(context.Background(), "ks1")) + shardNames, err := inst.ReadShardNames("ks1") + require.NoError(t, err) + require.Equal(t, []string{"-80", "80-"}, shardNames) + + require.NoError(t, ts.DeleteShard(ctx, "ks1", "80-")) + require.NoError(t, refreshAllShards(context.Background(), "ks1")) + shardNames, err = inst.ReadShardNames("ks1") + require.NoError(t, err) + require.Equal(t, []string{"-80"}, shardNames) +} From 730d04681987c176ee942f0499555e00811d6262 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 18 Feb 2025 00:48:29 +0100 Subject: [PATCH 3/4] cleanup Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/keyspace_shard_discovery_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index bfafc200b31..183bfc40779 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -326,17 +326,18 @@ func TestRefreshAllShards(t *testing.T) { KeyspaceType: topodatapb.KeyspaceType_NORMAL, DurabilityPolicy: policy.DurabilityNone, })) + shards := []string{"-80", "80-"} for _, shard := range shards { require.NoError(t, ts.CreateShard(ctx, "ks1", shard)) } - require.NoError(t, refreshAllShards(context.Background(), "ks1")) + require.NoError(t, refreshAllShards(ctx, "ks1")) shardNames, err := inst.ReadShardNames("ks1") require.NoError(t, err) require.Equal(t, []string{"-80", "80-"}, shardNames) require.NoError(t, ts.DeleteShard(ctx, "ks1", "80-")) - require.NoError(t, refreshAllShards(context.Background(), "ks1")) + require.NoError(t, refreshAllShards(ctx, "ks1")) shardNames, err = inst.ReadShardNames("ks1") require.NoError(t, err) require.Equal(t, []string{"-80"}, shardNames) From 474115d2092a8f335c9224445ffbc3d012cad56b Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 18 Feb 2025 20:53:40 +0100 Subject: [PATCH 4/4] fix ineffassign Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/inst/shard_dao_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 3e4f71efcae..8cf67f10ee6 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -110,6 +110,7 @@ func TestSaveReadAndDeleteShard(t *testing.T) { // ReadAllShardNames allShardNames, err := ReadAllShardNames() + require.NoError(t, err) ksShards, found := allShardNames[tt.keyspaceName] require.True(t, found) require.Equal(t, []string{tt.shardName}, ksShards)