Skip to content

Commit

Permalink
[release-19.0] Support passing filters to `discovery.NewHealthCheck(.…
Browse files Browse the repository at this point in the history
…..)` (#16170) (#16871)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt authored Oct 2, 2024
1 parent 15a1d99 commit dd8222a
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 57 deletions.
48 changes: 30 additions & 18 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"net/http"
Expand Down Expand Up @@ -97,6 +98,9 @@ var (
// HealthCheckHealthyTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Healthy Tablets` title to
// create the HTML code required to render the list of healthy tablets from the HealthCheck.
HealthCheckHealthyTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Healthy Tablets")

// errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined.
errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -289,6 +293,24 @@ type HealthCheckImpl struct {
loadTabletsTrigger chan struct{}
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
return nil, errKeyspacesToWatchAndTabletFilters
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err)
}
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
return filters, nil
}

// NewHealthCheck creates a new HealthCheck object.
// Parameters:
// retryDelay.
Expand All @@ -310,10 +332,14 @@ type HealthCheckImpl struct {
//
// The localCell for this healthcheck
//
// callback.
// cellsToWatch.
//
// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
// Is a list of cells to watch for tablets.
//
// filters.
//
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

hc := &HealthCheckImpl{
Expand All @@ -329,7 +355,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
loadTabletsTrigger: make(chan struct{}),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
Expand All @@ -340,20 +365,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
64 changes: 60 additions & 4 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,62 @@ func init() {
refreshInterval = time.Minute
}

func TestNewVTGateHealthCheckFilters(t *testing.T) {
defer func() {
KeyspacesToWatch = nil
tabletFilters = nil
}()

testCases := []struct {
name string
keyspacesToWatch []string
tabletFilters []string
expectedError string
expectedFilterTypes []any
}{
{
name: "noFilters",
},
{
name: "tabletFilters",
tabletFilters: []string{"ks1|-80"},
expectedFilterTypes: []any{&FilterByShard{}},
},
{
name: "keyspacesToWatch",
keyspacesToWatch: []string{"ks1"},
expectedFilterTypes: []any{&FilterByKeyspace{}},
},
{
name: "failKeyspacesToWatchAndFilters",
tabletFilters: []string{"ks1|-80"},
keyspacesToWatch: []string{"ks1"},
expectedError: errKeyspacesToWatchAndTabletFilters.Error(),
},
{
name: "failInvalidTabletFilters",
tabletFilters: []string{"shouldfail|"},
expectedError: "failed to parse tablet_filters value \"shouldfail|\": error parsing shard name : Code: INVALID_ARGUMENT\nempty name\n",
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
KeyspacesToWatch = testCase.keyspacesToWatch
tabletFilters = testCase.tabletFilters

filters, err := NewVTGateHealthCheckFilters()
if testCase.expectedError != "" {
assert.EqualError(t, err, testCase.expectedError)
}
assert.Len(t, filters, len(testCase.expectedFilterTypes))
for i, filter := range filters {
assert.IsType(t, testCase.expectedFilterTypes[i], filter)
}
})
}
}

func TestHealthCheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)
// reset error counters
Expand Down Expand Up @@ -1121,7 +1177,7 @@ func TestPrimaryInOtherCell(t *testing.T) {

ts := memorytopo.NewServer(ctx, "cell1", "cell2")
defer ts.Close()
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as primary in different cell
Expand Down Expand Up @@ -1181,7 +1237,7 @@ func TestReplicaInOtherCell(t *testing.T) {

ts := memorytopo.NewServer(ctx, "cell1", "cell2")
defer ts.Close()
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as replica
Expand Down Expand Up @@ -1286,7 +1342,7 @@ func TestCellAliases(t *testing.T) {

ts := memorytopo.NewServer(ctx, "cell1", "cell2")
defer ts.Close()
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Expand Down Expand Up @@ -1437,7 +1493,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic
}

func createTestHc(ctx context.Context, ts *topo.Server) *HealthCheckImpl {
return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "")
return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "", nil)
}

type fakeConn struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(ctx, factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)
kss := &keyspaceState{
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestKeyspaceEventTypes(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(ctx, factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)

Expand Down
13 changes: 13 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ type TabletFilter interface {
IsIncluded(tablet *topodata.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
}
}
return true
}

// FilterByShard is a filter that filters tablets by
// keyspace/shard.
type FilterByShard struct {
Expand Down
36 changes: 30 additions & 6 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
filter := NewFilterByKeyspace([]string{"keyspace"})
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -172,10 +173,31 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias)
tw.loadTablets()

// Confirm second tablet triggers ListTablets + AddTablet calls.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

// Check the new tablet is returned by GetAllTablets().
// Add a third tablet in a filtered keyspace to the topology.
tablet3 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 3,
},
Hostname: "host3",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "excluded",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet3), "CreateTablet failed for %v", tablet3.Alias)
tw.loadTablets()

// Confirm filtered tablet did not trigger an AddTablet call.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 0})
checkChecksum(t, tw, 3177315266)

// Check the second tablet is returned by GetAllTablets(). This should not contain the filtered tablet.
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet2)
assert.Len(t, allTablets, 2)
Expand Down Expand Up @@ -207,14 +229,14 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
assert.Contains(t, allTablets, key)
assert.True(t, proto.Equal(tablet, allTablets[key]))
assert.NotContains(t, allTablets, origKey)
checkChecksum(t, tw, 2762153755)
checkChecksum(t, tw, 3177315266)
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0})
assert.Len(t, allTablets, 2)
assert.Contains(t, allTablets, origKey)
assert.True(t, proto.Equal(origTablet, allTablets[origKey]))
assert.NotContains(t, allTablets, key)
checkChecksum(t, tw, 2762153755)
checkChecksum(t, tw, 3177315266)
}

// Both tablets restart on different hosts.
Expand Down Expand Up @@ -269,7 +291,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
checkChecksum(t, tw, 789108290)
checkChecksum(t, tw, 852159264)

allTablets = fhc.GetAllTablets()
assert.Len(t, allTablets, 1)
Expand All @@ -280,8 +302,10 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
assert.Contains(t, allTablets, key)
assert.True(t, proto.Equal(tablet2, allTablets[key]))

// Remove the other and check that it is detected as being gone.
// Remove the other tablets and check that it is detected as being gone.
// Deleting the filtered tablet should not trigger a RemoveTablet call.
require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))
require.NoError(t, ts.DeleteTablet(context.Background(), tablet3.Alias))
_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard")
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func newClient(ctx context.Context, primary *primary, replica *replica, ts *topo
log.Fatal(err)
}

healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "")
healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "", nil)
c := &client{
primary: primary,
healthCheck: healthCheck,
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ type TabletGateway struct {
}

func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck {
return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch)
filters, err := discovery.NewVTGateHealthCheckFilters()
if err != nil {
log.Exit(err)
}
return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch, filters)
}

// NewTabletGateway creates and returns a new TabletGateway
Expand Down
Loading

0 comments on commit dd8222a

Please sign in to comment.