Skip to content

Commit

Permalink
Error out if invalid LastPK is passed
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Jul 25, 2024
1 parent f481a77 commit 640f0c6
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
8 changes: 6 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type uvstreamer struct {

config *uvstreamerConfig

vs *vstreamer //last vstreamer created in uvstreamer
vs *vstreamer // last vstreamer created in uvstreamer
}

type uvstreamerConfig struct {
Expand Down Expand Up @@ -138,6 +138,10 @@ func (uvs *uvstreamer) buildTablePlan() error {
uvs.plans = make(map[string]*tablePlan)
tableLastPKs := make(map[string]*binlogdatapb.TableLastPK)
for _, tablePK := range uvs.inTablePKs {
if tablePK != nil && tablePK.Lastpk != nil && len(tablePK.Lastpk.Fields) == 0 {
log.Errorf("lastpk for table %s has no fields defined", tablePK.TableName)
return fmt.Errorf("lastpk for table %s has no fields defined", tablePK.TableName)
}
tableLastPKs[tablePK.TableName] = tablePK
}
tables := uvs.se.GetSchema()
Expand Down Expand Up @@ -313,7 +317,7 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
}
behind := time.Now().UnixNano() - uvs.lastTimestampNs
uvs.setReplicationLagSeconds(behind / 1e9)
//log.Infof("sbm set to %d", uvs.ReplicationLagSeconds)
// log.Infof("sbm set to %d", uvs.ReplicationLagSeconds)
var evs2 []*binlogdatapb.VEvent
if len(uvs.plans) > 0 {
evs2 = uvs.filterEvents(evs)
Expand Down
27 changes: 27 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,33 @@ func TestMissingTables(t *testing.T) {
runCases(t, filter, testcases, startPos, nil)
}

// TestVStreamTableNoPKsWithoutFields confirms that, if a lastpk is passed with no fields defined, it errors out.
func TestVStreamTableNoPKsWithoutFields(t *testing.T) {
ts := &TestSpec{
t: t,
ddls: []string{
"create table t1(id11 int, id12 int, primary key(id11))",
},
}
ts.Init()
defer ts.Close()
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
}
var tablePKs []*binlogdatapb.TableLastPK
tablePKs = append(tablePKs, getTablePK("t1", 1))
for _, tpk := range tablePKs {
tpk.Lastpk.Fields = nil
}
ctx := context.Background()
ch := make(chan []*binlogdatapb.VEvent)
err := vstream(ctx, t, "", tablePKs, filter, ch)
require.ErrorContains(t, err, "lastpk for table t1 has no fields defined")
}

func TestVStreamCopySimpleFlow(t *testing.T) {
ts := &TestSpec{
t: t,
Expand Down

0 comments on commit 640f0c6

Please sign in to comment.