From 24dce55a33a1f7dadcf37e6c29a622a84aaeb96e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 24 Dec 2024 12:05:47 -0500 Subject: [PATCH] Handle --vreplication_experimental_flags options properly Signed-off-by: Matt Lord --- .../vreplication/replicator_plan.go | 6 ++++-- .../vreplication/table_plan_partial.go | 9 +-------- .../vreplication/vplayer_flaky_test.go | 20 ++++++++++++++----- .../tabletserver/vstreamer/vstreamer.go | 4 ++-- 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index a201ce25847..b0a166bc57f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -408,7 +408,9 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun // It still has the data bit set, however, even though it's not really // present. So we have to account for this by unsetting the data bit so // that the column's current JSON value is not lost. - setBit(rowChange.DataColumns.Cols, i, false) + if rowChange.DataColumns != nil { + setBit(rowChange.DataColumns.Cols, i, false) + } newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, nil)) } else { escapedName := sqlescape.EscapeID(field.Name) @@ -482,7 +484,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun for i, field := range tp.Fields { if field.Type == querypb.Type_JSON && rowChange.JsonPartialValues != nil { switch { - case !isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex): + case rowChange.JsonPartialValues != nil && !isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex): // We use the full AFTER value which we already have. case len(afterVals[i].Raw()) == 0: // If the JSON column was NOT updated then the JSON column is marked as partial diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go index a4e177a9f14..ba2998b24c4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go @@ -26,7 +26,6 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" - vttablet "vitess.io/vitess/go/vt/vttablet/common" ) // isBitSet returns true if the bit at index is set @@ -47,13 +46,7 @@ func setBit(data []byte, index int, value bool) { } func (tp *TablePlan) isPartial(rowChange *binlogdatapb.RowChange) bool { - if (tp.WorkflowConfig.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage) == 0 || - rowChange.DataColumns == nil || - rowChange.DataColumns.Count == 0 { - - return false - } - return true + return rowChange != nil && rowChange.DataColumns != nil && rowChange.DataColumns.Count > 0 } func (tpb *tablePlanBuilder) generatePartialValuesPart(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter, dataColumns *binlogdatapb.RowChange_Bitmap) *sqlparser.ParsedQuery { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index fe96e47281c..8371928a9f5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1762,11 +1762,21 @@ func TestPlayerPartialImages(t *testing.T) { execStatements(t, []string{tc.input}) var want qh.ExpectationSequencer if tc.error != "" { - want = qh.Expect( - "rollback", - ).Then(qh.Immediately( - fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error), - )) + if vttablet.DefaultVReplicationConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching == 0 { + want = qh.Expect( + "begin", + "delete from dst where id=3", + "rollback", + ).Then(qh.Immediately( + fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error), + )) + } else { + want = qh.Expect( + "rollback", + ).Then(qh.Immediately( + fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error), + )) + } expectDBClientQueries(t, want) } else { want = qh.Expect( diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 1cedc01dbf1..6bd79343af2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -1031,8 +1031,8 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea } if afterOK { rowChange.After = sqltypes.RowToProto3(afterValues) - if (vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) && - (partial || row.JSONPartialValues.Count() > 0) { + if ((vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) && + partial) || (row.JSONPartialValues.Count() > 0) { rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{ Count: int64(rows.DataColumns.Count()),