Skip to content

Commit

Permalink
Handle --vreplication_experimental_flags options properly
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 24, 2024
1 parent 059d01a commit ebc71af
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

// isBitSet returns true if the bit at index is set
Expand All @@ -47,13 +46,11 @@ func setBit(data []byte, index int, value bool) {
}

func (tp *TablePlan) isPartial(rowChange *binlogdatapb.RowChange) bool {
if (tp.WorkflowConfig.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage) == 0 ||
rowChange.DataColumns == nil ||
rowChange.DataColumns.Count == 0 {

if rowChange == nil {
return false
}
return true
return (rowChange.DataColumns != nil && rowChange.DataColumns.Count > 0) ||
(rowChange.JsonPartialValues != nil && rowChange.JsonPartialValues.Count > 0)
}

func (tpb *tablePlanBuilder) generatePartialValuesPart(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter, dataColumns *binlogdatapb.RowChange_Bitmap) *sqlparser.ParsedQuery {
Expand Down
42 changes: 34 additions & 8 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,8 +1560,24 @@ func TestPlayerPartialImages(t *testing.T) {
error string
}

testCases := []testCase{
{
var testCases []testCase

if vttablet.DefaultVReplicationConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching == 0 {
testCases = append(testCases, testCase{
input: "insert into src (id, jd, bd) values (1,'{\"key1\": \"val1\"}','blob data'), (2,'{\"key2\": \"val2\"}','blob data2'), (3,'{\"key3\": \"val3\"}','blob data3')",
output: []string{
"insert into dst(id,jd,bd) values (1,JSON_OBJECT(_utf8mb4'key1', _utf8mb4'val1'),_binary'blob data')",
"insert into dst(id,jd,bd) values (2,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'blob data2')",
"insert into dst(id,jd,bd) values (3,JSON_OBJECT(_utf8mb4'key3', _utf8mb4'val3'),_binary'blob data3')",
},
data: [][]string{
{"1", "{\"key1\": \"val1\"}", "blob data"},
{"2", "{\"key2\": \"val2\"}", "blob data2"},
{"3", "{\"key3\": \"val3\"}", "blob data3"},
},
})
} else {
testCases = append(testCases, testCase{
input: "insert into src (id, jd, bd) values (1,'{\"key1\": \"val1\"}','blob data'), (2,'{\"key2\": \"val2\"}','blob data2'), (3,'{\"key3\": \"val3\"}','blob data3')",
output: []string{
"insert into dst(id,jd,bd) values (1,JSON_OBJECT(_utf8mb4'key1', _utf8mb4'val1'),_binary'blob data'), (2,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'blob data2'), (3,JSON_OBJECT(_utf8mb4'key3', _utf8mb4'val3'),_binary'blob data3')",
Expand All @@ -1571,7 +1587,7 @@ func TestPlayerPartialImages(t *testing.T) {
{"2", "{\"key2\": \"val2\"}", "blob data2"},
{"3", "{\"key3\": \"val3\"}", "blob data3"},
},
},
})
}
if runNoBlobTest {
testCases = append(testCases, testCase{
Expand Down Expand Up @@ -1762,11 +1778,21 @@ func TestPlayerPartialImages(t *testing.T) {
execStatements(t, []string{tc.input})
var want qh.ExpectationSequencer
if tc.error != "" {
want = qh.Expect(
"rollback",
).Then(qh.Immediately(
fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error),
))
if vttablet.DefaultVReplicationConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching == 0 {
want = qh.Expect(
"begin",
"delete from dst where id=3",
"rollback",
).Then(qh.Immediately(
fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error),
))
} else {
want = qh.Expect(
"rollback",
).Then(qh.Immediately(
fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error),
))
}
expectDBClientQueries(t, want)
} else {
want = qh.Expect(
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,8 +1031,8 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea
}
if afterOK {
rowChange.After = sqltypes.RowToProto3(afterValues)
if (vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) &&
(partial || row.JSONPartialValues.Count() > 0) {
if ((vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) &&
partial) || (row.JSONPartialValues.Count() > 0) {

rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{
Count: int64(rows.DataColumns.Count()),
Expand Down

0 comments on commit ebc71af

Please sign in to comment.