Skip to content

Commit 1b7fb6f

Browse files
authored
Online DDL: avoid SQL's CONVERT(...), convert programmatically if needed (#16597)
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
1 parent be95882 commit 1b7fb6f

File tree

9 files changed

+112
-21
lines changed

9 files changed

+112
-21
lines changed

go/test/endtoend/onlineddl/vrepl_suite/testdata/alter-charset-non-utf8-80-vcopier/allow_schemadiff_normalization

Whitespace-only changes.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
drop table if exists onlineddl_test;
2+
create table onlineddl_test (
3+
id int auto_increment,
4+
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
5+
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
6+
tutf8 varchar(128) charset utf8,
7+
tutf8mb4 varchar(128) charset utf8mb4,
8+
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
9+
primary key(id)
10+
) auto_increment=1;
11+
12+
insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
13+
insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
14+
insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');
15+
insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
16+
insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
17+
insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);
18+
19+
drop event if exists onlineddl_test;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
(5.5|5.6|5.7)

go/test/endtoend/onlineddl/vrepl_suite/testdata/non-utf8-charset-pk/allow_schemadiff_normalization

Whitespace-only changes.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
drop table if exists onlineddl_test;
2+
create table onlineddl_test (
3+
id varchar(128) charset latin1 collate latin1_swedish_ci,
4+
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
5+
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
6+
tutf8 varchar(128) charset utf8,
7+
tutf8mb4 varchar(128) charset utf8mb4,
8+
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
9+
primary key(id)
10+
) auto_increment=1;
11+
12+
insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
13+
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
14+
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');
15+
16+
drop event if exists onlineddl_test;
17+
delimiter ;;
18+
create event onlineddl_test
19+
on schedule every 1 second
20+
starts current_timestamp
21+
ends current_timestamp + interval 60 second
22+
on completion not preserve
23+
enable
24+
do
25+
begin
26+
insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
27+
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
28+
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
29+
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);
30+
end ;;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
(5.5|5.6|5.7)

go/vt/vttablet/onlineddl/vrepl.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,9 @@ func (v *VRepl) generateFilterQuery() error {
289289
sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name)))
290290
case sourceCol.Type() == "json":
291291
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
292+
case targetCol.Type() == "json": // we already know the source col is not JSON, per the above `case` condition
293+
// Convert any type to JSON: encode the type as utf8mb4 text
294+
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
292295
case sourceCol.IsTextual():
293296
// Check source and target charset/encoding. If needed, create
294297
// a binlogdatapb.CharsetConversion entry (later written to vreplication)
@@ -301,19 +304,19 @@ func (v *VRepl) generateFilterQuery() error {
301304
if targetCol.IsTextual() && toCollation == collations.Unknown {
302305
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset(), targetCol.Name())
303306
}
304-
305-
if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type() != "json" {
307+
if trivialCharset(fromCollation) && trivialCharset(toCollation) {
308+
sb.WriteString(escapeName(name))
309+
} else if fromCollation == toCollation {
310+
// No need for charset conversions as both have the same collation.
306311
sb.WriteString(escapeName(name))
307312
} else {
313+
// Charset conversion required:
308314
v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{
309315
FromCharset: sourceCol.Charset(),
310316
ToCharset: targetCol.Charset(),
311317
}
312-
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
318+
sb.WriteString(escapeName(name))
313319
}
314-
case targetCol.Type() == "json" && sourceCol.Type() != "json":
315-
// Convert any type to JSON: encode the type as utf8mb4 text
316-
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
317320
default:
318321
sb.WriteString(escapeName(name))
319322
}

go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"vitess.io/vitess/go/mysql/collations/charset"
2828
"vitess.io/vitess/go/mysql/collations/colldata"
2929
vjson "vitess.io/vitess/go/mysql/json"
30+
"vitess.io/vitess/go/mysql/sqlerror"
3031
"vitess.io/vitess/go/sqltypes"
3132
"vitess.io/vitess/go/vt/binlog/binlogplayer"
3233
"vitess.io/vitess/go/vt/sqlparser"
@@ -257,7 +258,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows []*querypb.R
257258
if i > 0 {
258259
sqlbuffer.WriteString(", ")
259260
}
260-
if err := appendFromRow(tp.BulkInsertValues, sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil {
261+
if err := tp.appendFromRow(sqlbuffer, row); err != nil {
261262
return nil, err
262263
}
263264
}
@@ -312,6 +313,30 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
312313
return false
313314
}
314315

