Skip to content

Commit

Permalink
Add vstream metrics to vtgate (vitessio#13098) (#84)
Browse files Browse the repository at this point in the history
* Add vstream metrics to vtgate



* Update unit test name and use cell variable



* Reset metrics for TestVStreamsCreatedAndLagMetrics, fix data race issue



---------

Signed-off-by: twthorn <thomaswilliamthornton@gmail.com>
  • Loading branch information
twthorn authored May 20, 2023
1 parent 0e55277 commit 21b4c7f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
24 changes: 24 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"sync"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
Expand All @@ -45,6 +47,9 @@ type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
}

// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
Expand Down Expand Up @@ -115,10 +120,19 @@ type journalEvent struct {
}

// newVStreamManager returns a vstreamManager bound to the given resolver,
// topology server and cell.
//
// It also registers two multi-label metrics through an exporter obtained from
// servenv.NewExporter(cell, "VStreamManager"):
//   - VStreamsCreated: counter of vstreams created.
//   - VStreamsLag: gauge holding the difference between an event's current
//     time and its binlog timestamp.
//
// Both metrics are labelled by Keyspace, ShardName and TabletType.
func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
	exporter := servenv.NewExporter(cell, "VStreamManager")

	vsm := &vstreamManager{
		resolver: resolver,
		toposerv: serv,
		cell:     cell,
	}

	// Register the counter before the gauge, matching the field order of the struct.
	vsm.vstreamsCreated = exporter.NewCountersWithMultiLabels(
		"VStreamsCreated",
		"Number of vstreams created",
		[]string{"Keyspace", "ShardName", "TabletType"},
	)
	vsm.vstreamsLag = exporter.NewGaugesWithMultiLabels(
		"VStreamsLag",
		"Difference between event current time and the binlog event timestamp",
		[]string{"Keyspace", "ShardName", "TabletType"},
	)

	return vsm
}

Expand Down Expand Up @@ -525,10 +539,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha

log.Infof("Starting to vstream from %s", tablet.Alias.String())
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
var vstreamCreatedOnce sync.Once
err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0

labels := []string{sgtid.Keyspace, sgtid.Shard, target.TabletType.String()}

vstreamCreatedOnce.Do(func() {
vs.vsm.vstreamsCreated.Add(labels, 1)
})

select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -608,6 +629,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
default:
sendevents = append(sendevents, event)
}
lag := event.CurrentTime/1e9 - event.Timestamp
vs.vsm.vstreamsLag.Set(labels, lag)

}
if len(sendevents) != 0 {
eventss = append(eventss, sendevents)
Expand Down
56 changes: 55 additions & 1 deletion go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,59 @@ func TestVStreamMulti(t *testing.T) {
}
}

// TestVStreamsCreatedAndLagMetrics streams one GTID+COMMIT pair from each of
// two primary tablets (shards "-20" and "20-40") and then checks that the
// vstream manager's VStreamsCreated counter and VStreamsLag gauge hold the
// expected per-shard values.
func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cell := "aa"
	ks := "TestVStream"
	_ = createSandbox(ks)
	hc := discovery.NewFakeHealthCheck(nil)
	st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
	vsm := newTestVStreamManager(hc, st, cell)

	// Start from clean metrics so the exact-value assertions below hold.
	// NOTE(review): presumably the metrics can carry values from other tests — confirm.
	vsm.vstreamsCreated.ResetAll()
	vsm.vstreamsLag.ResetAll()

	// One healthy primary tablet per shard.
	tablet0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
	addTabletToSandboxTopo(t, st, ks, "-20", tablet0.Tablet())
	tablet1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
	addTabletToSandboxTopo(t, st, ks, "20-40", tablet1.Tablet())

	// Timestamp is in seconds and CurrentTime in nanoseconds, so the expected
	// lag values are 15-10 = 5 and 17-10 = 7.
	tablet0.AddVStreamEvents([]*binlogdatapb.VEvent{
		{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
		{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 15 * 1e9},
	}, nil)
	tablet1.AddVStreamEvents([]*binlogdatapb.VEvent{
		{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
		{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
	}, nil)

	shardGtids := []*binlogdatapb.ShardGtid{
		{Keyspace: ks, Shard: "-20", Gtid: "pos"},
		{Keyspace: ks, Shard: "20-40", Gtid: "pos"},
	}
	responses := startVStream(ctx, t, vsm, &binlogdatapb.VGtid{ShardGtids: shardGtids}, nil)

	// Drain two responses before reading the metrics.
	for i := 0; i < 2; i++ {
		<-responses
	}

	wantCreated := map[string]int64{
		"TestVStream.-20.PRIMARY":   1,
		"TestVStream.20-40.PRIMARY": 1,
	}
	assert.Equal(t, wantCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")

	wantLag := map[string]int64{
		"TestVStream.-20.PRIMARY":   5,
		"TestVStream.20-40.PRIMARY": 7,
	}
	assert.Equal(t, wantLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
}

func TestVStreamRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1165,7 +1218,8 @@ func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid
func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants ...*binlogdatapb.VStreamResponse) {
t.Helper()
for i, want := range wants {
got := <-ch
val := <-ch
got := proto.Clone(val).(*binlogdatapb.VStreamResponse)
require.NotNil(t, got)
for _, event := range got.Events {
event.Timestamp = 0
Expand Down

0 comments on commit 21b4c7f

Please sign in to comment.