Skip to content

Commit 6d76be2

Browse files
vitess-bot[bot]dbussink
authored andcommitted
Cherry-pick 10d36cb with conflicts
Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
1 parent 28406ba commit 6d76be2

File tree

15 files changed

+46
-57
lines changed

15 files changed

+46
-57
lines changed

go/test/endtoend/vreplication/vdiff_helper_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,7 @@ func getVDiffInfo(json string) *vdiffInfo {
336336
}
337337

338338
func encodeString(in string) string {
339-
var buf strings.Builder
340-
sqltypes.NewVarChar(in).EncodeSQL(&buf)
341-
return buf.String()
339+
return sqltypes.EncodeStringSQL(in)
342340
}
343341

344342
// generateMoreCustomers creates additional test data for better tests

go/vt/binlog/binlogplayer/binlog_player.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ func (blp *BinlogPlayer) setVReplicationState(state binlogdatapb.VReplicationWor
549549
})
550550
}
551551
blp.blplStats.State.Store(state.String())
552-
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state.String(), encodeString(MessageTruncate(message)), blp.uid)
552+
query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(MessageTruncate(message)), blp.uid)
553553
if _, err := blp.dbClient.ExecuteFetch(query, 1); err != nil {
554554
return fmt.Errorf("could not set state: %v: %v", query, err)
555555
}
@@ -637,9 +637,9 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
637637
protoutil.SortBinlogSourceTables(source)
638638
return fmt.Sprintf("insert into _vt.vreplication "+
639639
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys, options) "+
640-
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v, %s)",
640+
"values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v, %s)",
641641
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag,
642-
timeUpdated, binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(dbName), workflowType,
642+
timeUpdated, encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(dbName), workflowType,
643643
workflowSubType, deferSecondaryKeys, encodeString("{}"))
644644
}
645645

@@ -649,9 +649,9 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource,
649649
protoutil.SortBinlogSourceTables(source)
650650
return fmt.Sprintf("insert into _vt.vreplication "+
651651
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, options) "+
652-
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %s)",
652+
"values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %s)",
653653
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled,
654-
throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state.String(), encodeString(dbName),
654+
throttler.ReplicationLagModuleDisabled, time.Now().Unix(), encodeString(state.String()), encodeString(dbName),
655655
workflowType, workflowSubType, encodeString("{}"))
656656
}
657657

@@ -694,15 +694,15 @@ func GenerateUpdateTimeThrottled(uid int32, timeThrottledUnix int64, componentTh
694694
// StartVReplicationUntil returns a statement to start the replication with a stop position.
695695
func StartVReplicationUntil(uid int32, pos string) string {
696696
return fmt.Sprintf(
697-
"update _vt.vreplication set state='%v', stop_pos=%v where id=%v",
698-
binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(pos), uid)
697+
"update _vt.vreplication set state=%v, stop_pos=%v where id=%v",
698+
encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(pos), uid)
699699
}
700700

701701
// StopVReplication returns a statement to stop the replication.
702702
func StopVReplication(uid int32, message string) string {
703703
return fmt.Sprintf(
704-
"update _vt.vreplication set state='%v', message=%v where id=%v",
705-
binlogdatapb.VReplicationWorkflowState_Stopped.String(), encodeString(MessageTruncate(message)), uid)
704+
"update _vt.vreplication set state=%v, message=%v where id=%v",
705+
encodeString(binlogdatapb.VReplicationWorkflowState_Stopped.String()), encodeString(MessageTruncate(message)), uid)
706706
}
707707

708708
// DeleteVReplication returns a statement to delete the replication.
@@ -717,9 +717,7 @@ func MessageTruncate(msg string) string {
717717
}
718718

719719
func encodeString(in string) string {
720-
buf := bytes.NewBuffer(nil)
721-
sqltypes.NewVarChar(in).EncodeSQL(buf)
722-
return buf.String()
720+
return sqltypes.EncodeStringSQL(in)
723721
}
724722

725723
// ReadVReplicationPos returns a statement to query the gtid for a

go/vt/vtctl/vdiff_env_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func newTestVDiffEnv(t testing.TB, ctx context.Context, sourceShards, targetShar
128128
// But this is one statement per stream.
129129
env.tmc.setVRResults(
130130
primary.tablet,
131-
fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", vdiffSourceGtid, j+1),
131+
fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=%s, message='synchronizing for vdiff' where id=%d", sqltypes.EncodeStringSQL(vdiffSourceGtid), j+1),
132132
&sqltypes.Result{},
133133
)
134134
}