316+
// convertStringCharset does a charset conversion given raw data and an applicable conversion rule.
317+
// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd
318+
// get in a failed `CONVERT()` function, e.g.:
319+
//
320+
// > create table tascii(v varchar(100) charset ascii);
321+
// > insert into tascii values ('€');
322+
// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1
323+
func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) {
324+
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
325+
if fromCollation == collations.Unknown {
326+
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName)
327+
}
328+
toCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.ToCharset)
329+
if toCollation == collations.Unknown {
330+
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName)
331+
}
332+
333+
out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset())
334+
if err != nil {
335+
return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error())
336+
}
337+
return out, nil
338+
}
339+
315340
// bindFieldVal returns a bind variable based on given field and value.
316341
// Most values will just bind directly. But some values may need manipulation:
317342
// - text values with charset conversion
@@ -320,11 +345,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
320345
func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
321346
if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
322347
// Non-null string value, for which we have a charset conversion instruction
323-
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
324-
if fromCollation == collations.Unknown {
325-
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name)
326-
}
327-
out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset())
348+
out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name)
328349
if err != nil {
329350
return nil, err
330351
}
@@ -590,28 +611,30 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
590611
// note: there can be more fields than bind locations since extra columns might be requested from the source if not all
591612
// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for
592613
// values from the database on the source: sum/count for aggregation queries, for example
593-
func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error {
594-
bindLocations := pq.BindLocations()
595-
if len(fields) < len(bindLocations) {
614+
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
615+
bindLocations := tp.BulkInsertValues.BindLocations()
616+
if len(tp.Fields) < len(bindLocations) {
596617
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
597-
len(fields), len(bindLocations))
618+
len(tp.Fields), len(bindLocations))
598619
}
599620

600621
type colInfo struct {
601622
typ querypb.Type
602623
length int64
603624
offset int64
625+
field *querypb.Field
604626
}
605627
rowInfo := make([]*colInfo, 0)
606628

607629
offset := int64(0)
608-
for i, field := range fields { // collect info required for fields to be bound
630+
for i, field := range tp.Fields { // collect info required for fields to be bound
609631
length := row.Lengths[i]
610-
if !skipFields[strings.ToLower(field.Name)] {
632+
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
611633
rowInfo = append(rowInfo, &colInfo{
612634
typ: field.Type,
613635
length: length,
614636
offset: offset,
637+
field: field,
615638
})
616639
}
617640
if length > 0 {
@@ -623,7 +646,7 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer
623646
var offsetQuery int
624647
for i, loc := range bindLocations {
625648
col := rowInfo[i]
626-
buf.WriteString(pq.Query[offsetQuery:loc.Offset])
649+
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
627650
typ := col.typ
628651

629652
switch typ {
@@ -645,12 +668,25 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer
645668
// -1 means a null variable; serialize it directly
646669
buf.WriteString(sqltypes.NullStr)
647670
} else {
648-
vv := sqltypes.MakeTrusted(typ, row.Values[col.offset:col.offset+col.length])
671+
raw := row.Values[col.offset : col.offset+col.length]
672+
var vv sqltypes.Value
673+
674+
if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
675+
// Non-null string value, for which we have a charset conversion instruction
676+
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
677+
if err != nil {
678+
return err
679+
}
680+
vv = sqltypes.MakeTrusted(typ, out)
681+
} else {
682+
vv = sqltypes.MakeTrusted(typ, raw)
683+
}
684+
649685
vv.EncodeSQLBytes2(buf)
650686
}
651687
}
652688
offsetQuery = loc.Offset + loc.Length
653689
}
654-
buf.WriteString(pq.Query[offsetQuery:])
690+
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
655691
return nil
656692
}

0 commit comments

Comments
 (0)