@@ -555,3 +555,139 @@ func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
555
555
require .NotZero (t , ne .numDash40Events )
556
556
require .NotZero (t , ne .num40DashEvents )
557
557
}
558
+
559
+ const (
560
+ vstreamHeartbeatsTestContextTimeout = 20 * time .Second
561
+ // Expect a reasonable number of heartbeats to be received in the test duration, should ideally be ~ timeout
562
+ // since the heartbeat interval is set to 1s. But we set it to 10 to be conservative to avoid CI flakiness.
563
+ numExpectedHeartbeats = 10
564
+ )
565
+
566
+ func doVStream (t * testing.T , vc * VitessCluster , flags * vtgatepb.VStreamFlags ) (numRowEvents map [string ]int , numFieldEvents map [string ]int ) {
567
+ // Stream for a while to ensure heartbeats are sent.
568
+ ctx , cancel := context .WithTimeout (context .Background (), vstreamHeartbeatsTestContextTimeout )
569
+ defer cancel ()
570
+
571
+ numRowEvents = make (map [string ]int )
572
+ numFieldEvents = make (map [string ]int )
573
+ vstreamConn , err := vtgateconn .Dial (ctx , fmt .Sprintf ("%s:%d" , vc .ClusterConfig .hostname , vc .ClusterConfig .vtgateGrpcPort ))
574
+ require .NoError (t , err )
575
+ defer vstreamConn .Close ()
576
+
577
+ done := false
578
+ vgtid := & binlogdatapb.VGtid {
579
+ ShardGtids : []* binlogdatapb.ShardGtid {{
580
+ Keyspace : "product" ,
581
+ Shard : "0" ,
582
+ Gtid : "" ,
583
+ }}}
584
+
585
+ filter := & binlogdatapb.Filter {
586
+ Rules : []* binlogdatapb.Rule {{
587
+ Match : "customer" ,
588
+ Filter : "select * from customer" ,
589
+ }},
590
+ }
591
+ // Stream events from the VStream API.
592
+ reader , err := vstreamConn .VStream (ctx , topodatapb .TabletType_PRIMARY , vgtid , filter , flags )
593
+ require .NoError (t , err )
594
+ for ! done {
595
+ evs , err := reader .Recv ()
596
+ switch err {
597
+ case nil :
598
+ for _ , ev := range evs {
599
+ switch ev .Type {
600
+ case binlogdatapb .VEventType_ROW :
601
+ rowEvent := ev .RowEvent
602
+ arr := strings .Split (rowEvent .TableName , "." )
603
+ require .Equal (t , len (arr ), 2 )
604
+ tableName := arr [1 ]
605
+ require .Equal (t , "product" , rowEvent .Keyspace )
606
+ require .Equal (t , "0" , rowEvent .Shard )
607
+ numRowEvents [tableName ]++
608
+
609
+ case binlogdatapb .VEventType_FIELD :
610
+ fieldEvent := ev .FieldEvent
611
+ arr := strings .Split (fieldEvent .TableName , "." )
612
+ require .Equal (t , len (arr ), 2 )
613
+ tableName := arr [1 ]
614
+ require .Equal (t , "product" , fieldEvent .Keyspace )
615
+ require .Equal (t , "0" , fieldEvent .Shard )
616
+ numFieldEvents [tableName ]++
617
+ default :
618
+ }
619
+ }
620
+ case io .EOF :
621
+ log .Infof ("Stream Ended" )
622
+ done = true
623
+ default :
624
+ log .Errorf ("remote error: %v" , err )
625
+ done = true
626
+ }
627
+ }
628
+ return numRowEvents , numFieldEvents
629
+ }
630
+
631
+ // TestVStreamHeartbeats enables streaming of the internal Vitess heartbeat tables in the VStream API and
632
+ // ensures that the heartbeat events are received as expected by the client.
633
+ func TestVStreamHeartbeats (t * testing.T ) {
634
+ // Enable continuous heartbeats.
635
+ extraVTTabletArgs = append (extraVTTabletArgs ,
636
+ "--heartbeat_enable" ,
637
+ "--heartbeat_interval" , "1s" ,
638
+ "--heartbeat_on_demand_duration" , "0" ,
639
+ )
640
+ setSidecarDBName ("_vt" )
641
+ config := * mainClusterConfig
642
+ config .overrideHeartbeatOptions = true
643
+ vc = NewVitessCluster (t , & clusterOptions {
644
+ clusterConfig : & config ,
645
+ })
646
+ defer vc .TearDown ()
647
+
648
+ require .NotNil (t , vc )
649
+ defaultReplicas = 0
650
+ defaultRdonly = 0
651
+
652
+ defaultCell := vc .Cells [vc .CellNames [0 ]]
653
+ vc .AddKeyspace (t , []* Cell {defaultCell }, "product" , "0" , initialProductVSchema , initialProductSchema ,
654
+ defaultReplicas , defaultRdonly , 100 , nil )
655
+ verifyClusterHealth (t , vc )
656
+ insertInitialData (t )
657
+
658
+ expectedNumRowEvents := make (map [string ]int )
659
+ expectedNumRowEvents ["customer" ] = 3 // 3 rows inserted in the customer table in insertInitialData()
660
+
661
+ type testCase struct {
662
+ name string
663
+ flags * vtgatepb.VStreamFlags
664
+ expectedHeartbeats int
665
+ }
666
+ testCases := []testCase {
667
+ {
668
+ name : "With Keyspace Heartbeats On" ,
669
+ flags : & vtgatepb.VStreamFlags {
670
+ StreamKeyspaceHeartbeats : true ,
671
+ },
672
+ expectedHeartbeats : numExpectedHeartbeats ,
673
+ },
674
+ {
675
+ name : "With Keyspace Heartbeats Off" ,
676
+ flags : nil ,
677
+ expectedHeartbeats : 0 ,
678
+ },
679
+ }
680
+
681
+ for _ , tc := range testCases {
682
+ t .Run (tc .name , func (t * testing.T ) {
683
+ gotNumRowEvents , gotNumFieldEvents := doVStream (t , vc , tc .flags )
684
+ for k := range expectedNumRowEvents {
685
+ require .Equalf (t , 1 , gotNumFieldEvents [k ], "incorrect number of field events for table %s, got %d" , k , gotNumFieldEvents [k ])
686
+ }
687
+ require .GreaterOrEqual (t , gotNumRowEvents ["heartbeat" ], tc .expectedHeartbeats , "incorrect number of heartbeat events received" )
688
+ log .Infof ("Total number of heartbeat events received: %v" , gotNumRowEvents ["heartbeat" ])
689
+ delete (gotNumRowEvents , "heartbeat" )
690
+ require .Equal (t , expectedNumRowEvents , gotNumRowEvents )
691
+ })
692
+ }
693
+ }
0 commit comments