Skip to content

VStream API: allow keyspace-level heartbeats to be streamed #16593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type ClusterConfig struct {
vtorcPort int

vreplicationCompressGTID bool
// Set overrideHeartbeatOptions to true to override the default heartbeat options:
// which are set to only on demand (5s) and 250ms interval.
overrideHeartbeatOptions bool
}

// enableGTIDCompression enables GTID compression for the cluster and returns a function
Expand Down Expand Up @@ -517,11 +520,15 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
// AddTablet creates new tablet with specified attributes
func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, shard *Shard, tabletType string, tabletID int) (*Tablet, *exec.Cmd, error) {
tablet := &Tablet{}

options := []string{
var options []string
defaultHeartbeatOptions := []string{
"--heartbeat_on_demand_duration", "5s",
"--heartbeat_interval", "250ms",
}
if !mainClusterConfig.overrideHeartbeatOptions {
options = append(options, defaultHeartbeatOptions...)
}

options = append(options, extraVTTabletArgs...)

if mainClusterConfig.vreplicationCompressGTID {
Expand Down
134 changes: 134 additions & 0 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,137 @@ func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
require.NotZero(t, ne.numDash40Events)
require.NotZero(t, ne.num40DashEvents)
}

const (
vstreamHeartbeatsTestContextTimeout = 20 * time.Second
// Expect a reasonable number of heartbeats to be received in the test duration, should ideally be ~ timeout
// since the heartbeat interval is set to 1s. But we set it to 10 to be conservative to avoid CI flakiness.
numExpectedHeartbeats = 10
)

func doVStream(t *testing.T, vc *VitessCluster, flags *vtgatepb.VStreamFlags) (numRowEvents map[string]int, numFieldEvents map[string]int) {
// Stream for a while to ensure heartbeats are sent.
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(vstreamHeartbeatsTestContextTimeout))
defer cancel()

numRowEvents = make(map[string]int)
numFieldEvents = make(map[string]int)
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
if err != nil {
log.Fatal(err)
}
defer vstreamConn.Close()

done := false
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "product",
Shard: "0",
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer",
}},
}
// Stream events from the VStream API.
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for !done {
evs, err := reader.Recv()
switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
rowEvent := ev.RowEvent
tableName := strings.Split(rowEvent.TableName, ".")[1]
require.Equal(t, "product", rowEvent.Keyspace)
require.Equal(t, "0", rowEvent.Shard)
numRowEvents[tableName]++

case binlogdatapb.VEventType_FIELD:
fieldEvent := ev.FieldEvent
tableName := strings.Split(fieldEvent.TableName, ".")[1]
require.Equal(t, "product", fieldEvent.Keyspace)
require.Equal(t, "0", fieldEvent.Shard)
numFieldEvents[tableName]++
default:
}
}
case io.EOF:
log.Infof("Stream Ended")
done = true
default:
log.Infof("%s:: remote error: %v", time.Now(), err)
done = true
}
}
return numRowEvents, numFieldEvents
}

// TestVStreamHeartbeats enables streaming of the internal Vitess heartbeat tables in the VStream API and
// ensures that the heartbeat events are received as expected by the client.
func TestVStreamHeartbeats(t *testing.T) {
// Enable continuous heartbeats.
extraVTTabletArgs = append(extraVTTabletArgs,
"--heartbeat_enable",
"--heartbeat_interval", "1s",
"--heartbeat_on_demand_duration", "0",
)
setSidecarDBName("_vt")
config := mainClusterConfig
config.overrideHeartbeatOptions = true
vc = NewVitessCluster(t, &clusterOptions{
clusterConfig: config,
})
defer vc.TearDown()

require.NotNil(t, vc)
defaultReplicas = 0
defaultRdonly = 0

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema,
defaultReplicas, defaultRdonly, 100, nil)
verifyClusterHealth(t, vc)
insertInitialData(t)

expectedNumRowEvents := make(map[string]int)
expectedNumRowEvents["customer"] = 3

type testCase struct {
name string
flags *vtgatepb.VStreamFlags
expectedHeartbeats int
}
testCases := []testCase{
{
name: "With Keyspace Heartbeats On",
flags: &vtgatepb.VStreamFlags{
StreamKeyspaceHeartbeats: true,
},
expectedHeartbeats: numExpectedHeartbeats,
},
{
name: "With Keyspace Heartbeats Off",
flags: nil,
expectedHeartbeats: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gotNumRowEvents, gotNumFieldEvents := doVStream(t, vc, tc.flags)
for k := range expectedNumRowEvents {
require.Equalf(t, 1, gotNumFieldEvents[k], "incorrect number of field events for table %s, got %d", k, gotNumFieldEvents[k])
}
require.GreaterOrEqual(t, gotNumRowEvents["heartbeat"], tc.expectedHeartbeats, "incorrect number of heartbeat events received")
log.Infof("Total number of heartbeat events received: %v", gotNumRowEvents["heartbeat"])
delete(gotNumRowEvents, "heartbeat")
require.Equal(t, expectedNumRowEvents, gotNumRowEvents)
})
}
}
Loading
Loading