diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go
index 89a93dde241..8563ef0f4f1 100644
--- a/go/vt/vtgate/vstream_manager.go
+++ b/go/vt/vtgate/vstream_manager.go
@@ -24,8 +24,10 @@ import (
 	"sync"
 	"time"
 
+	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/vt/discovery"
 	querypb "vitess.io/vitess/go/vt/proto/query"
+	"vitess.io/vitess/go/vt/servenv"
 	"vitess.io/vitess/go/vt/topo"
 
 	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
@@ -45,6 +47,10 @@ type vstreamManager struct {
 	resolver *srvtopo.Resolver
 	toposerv srvtopo.Server
 	cell     string
+
+	// Stream metrics labelled by keyspace, shard name and tablet type.
+	vstreamsCreated *stats.CountersWithMultiLabels
+	vstreamsLag     *stats.GaugesWithMultiLabels
 }
 
 // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
@@ -115,10 +121,19 @@ type journalEvent struct {
 }
 
 func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
+	exporter := servenv.NewExporter(cell, "VStreamManager")
 	return &vstreamManager{
 		resolver: resolver,
 		toposerv: serv,
 		cell:     cell,
+		vstreamsCreated: exporter.NewCountersWithMultiLabels(
+			"VStreamsCreated",
+			"Number of vstreams created",
+			[]string{"Keyspace", "ShardName", "TabletType"}),
+		vstreamsLag: exporter.NewGaugesWithMultiLabels(
+			"VStreamsLag",
+			"Difference between event current time and the binlog event timestamp",
+			[]string{"Keyspace", "ShardName", "TabletType"}),
 	}
 }
 
@@ -525,10 +540,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 
 		log.Infof("Starting to vstream from %s", tablet.Alias.String())
 		// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
+		// The metric labels are the same for every batch of this stream, so build them once.
+		labels := []string{sgtid.Keyspace, sgtid.Shard, target.TabletType.String()}
+		var vstreamCreatedOnce sync.Once
 		err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
 			// We received a valid event. Reset error count.
 			errCount = 0
 
+			// Count the stream as created only once it has actually delivered events.
+			vstreamCreatedOnce.Do(func() {
+				vs.vsm.vstreamsCreated.Add(labels, 1)
+			})
+
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
@@ -608,6 +631,13 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 				default:
 					sendevents = append(sendevents, event)
 				}
+				// Events without both timestamps cannot yield a meaningful lag; skip them
+				// so they do not overwrite the gauge with a bogus value.
+				if event.Timestamp > 0 && event.CurrentTime > 0 {
+					// CurrentTime is scaled by 1e9 to match the unit of Timestamp.
+					lag := event.CurrentTime/1e9 - event.Timestamp
+					vs.vsm.vstreamsLag.Set(labels, lag)
+				}
 			}
 			if len(sendevents) != 0 {
 				eventss = append(eventss, sendevents)
diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go
index ddae975c937..241ed3280d4 100644
--- a/go/vt/vtgate/vstream_manager_test.go
+++ b/go/vt/vtgate/vstream_manager_test.go
@@ -407,6 +407,64 @@ func TestVStreamMulti(t *testing.T) {
 	}
 }
 
+// TestVStreamsCreatedAndLagMetrics streams one GTID/COMMIT pair from each of two
+// shards and verifies the per-shard VStreamsCreated counter and VStreamsLag gauge.
+func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cell := "aa"
+	ks := "TestVStream"
+	_ = createSandbox(ks)
+	hc := discovery.NewFakeHealthCheck(nil)
+	st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
+	vsm := newTestVStreamManager(hc, st, cell)
+	// Clear both metrics before streaming so the assertions only reflect this test.
+	vsm.vstreamsCreated.ResetAll()
+	vsm.vstreamsLag.ResetAll()
+	sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
+	addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
+	sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
+	addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet())
+
+	send0 := []*binlogdatapb.VEvent{
+		{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
+		{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 15 * 1e9},
+	}
+	sbc0.AddVStreamEvents(send0, nil)
+
+	send1 := []*binlogdatapb.VEvent{
+		{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
+		{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
+	}
+	sbc1.AddVStreamEvents(send1, nil)
+
+	vgtid := &binlogdatapb.VGtid{
+		ShardGtids: []*binlogdatapb.ShardGtid{{
+			Keyspace: ks,
+			Shard:    "-20",
+			Gtid:     "pos",
+		}, {
+			Keyspace: ks,
+			Shard:    "20-40",
+			Gtid:     "pos",
+		}},
+	}
+	ch := startVStream(ctx, t, vsm, vgtid, nil)
+	// Receive two responses (one batch was queued per shard) before reading the metrics.
+	<-ch
+	<-ch
+	wantVStreamsCreated := make(map[string]int64)
+	wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1
+	wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
+	assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")
+
+	// Expected lag is CurrentTime/1e9 - Timestamp: 15-10 and 17-10.
+	wantVStreamsLag := make(map[string]int64)
+	wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
+	wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
+	assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
+}
+
 func TestVStreamRetry(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -1165,7 +1223,9 @@ func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid
 func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants ...*binlogdatapb.VStreamResponse) {
 	t.Helper()
 	for i, want := range wants {
-		got := <-ch
+		val := <-ch
+		// Clone the response so that resetting the timestamps below does not modify the message received from the channel.
+		got := proto.Clone(val).(*binlogdatapb.VStreamResponse)
 		require.NotNil(t, got)
 		for _, event := range got.Events {
 			event.Timestamp = 0