go/vt/vtctl/workflow/resharder.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,6 @@ func (rs *resharder) createStreams(ctx context.Context) error {
298298
if err != nil {
299299
return err
300300
}
301-
optionsJSON = fmt.Sprintf("'%s'", optionsJSON)
302301
for _, source := range rs.sourceShards {
303302
if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) {
304303
continue

go/vt/vtctl/workflow/traffic_switcher.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -858,8 +858,8 @@ func (ts *trafficSwitcher) getReverseVReplicationUpdateQuery(targetCell string,
858858
}
859859

860860
if ts.optCells != "" || ts.optTabletTypes != "" {
861-
query := fmt.Sprintf("update _vt.vreplication set cell = '%s', tablet_types = '%s', options = '%s' where workflow = '%s' and db_name = '%s'",
862-
ts.optCells, ts.optTabletTypes, options, ts.ReverseWorkflowName(), dbname)
861+
query := fmt.Sprintf("update _vt.vreplication set cell = %s, tablet_types = %s, options = %s where workflow = %s and db_name = %s",
862+
sqltypes.EncodeStringSQL(ts.optCells), sqltypes.EncodeStringSQL(ts.optTabletTypes), sqltypes.EncodeStringSQL(options), sqltypes.EncodeStringSQL(ts.ReverseWorkflowName()), sqltypes.EncodeStringSQL(dbname))
863863
return query
864864
}
865865
return ""
@@ -941,8 +941,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
941941
// For non-reference tables we return an error if there's no primary
942942
// vindex as it's not clear what to do.
943943
if len(vtable.ColumnVindexes) > 0 && len(vtable.ColumnVindexes[0].Columns) > 0 {
944-
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]),
945-
ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, key.KeyRangeString(source.GetShard().KeyRange))
944+
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', %s)", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]),
945+
ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, encodeString(key.KeyRangeString(source.GetShard().KeyRange)))
946946
} else {
947947
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary vindex found for the %s table in the %s keyspace",
948948
vtable.Name.String(), ts.SourceKeyspaceName())
@@ -1200,7 +1200,7 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {
12001200
// re-invoked after a freeze, it will skip all the previous steps
12011201
err := ts.ForAllTargets(func(target *MigrationTarget) error {
12021202
ts.Logger().Infof("Marking target streams frozen for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName())
1203-
query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", Frozen,
1203+
query := fmt.Sprintf("update _vt.vreplication set message = %s where db_name=%s and workflow=%s", encodeString(Frozen),
12041204
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
12051205
_, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query)
12061206
return err

go/vt/vtctl/workflow/utils.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package workflow
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"encoding/json"
2322
"fmt"
@@ -626,9 +625,7 @@ func ReverseWorkflowName(workflow string) string {
626625
// this public, but it doesn't belong in package workflow. Maybe package sqltypes,
627626
// or maybe package sqlescape?
628627
func encodeString(in string) string {
629-
buf := bytes.NewBuffer(nil)
630-
sqltypes.NewVarChar(in).EncodeSQL(buf)
631-
return buf.String()
628+
return sqltypes.EncodeStringSQL(in)
632629
}
633630

634631
func getRenameFileName(tableName string) string {

go/vt/vttablet/endtoend/vstreamer_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package endtoend
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"errors"
2322
"fmt"
@@ -472,9 +471,7 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan []
472471
}
473472

474473
func encodeString(in string) string {
475-
buf := bytes.NewBuffer(nil)
476-
sqltypes.NewVarChar(in).EncodeSQL(buf)
477-
return buf.String()
474+
return sqltypes.EncodeStringSQL(in)
478475
}
479476

480477
func validateSchemaInserted(client *framework.QueryClient, ddl string) bool {

go/vt/vttablet/onlineddl/executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,8 +1517,8 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
15171517

15181518
{
15191519
// temporary hack. todo: this should be done when inserting any _vt.vreplication record across all workflow types
1520-
query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = '%s'",
1521-
binlogdatapb.VReplicationWorkflowType_OnlineDDL, v.workflow)
1520+
query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = %s",
1521+
binlogdatapb.VReplicationWorkflowType_OnlineDDL, sqltypes.EncodeStringSQL(v.workflow))
15221522
if _, err := e.vreplicationExec(ctx, tablet.Tablet, query); err != nil {
15231523
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", tablet.Tablet, query)
15241524
}

go/vt/vttablet/tabletmanager/vdiff/utils.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package vdiff
1919
import (
2020
"context"
2121
"fmt"
22-
"strings"
2322

2423
"vitess.io/vitess/go/vt/sqlparser"
2524
"vitess.io/vitess/go/vt/vtgate/evalengine"
@@ -59,9 +58,7 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare
5958
// Utility functions
6059

6160
func encodeString(in string) string {
62-
var buf strings.Builder
63-
sqltypes.NewVarChar(in).EncodeSQL(&buf)
64-
return buf.String()
61+
return sqltypes.EncodeStringSQL(in)
6562
}
6663

6764
func pkColsToGroupByParams(pkCols []int, collationEnv *collations.Environment) []*engine.GroupByParams {

go/vt/vttablet/tabletmanager/vreplication/insert_generator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ func NewInsertGenerator(state binlogdatapb.VReplicationWorkflowState, dbname str
5353
func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string,
5454
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool, options string) {
5555
if options == "" {
56-
options = "'{}'"
56+
options = "{}"
5757
}
5858
protoutil.SortBinlogSourceTables(bls)
59-
fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v, %v)",
59+
fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v, %v)",
6060
ig.prefix,
6161
encodeString(workflow),
6262
encodeString(bls.String()),
@@ -66,12 +66,12 @@ func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSourc
6666
encodeString(cell),
6767
encodeString(tabletTypes),
6868
ig.now,
69-
ig.state,
69+
encodeString(ig.state),
7070
encodeString(ig.dbname),
7171
workflowType,
7272
workflowSubType,
7373
deferSecondaryKeys,
74-
options,
74+
encodeString(options),
7575
)
7676
ig.prefix = ", "
7777
}

go/vt/vttablet/tabletmanager/vreplication/vreplicator.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
501501
})
502502
}
503503
vr.stats.State.Store(state.String())
504-
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id)
504+
query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(binlogplayer.MessageTruncate(message)), vr.id)
505505
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
506506
return fmt.Errorf("could not set state: %v: %v", query, err)
507507
}
@@ -515,9 +515,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
515515
}
516516

