v19 backport: Online DDL: avoid SQL's CONVERT(...), convert programmatically if needed #16603

Merged 2 commits on Aug 15, 2024
@@ -0,0 +1 @@
MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL
@@ -0,0 +1,19 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id int auto_increment,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');
insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);

drop event if exists onlineddl_test;
@@ -0,0 +1 @@
(5.5|5.6|5.7)
@@ -0,0 +1,30 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id varchar(128) charset latin1 collate latin1_swedish_ci,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');

drop event if exists onlineddl_test;
delimiter ;;
create event onlineddl_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);
end ;;
@@ -0,0 +1 @@
(5.5|5.6|5.7)
15 changes: 9 additions & 6 deletions go/vt/vttablet/onlineddl/vrepl.go
@@ -571,6 +571,9 @@
sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name)))
case sourceCol.Type == vrepl.JSONColumnType:
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case targetCol.Type == vrepl.JSONColumnType:

// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))

case sourceCol.Type == vrepl.StringColumnType:
// Check source and target charset/encoding. If needed, create
// a binlogdatapb.CharsetConversion entry (later written to vreplication)
@@ -583,19 +586,19 @@
if targetCol.Type == vrepl.StringColumnType && toCollation == collations.Unknown {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset, targetCol.Name)
}

if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type != vrepl.JSONColumnType {
if trivialCharset(fromCollation) && trivialCharset(toCollation) {
sb.WriteString(escapeName(name))
} else if fromCollation == toCollation {

// No need for charset conversions as both have the same collation.
sb.WriteString(escapeName(name))
} else {
// Charset conversion required:
v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{
FromCharset: sourceCol.Charset,
ToCharset: targetCol.Charset,
}
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
sb.WriteString(escapeName(name))

}
case targetCol.Type == vrepl.JSONColumnType && sourceCol.Type != vrepl.JSONColumnType:
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
default:
sb.WriteString(escapeName(name))
}
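
The switch above decides, per column, which expression the vreplication filter selects. Where the old code wrapped charset-mismatched string columns in SQL `convert(... using utf8mb4)`, the change emits the plain column name and records a `binlogdatapb.CharsetConversion` rule keyed by the target column, deferring the actual byte conversion to the applier. Below is a minimal sketch of that pattern; the `col` struct and `buildColumnExpr` helper are illustrative stand-ins, not Vitess types.

```go
package main

import "fmt"

// CharsetConversion mirrors the shape of binlogdatapb.CharsetConversion used above.
type CharsetConversion struct {
	FromCharset string
	ToCharset   string
}

// col is a hypothetical stand-in for source/target column metadata.
type col struct {
	Name    string
	Charset string
}

// buildColumnExpr returns the filter expression for one column. When charsets differ,
// it records a conversion rule instead of emitting SQL CONVERT(), which is the gist
// of this change.
func buildColumnExpr(source, target col, rules map[string]*CharsetConversion) string {
	if source.Charset != target.Charset {
		rules[target.Name] = &CharsetConversion{
			FromCharset: source.Charset,
			ToCharset:   target.Charset,
		}
	}
	// Either way, select the raw column; any conversion happens programmatically later.
	return fmt.Sprintf("`%s`", source.Name)
}

func main() {
	rules := make(map[string]*CharsetConversion)
	expr := buildColumnExpr(col{"t1", "latin1"}, col{"t1", "utf8mb4"}, rules)
	fmt.Println(expr, *rules["t1"]) // `t1` {latin1 utf8mb4}
}
```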
66 changes: 51 additions & 15 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
@@ -27,6 +27,7 @@
"vitess.io/vitess/go/mysql/collations/charset"
"vitess.io/vitess/go/mysql/collations/colldata"
vjson "vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/sqlparser"
@@ -258,7 +259,7 @@
if i > 0 {
sqlbuffer.WriteString(", ")
}
if err := appendFromRow(tp.BulkInsertValues, sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil {
if err := tp.appendFromRow(sqlbuffer, row); err != nil {
return nil, err
}
}
@@ -313,6 +314,30 @@
return false
}

// convertStringCharset does a charset conversion given raw data and an applicable conversion rule.
// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd
// get in a failed `CONVERT()` function, e.g.:
//
// > create table tascii(v varchar(100) charset ascii);
// > insert into tascii values ('€');
// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1
func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) {
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName)

}
toCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.ToCharset)
if toCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName)

}

out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset())
if err != nil {
return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error())

}
return out, nil

}
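
For reference, here is a standalone sketch of the conversion that convertStringCharset performs, using the same helpers that appear in the diff (DefaultCollationForCharset, colldata.Lookup(...).Charset(), charset.Convert). Obtaining the collation environment via collations.MySQL8() is an assumption here, not something shown in this diff.

```go
// Sketch of the programmatic charset conversion, outside of vreplication.
// Assumes collations.MySQL8() returns a usable collation environment.
package main

import (
	"fmt"

	"vitess.io/vitess/go/mysql/collations"
	"vitess.io/vitess/go/mysql/collations/charset"
	"vitess.io/vitess/go/mysql/collations/colldata"
)

func main() {
	env := collations.MySQL8()
	from := env.DefaultCollationForCharset("latin1")
	to := env.DefaultCollationForCharset("utf8mb4")

	raw := []byte{0xE1} // 'á' encoded in latin1
	out, err := charset.Convert(nil, colldata.Lookup(to).Charset(), raw, colldata.Lookup(from).Charset())
	if err != nil {
		// convertStringCharset maps this case to MySQL error 1366 via sqlerror.NewSQLError.
		fmt.Println("conversion failed:", err)
		return
	}
	fmt.Printf("%q\n", out) // "á" re-encoded as utf8mb4
}
```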

// bindFieldVal returns a bind variable based on given field and value.
// Most values will just bind directly. But some values may need manipulation:
// - text values with charset conversion
@@ -321,11 +346,7 @@
func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
// Non-null string value, for which we have a charset conversion instruction
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name)
}
out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset())
out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name)

if err != nil {
return nil, err
}
@@ -619,28 +640,30 @@
// note: there can be more fields than bind locations since extra columns might be requested from the source if not all
// primary key columns are present in the target table, for example. Also, some values in the row may not correspond to
// values from the database on the source: sum/count for aggregation queries, for example
func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error {
bindLocations := pq.BindLocations()
if len(fields) < len(bindLocations) {
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(fields), len(bindLocations))
len(tp.Fields), len(bindLocations))

}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range fields { // collect info required for fields to be bound
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !skipFields[strings.ToLower(field.Name)] {
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
@@ -652,7 +675,7 @@
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(pq.Query[offsetQuery:loc.Offset])
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ

switch typ {
@@ -674,12 +697,25 @@
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
vv := sqltypes.MakeTrusted(typ, row.Values[col.offset:col.offset+col.length])
raw := row.Values[col.offset : col.offset+col.length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
if err != nil {
return err

}
vv = sqltypes.MakeTrusted(typ, out)

} else {
vv = sqltypes.MakeTrusted(typ, raw)
}

vv.EncodeSQLBytes2(buf)
}
}
offsetQuery = loc.Offset + loc.Length
}
buf.WriteString(pq.Query[offsetQuery:])
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
}
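
For context on the splicing appendFromRow performs: it walks the ParsedQuery's bind locations, copies the literal SQL between them, and writes each encoded value in place, which is where the new per-column charset conversion hooks in. A simplified, self-contained sketch of that mechanism follows; the bindLocation and parsedQuery types here are stand-ins, not the sqlparser types.

```go
// Simplified sketch of bind-location splicing: literal query text is copied between
// placeholder locations and each pre-encoded value is written in its place.
package main

import (
	"fmt"
	"strings"
)

type bindLocation struct{ Offset, Length int }

type parsedQuery struct {
	Query         string
	bindLocations []bindLocation
}

func spliceRow(pq parsedQuery, encodedValues []string) string {
	var buf strings.Builder
	offset := 0
	for i, loc := range pq.bindLocations {
		buf.WriteString(pq.Query[offset:loc.Offset]) // SQL text up to this placeholder
		buf.WriteString(encodedValues[i])            // value, already escaped/charset-converted
		offset = loc.Offset + loc.Length
	}
	buf.WriteString(pq.Query[offset:]) // trailing SQL text
	return buf.String()
}

func main() {
	// ":a" starts at offset 27 and ":b" at offset 31 in the query below.
	pq := parsedQuery{
		Query:         "insert into t(a,b) values (:a, :b)",
		bindLocations: []bindLocation{{Offset: 27, Length: 2}, {Offset: 31, Length: 2}},
	}
	fmt.Println(spliceRow(pq, []string{"'átesting'", "1"}))
	// insert into t(a,b) values ('átesting', 1)
}
```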