From 46ebaddacda161ee4eb50871ed9c47663bbaf3dd Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 10 Dec 2024 09:01:28 -0500 Subject: [PATCH] Get binlog_row_image = noblob and binlog-row-value-options = PARTIAL_JSON working together Signed-off-by: Matt Lord --- .../tabletmanager/vreplication/replicator_plan.go | 15 +++++++++++++++ .../vreplication/table_plan_partial.go | 10 ++++++++++ .../vttablet/tabletserver/vstreamer/vstreamer.go | 6 +++++- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index bf282e663c2..17cbe3d6d99 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -386,6 +386,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun var newVal *sqltypes.Value var err error if field.Type == querypb.Type_JSON { + //log.Errorf("DEBUG: JSON field %v, value: %v", field.Name, vals[i].RawStr()) switch { case vals[i].IsNull(): // An SQL NULL and not an actual JSON value newVal = &sqltypes.NULL @@ -394,6 +395,12 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun // as JSON_INSERT(). // This occurs e.g. when using partial JSON values as a result of // mysqld using binlog-row-value-options=PARTIAL_JSON. + if len(vals[i].Raw()) == 0 { + // When using BOTH binlog_row_image=NOBLOB AND + // binlog_row_value_options=PARTIAL_JSON then the JSON column + // has the data bit set and the diff is empty. + setBit(rowChange.DataColumns.Cols, i, false) + } s := fmt.Sprintf(vals[i].RawStr(), field.Name) newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte(s))) default: // A JSON value (which may be a JSON null literal value) @@ -437,6 +444,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun case before && after: if !tp.pkChanged(bindvars) && !tp.HasExtraSourcePkColumns { if tp.isPartial(rowChange) { + //log.Errorf("DEBUG: building partial update query using DataColumns: %08b", rowChange.DataColumns.Cols) upd, err := tp.getPartialUpdateQuery(rowChange.DataColumns) if err != nil { return nil, err @@ -452,6 +460,12 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun return nil, err } } + // TODO: the INSERTs done here after deleting the row with the original PK + // need to use the values from the BEFORE image for the columns NOT present. + // in the AFTER image due to being a partial image due to the source's usage + // of binlog-row-image=NOBLOB. + // For JSON columns when binlog-row-value-options=PARTIAL_JSON is used, we + // need to wrap the JSON diff function(s) around the BEFORE value. if tp.isOutsidePKRange(bindvars, before, after, "insert") { return nil, nil } @@ -593,6 +607,7 @@ func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.Bin if err != nil { return nil, err } + //log.Errorf("DEBUG: execParsedQuery: %s", query) return executor(query) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go index 85e0fd8e50f..a4e177a9f14 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go @@ -36,6 +36,16 @@ func isBitSet(data []byte, index int) bool { return data[byteIndex]&bitMask > 0 } +func setBit(data []byte, index int, value bool) { + byteIndex := index / 8 + bitMask := byte(1 << (uint(index) & 0x7)) + if value { + data[byteIndex] |= bitMask + } else { + data[byteIndex] &= 0xff - bitMask + } +} + func (tp *TablePlan) isPartial(rowChange *binlogdatapb.RowChange) bool { if (tp.WorkflowConfig.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage) == 0 || rowChange.DataColumns == nil || diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 568461aed3d..ac648077c19 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -1031,19 +1031,23 @@ func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *strea } if afterOK { rowChange.After = sqltypes.RowToProto3(afterValues) + //log.Errorf("DEBUG: partial = %v", partial) + //log.Errorf("DEBUG: rowChange: After: %+v", rowChange.After) if (vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) && - partial { + (partial || row.JSONPartialValues.Count() > 0) { rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{ Count: int64(rows.DataColumns.Count()), Cols: rows.DataColumns.Bits(), } + //log.Errorf("DEBUG: rowChange: DataColumns: %08b", rowChange.DataColumns.Cols) } if row.JSONPartialValues.Count() > 0 { rowChange.JsonPartialValues = &binlogdatapb.RowChange_Bitmap{ Count: int64(row.JSONPartialValues.Count()), Cols: row.JSONPartialValues.Bits(), } + //log.Errorf("DEBUG: rowChange: JSONPartialColumns: %08b", rowChange.JsonPartialValues.Cols) } } rowChanges = append(rowChanges, rowChange)