517517
func encodeString(in string) string {
518-
var buf strings.Builder
519-
sqltypes.NewVarChar(in).EncodeSQL(&buf)
520-
return buf.String()
518+
return sqltypes.EncodeStringSQL(in)
521519
}
522520

523521
func (vr *vreplicator) getSettingFKCheck() error {

go/vt/vttablet/tabletserver/schema/tracker.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ limitations under the License.
1717
package schema
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"fmt"
2322
"sync"
2423
"time"
2524

25+
"vitess.io/vitess/go/bytes2"
2626
"vitess.io/vitess/go/constants/sidecar"
2727
"vitess.io/vitess/go/mysql/replication"
2828
"vitess.io/vitess/go/sqltypes"
@@ -231,10 +231,15 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
231231
}
232232
defer conn.Recycle()
233233

234+
// We serialize a blob here, encodeString is for strings only
235+
// and should not be used for binary data.
236+
blobVal := sqltypes.MakeTrusted(sqltypes.VarBinary, blob)
237+
buf := bytes2.Buffer{}
238+
blobVal.EncodeSQLBytes2(&buf)
234239
query := sqlparser.BuildParsedQuery("insert into %s.schema_version "+
235240
"(pos, ddl, schemax, time_updated) "+
236241
"values (%s, %s, %s, %d)", sidecar.GetIdentifier(), encodeString(gtid),
237-
encodeString(ddl), encodeString(string(blob)), timestamp).Query
242+
encodeString(ddl), buf.String(), timestamp).Query
238243
_, err = conn.Conn.Exec(ctx, query, 1, false)
239244
if err != nil {
240245
return err
@@ -243,9 +248,7 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
243248
}
244249

245250
func encodeString(in string) string {
246-
buf := bytes.NewBuffer(nil)
247-
sqltypes.NewVarChar(in).EncodeSQL(buf)
248-
return buf.String()
251+
return sqltypes.EncodeStringSQL(in)
249252
}
250253

251254
// MustReloadSchemaOnDDL returns true if the ddl is for the db which is part of the workflow and is not an online ddl artifact

go/vt/vttablet/tabletserver/vstreamer/vstreamer.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -935,9 +935,7 @@ type extColInfo struct {
935935
}
936936

937937
func encodeString(in string) string {
938-
buf := bytes.NewBuffer(nil)
939-
sqltypes.NewVarChar(in).EncodeSQL(buf)
940-
return buf.String()
938+
return sqltypes.EncodeStringSQL(in)
941939
}
942940

943941
func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) {

go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/stretchr/testify/require"
3333
"google.golang.org/protobuf/proto"
3434

35+
"vitess.io/vitess/go/bytes2"
3536
"vitess.io/vitess/go/mysql"
3637
"vitess.io/vitess/go/mysql/collations"
3738
"vitess.io/vitess/go/sqltypes"
@@ -332,9 +333,15 @@ func TestVersion(t *testing.T) {
332333
}
333334
blob, _ := dbSchema.MarshalVT()
334335
gtid := "MariaDB/0-41983-20"
336+
// We serialize a blob here, encodeString is for strings only
337+
// and should not be used for binary data.
338+
blobVal := sqltypes.MakeTrusted(sqltypes.VarBinary, blob)
339+
buf := bytes2.Buffer{}
340+
blobVal.EncodeSQLBytes2(&buf)
341+
335342
testcases := []testcase{{
336343
input: []string{
337-
fmt.Sprintf("insert into _vt.schema_version values(1, '%s', 123, 'create table t1', %v)", gtid, encodeString(string(blob))),
344+
fmt.Sprintf("insert into _vt.schema_version values(1, '%s', 123, 'create table t1', %v)", gtid, buf.String()),
338345
},
339346
// External table events don't get sent.
340347
output: [][]string{{

go/vt/wrangler/keyspace.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package wrangler
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"errors"
2322
"fmt"
@@ -125,7 +124,5 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, sha
125124
}
126125

127126
func encodeString(in string) string {
128-
buf := bytes.NewBuffer(nil)
129-
sqltypes.NewVarChar(in).EncodeSQL(buf)
130-
return buf.String()
127+
return sqltypes.EncodeStringSQL(in)
131128
}

0 commit comments

Comments
 (0)