From 640f0c69f6a3a0f218b4c60aa687286e7d39ea16 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 25 Jul 2024 18:40:14 +0530 Subject: [PATCH] Error out if invalid LastPK is passed Signed-off-by: Rohit Nayak --- .../tabletserver/vstreamer/uvstreamer.go | 8 ++++-- .../tabletserver/vstreamer/vstreamer_test.go | 27 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 2b770c1d4f4..7cb1cc4a579 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -88,7 +88,7 @@ type uvstreamer struct { config *uvstreamerConfig - vs *vstreamer //last vstreamer created in uvstreamer + vs *vstreamer // last vstreamer created in uvstreamer } type uvstreamerConfig struct { @@ -138,6 +138,10 @@ func (uvs *uvstreamer) buildTablePlan() error { uvs.plans = make(map[string]*tablePlan) tableLastPKs := make(map[string]*binlogdatapb.TableLastPK) for _, tablePK := range uvs.inTablePKs { + if tablePK != nil && tablePK.Lastpk != nil && len(tablePK.Lastpk.Fields) == 0 { + log.Errorf("lastpk for table %s has no fields defined", tablePK.TableName) + return fmt.Errorf("lastpk for table %s has no fields defined", tablePK.TableName) + } tableLastPKs[tablePK.TableName] = tablePK } tables := uvs.se.GetSchema() @@ -313,7 +317,7 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error { } behind := time.Now().UnixNano() - uvs.lastTimestampNs uvs.setReplicationLagSeconds(behind / 1e9) - //log.Infof("sbm set to %d", uvs.ReplicationLagSeconds) + // log.Infof("sbm set to %d", uvs.ReplicationLagSeconds) var evs2 []*binlogdatapb.VEvent if len(uvs.plans) > 0 { evs2 = uvs.filterEvents(evs) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 8d0d182790e..11b53904d6d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -398,6 +398,33 @@ func TestMissingTables(t *testing.T) { runCases(t, filter, testcases, startPos, nil) } +// TestVStreamTableNoPKsWithoutFields confirms that, if a lastpk is passed with no fields defined, it errors out. +func TestVStreamTableNoPKsWithoutFields(t *testing.T) { + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id11 int, id12 int, primary key(id11))", + }, + } + ts.Init() + defer ts.Close() + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + var tablePKs []*binlogdatapb.TableLastPK + tablePKs = append(tablePKs, getTablePK("t1", 1)) + for _, tpk := range tablePKs { + tpk.Lastpk.Fields = nil + } + ctx := context.Background() + ch := make(chan []*binlogdatapb.VEvent) + err := vstream(ctx, t, "", tablePKs, filter, ch) + require.ErrorContains(t, err, "lastpk for table t1 has no fields defined") +} + func TestVStreamCopySimpleFlow(t *testing.T) { ts := &TestSpec{ t: t,