From 23392f4560e8f8765e8b375d424c572126c623fa Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 15 Aug 2024 14:42:23 +0300 Subject: [PATCH] resolved conflict Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../allow_schemadiff_normalization | 0 .../alter-charset-non-utf8-80-vcopier/alter | 1 + .../create.sql | 19 +++ .../ignore_versions | 1 + .../allow_schemadiff_normalization | 0 .../testdata/non-utf8-charset-pk/create.sql | 30 +++++ .../non-utf8-charset-pk/ignore_versions | 1 + go/vt/sqlparser/parsed_query.go | 16 ++- go/vt/sqlparser/parsed_query_test.go | 2 +- go/vt/sqlparser/tracked_buffer.go | 4 +- go/vt/vttablet/onlineddl/vrepl.go | 15 ++- .../vreplication/replicator_plan.go | 119 +++++++++++++++++- 12 files changed, 187 insertions(+), 21 deletions(-) create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/allow_schemadiff_normalization create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/alter create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/create.sql create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/ignore_versions create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/allow_schemadiff_normalization create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/create.sql create mode 100644 go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/ignore_versions diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/allow_schemadiff_normalization b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/allow_schemadiff_normalization new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/alter b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/alter new file mode 100644 index 00000000000..b5ec82b1a8b --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/alter @@ -0,0 +1 @@ +MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/create.sql b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/create.sql new file mode 100644 index 00000000000..79e8fda23ee --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/create.sql @@ -0,0 +1,19 @@ +drop table if exists onlineddl_test; +create table onlineddl_test ( + id int auto_increment, + t1 varchar(128) charset latin1 collate latin1_swedish_ci, + t2 varchar(128) charset latin1 collate latin1_swedish_ci, + tutf8 varchar(128) charset utf8, + tutf8mb4 varchar(128) charset utf8mb4, + tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci, + primary key(id) +) auto_increment=1; + +insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand())); +insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting'); +insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting'); +insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog'); +insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog'); +insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null); + +drop event if exists onlineddl_test; diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/ignore_versions b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/ignore_versions new file mode 100644 index 00000000000..0790a1e68fd --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/ignore_versions @@ -0,0 +1 @@ +(5.5|5.6|5.7) diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/allow_schemadiff_normalization b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/allow_schemadiff_normalization new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/create.sql b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/create.sql new file mode 100644 index 00000000000..c0313e62c8d --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/create.sql @@ -0,0 +1,30 @@ +drop table if exists onlineddl_test; +create table onlineddl_test ( + id varchar(128) charset latin1 collate latin1_swedish_ci, + t1 varchar(128) charset latin1 collate latin1_swedish_ci, + t2 varchar(128) charset latin1 collate latin1_swedish_ci, + tutf8 varchar(128) charset utf8, + tutf8mb4 varchar(128) charset utf8mb4, + tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci, + primary key(id) +) auto_increment=1; + +insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand())); +insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting'); +insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting'); + +drop event if exists onlineddl_test; +delimiter ;; +create event onlineddl_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand())); + insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog'); + insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog'); + insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null); +end ;; diff --git a/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/ignore_versions b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/ignore_versions new file mode 100644 index 00000000000..0790a1e68fd --- /dev/null +++ b/go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/ignore_versions @@ -0,0 +1 @@ +(5.5|5.6|5.7) diff --git a/go/vt/sqlparser/parsed_query.go b/go/vt/sqlparser/parsed_query.go index b6b03a1901a..697c81acf02 100644 --- a/go/vt/sqlparser/parsed_query.go +++ b/go/vt/sqlparser/parsed_query.go @@ -38,7 +38,7 @@ type ParsedQuery struct { } type bindLocation struct { - offset, length int + Offset, Length int } // NewParsedQuery returns a ParsedQuery of the ast. @@ -67,8 +67,8 @@ func (pq *ParsedQuery) GenerateQuery(bindVariables map[string]*querypb.BindVaria func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*querypb.BindVariable, extras map[string]Encodable) error { current := 0 for _, loc := range pq.bindLocations { - buf.WriteString(pq.Query[current:loc.offset]) - name := pq.Query[loc.offset : loc.offset+loc.length] + buf.WriteString(pq.Query[current:loc.Offset]) + name := pq.Query[loc.Offset : loc.Offset+loc.Length] if encodable, ok := extras[name[1:]]; ok { encodable.EncodeSQL(buf) } else { @@ -78,7 +78,7 @@ func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*qu } EncodeValue(buf, supplied) } - current = loc.offset + loc.length + current = loc.Offset + loc.Length } buf.WriteString(pq.Query[current:]) return nil @@ -122,7 +122,7 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field var offsetQuery int for i, loc := range pq.bindLocations { col := rowInfo[i] - buf.WriteString(pq.Query[offsetQuery:loc.offset]) + buf.WriteString(pq.Query[offsetQuery:loc.Offset]) typ := col.typ switch typ { @@ -148,12 +148,16 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field vv.EncodeSQLBytes2(buf) } } - offsetQuery = loc.offset + loc.length + offsetQuery = loc.Offset + loc.Length } buf.WriteString(pq.Query[offsetQuery:]) return nil } +func (pq *ParsedQuery) BindLocations() []bindLocation { + return pq.bindLocations +} + // MarshalJSON is a custom JSON marshaler for ParsedQuery. // Note that any queries longer that 512 bytes will be truncated. func (pq *ParsedQuery) MarshalJSON() ([]byte, error) { diff --git a/go/vt/sqlparser/parsed_query_test.go b/go/vt/sqlparser/parsed_query_test.go index 8c89a51984d..b459ca49b5b 100644 --- a/go/vt/sqlparser/parsed_query_test.go +++ b/go/vt/sqlparser/parsed_query_test.go @@ -35,7 +35,7 @@ func TestNewParsedQuery(t *testing.T) { pq := NewParsedQuery(stmt) want := &ParsedQuery{ Query: "select * from a where id = :id", - bindLocations: []bindLocation{{offset: 27, length: 3}}, + bindLocations: []bindLocation{{Offset: 27, Length: 3}}, } if !reflect.DeepEqual(pq, want) { t.Errorf("GenerateParsedQuery: %+v, want %+v", pq, want) diff --git a/go/vt/sqlparser/tracked_buffer.go b/go/vt/sqlparser/tracked_buffer.go index 59aa9d8b13f..fc843b2e735 100644 --- a/go/vt/sqlparser/tracked_buffer.go +++ b/go/vt/sqlparser/tracked_buffer.go @@ -285,8 +285,8 @@ func areBothISExpr(op Expr, val Expr) bool { // tracking information for future substitutions. func (buf *TrackedBuffer) WriteArg(prefix, arg string) { buf.bindLocations = append(buf.bindLocations, bindLocation{ - offset: buf.Len(), - length: len(prefix) + len(arg), + Offset: buf.Len(), + Length: len(prefix) + len(arg), }) buf.WriteString(prefix) buf.WriteString(arg) diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 50f9eb838d0..1c25ced2002 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -511,6 +511,9 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error { sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name))) case sourceCol.Type == vrepl.JSONColumnType: sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) + case targetCol.Type == vrepl.JSONColumnType: + // Convert any type to JSON: encode the type as utf8mb4 text + sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) case sourceCol.Type == vrepl.StringColumnType: // Check source and target charset/encoding. If needed, create // a binlogdatapb.CharsetConversion entry (later written to vreplication) @@ -523,19 +526,19 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error { if targetCol.Type == vrepl.StringColumnType && toCollation == collations.Unknown { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset, targetCol.Name) } - - if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type != vrepl.JSONColumnType { + if trivialCharset(fromCollation) && trivialCharset(toCollation) { + sb.WriteString(escapeName(name)) + } else if fromCollation == toCollation { + // No need for charset conversions as both have the same collation. sb.WriteString(escapeName(name)) } else { + // Charset conversion required: v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{ FromCharset: sourceCol.Charset, ToCharset: targetCol.Charset, } - sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) + sb.WriteString(escapeName(name)) } - case targetCol.Type == vrepl.JSONColumnType && sourceCol.Type != vrepl.JSONColumnType: - // Convert any type to JSON: encode the type as utf8mb4 text - sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) default: sb.WriteString(escapeName(name)) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index b5deb309d45..f95b358ccca 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql/collations/charset" "vitess.io/vitess/go/mysql/collations/colldata" vjson "vitess.io/vitess/go/mysql/json" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -252,7 +253,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows []*querypb.R if i > 0 { sqlbuffer.WriteString(", ") } - if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil { + if err := tp.appendFromRow(sqlbuffer, row); err != nil { return nil, err } } @@ -307,6 +308,30 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable, return false } +// convertStringCharset does a charset conversion given raw data and an applicable conversion rule. +// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd +// get in a failed `CONVERT()` function, e.g.: +// +// > create table tascii(v varchar(100) charset ascii); +// > insert into tascii values ('€'); +// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1 +func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) { + fromCollation := collations.Local().DefaultCollationForCharset(conversion.FromCharset) + if fromCollation == collations.Unknown { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName) + } + toCollation := collations.Local().DefaultCollationForCharset(conversion.ToCharset) + if toCollation == collations.Unknown { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName) + } + + out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset()) + if err != nil { + return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error()) + } + return out, nil +} + // bindFieldVal returns a bind variable based on given field and value. // Most values will just bind directly. But some values may need manipulation: // - text values with charset conversion @@ -315,11 +340,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable, func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) { if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() { // Non-null string value, for which we have a charset conversion instruction - fromCollation := collations.Local().DefaultCollationForCharset(conversion.FromCharset) - if fromCollation == collations.Unknown { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name) - } - out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset()) + out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name) if err != nil { return nil, err } @@ -486,3 +507,89 @@ func valsEqual(v1, v2 sqltypes.Value) bool { // Compare content only if none are null. return v1.ToString() == v2.ToString() } + +// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that +// the fields in the row are in the same order as the placeholders in this query. The fields might include generated +// columns which are dropped, by checking against skipFields, before binding the variables +// note: there can be more fields than bind locations since extra columns might be requested from the source if not all +// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for +// values from the database on the source: sum/count for aggregation queries, for example +func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error { + bindLocations := tp.BulkInsertValues.BindLocations() + if len(tp.Fields) < len(bindLocations) { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ", + len(tp.Fields), len(bindLocations)) + } + + type colInfo struct { + typ querypb.Type + length int64 + offset int64 + field *querypb.Field + } + rowInfo := make([]*colInfo, 0) + + offset := int64(0) + for i, field := range tp.Fields { // collect info required for fields to be bound + length := row.Lengths[i] + if !tp.FieldsToSkip[strings.ToLower(field.Name)] { + rowInfo = append(rowInfo, &colInfo{ + typ: field.Type, + length: length, + offset: offset, + field: field, + }) + } + if length > 0 { + offset += row.Lengths[i] + } + } + + // bind field values to locations + var offsetQuery int + for i, loc := range bindLocations { + col := rowInfo[i] + buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset]) + typ := col.typ + + switch typ { + case querypb.Type_TUPLE: + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i) + case querypb.Type_JSON: + if col.length < 0 { // An SQL NULL and not an actual JSON value + buf.WriteString(sqltypes.NullStr) + } else { // A JSON value (which may be a JSON null literal value) + buf2 := row.Values[col.offset : col.offset+col.length] + vv, err := vjson.MarshalSQLValue(buf2) + if err != nil { + return err + } + buf.WriteString(vv.RawStr()) + } + default: + if col.length < 0 { + // -1 means a null variable; serialize it directly + buf.WriteString(sqltypes.NullStr) + } else { + raw := row.Values[col.offset : col.offset+col.length] + var vv sqltypes.Value + + if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 { + // Non-null string value, for which we have a charset conversion instruction + out, err := tp.convertStringCharset(raw, conversion, col.field.Name) + if err != nil { + return err + } + vv = sqltypes.MakeTrusted(typ, out) + } else { + vv = sqltypes.MakeTrusted(typ, raw) + } + + vv.EncodeSQLBytes2(buf) + } + } + offsetQuery = loc.Offset + loc.Length + } + buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:]) + return nil +}