Skip to content

Commit

Permalink
VReplication: Support passing VStream filters down to MySQL (vitessio…
Browse files Browse the repository at this point in the history
…#17677)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Feb 10, 2025
1 parent 8fc9801 commit 4c27ea8
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 73 deletions.
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ import (
// default collation as it has to work across versions and the 8.0 default does not exist in 5.7.
var (
// All standard user tables should have a primary key and at least one secondary key.
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null,
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
// We use utf8mb4_general_ci so that we can test with 5.7 and 8.0+.
customerTableTemplate = `create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_general_ci, meta json default null,
industryCategory varchar(100) generated always as (json_extract(meta, _utf8mb4'$.industry')) virtual, typ enum(%s),
sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(%s), key(name)) CHARSET=utf8mb4`
Expand Down
134 changes: 134 additions & 0 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,3 +1090,137 @@ func TestVStreamHeartbeats(t *testing.T) {
})
}
}

// TestVStreamPushdownFilters confirms that pushdown filters are applied correctly
// when they are specified in the VStream API via the rule.Filter.
// It also confirms that we use the proper collation for the VStream filter when
// using VARCHAR fields.
//
// The test seeds one matching row before the stream starts, then keeps inserting
// a mix of matching ("paul" variants) and non-matching rows from a background
// goroutine while a VStream with the filter `name = 'påul'` is consumed. Every
// spelling of "paul" used here differs in case and/or accents; they are presumably
// all equal under the customer.name column collation, so the number of ROW events
// received (copy phase + running phase) must equal the number of paul rows created.
func TestVStreamPushdownFilters(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	setSidecarDBName("_vt")
	config := *mainClusterConfig
	vc = NewVitessCluster(t, &clusterOptions{
		clusterConfig: &config,
	})
	// Verify the cluster exists before scheduling its teardown.
	require.NotNil(t, vc)
	defer vc.TearDown()
	ks := "product"
	shard := "0"
	defaultCell := vc.Cells[vc.CellNames[0]]

	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
	require.NoError(t, err)
	verifyClusterHealth(t, vc)
	insertInitialData(t)

	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
	defer vtgateConn.Close()

	// Make sure that we get at least one paul row event in the copy phase.
	_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false)
	require.NoError(t, err)
	res, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select count(*) from %s.customer where name = 'pauĺ'", ks), 1, false)
	require.NoError(t, err)
	require.Len(t, res.Rows, 1)
	startingPauls, err := res.Rows[0][0].ToInt()
	require.NoError(t, err)

	// Coordinate go-routines. The stream context bounds how long rows are
	// generated; the done channel tells the VStream reader when to stop.
	streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer streamCancel()
	done := make(chan struct{})

	// Background goroutine that keeps inserting rows into the table being
	// streamed until the stream context is cancelled. The counters are only
	// read by the test goroutine after done has been closed.
	createdPauls := startingPauls
	createdNonPauls := 0
	go func() {
		// Always release the VStream reader, including when an insert fails,
		// so that the test does not keep reading until the outer ctx expires.
		defer close(done)
		for id := 1; ; id++ {
			select {
			case <-streamCtx.Done():
				// Give the VStream a little catch-up time before telling it to stop
				// via the done channel.
				time.Sleep(10 * time.Second)
				return
			default:
			}
			if id%10 == 0 {
				// Every tenth row is a paul that the filter should match.
				_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('paÜl')", ks), 1, false)
				if err != nil {
					// FailNow (and so require.*) must only be called from the
					// goroutine running the test, so record the failure and stop.
					t.Errorf("inserting paul row: %v", err)
					return
				}
				createdPauls++
			} else {
				insertRow(ks, "customer", id)
				createdNonPauls++
			}
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Start from an empty GTID so that the stream begins with a copy phase.
	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			Keyspace: ks,
			Shard:    shard,
			Gtid:     "",
		}}}

	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			Match:  "customer",
			Filter: "select * from customer where name = 'påul'",
		}},
	}
	flags := &vtgatepb.VStreamFlags{}
	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
	require.NoError(t, err)
	defer vstreamConn.Close()

	// So we should have at least one paul row event in the copy phase.
	copyPhaseRowEvents := 0
	// And we should have many paul row events in the running phase.
	runningPhaseRowEvents := 0
	copyPhase := true

	func() {
		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
		require.NoError(t, err)
		for {
			evs, err := reader.Recv()
			if err != nil {
				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
			}
			for _, ev := range evs {
				switch ev.Type {
				case binlogdatapb.VEventType_COPY_COMPLETED:
					// Every ROW event after this one comes from the running phase.
					copyPhase = false
				case binlogdatapb.VEventType_ROW:
					if copyPhase {
						copyPhaseRowEvents++
					} else {
						runningPhaseRowEvents++
					}
				}
			}
			// Non-blocking check: keep reading until the inserter signals that
			// it has finished.
			select {
			case <-done:
				return
			default:
			}
		}
	}()

	require.NotZero(t, createdPauls)
	require.NotZero(t, createdNonPauls)
	require.Greater(t, createdNonPauls, createdPauls)
	require.NotZero(t, copyPhaseRowEvents)
	require.NotZero(t, runningPhaseRowEvents)

	t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
	require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
}
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
defer func() { waitRetryTime = savedWaitRetryTime }()

execStatements(t, []string{
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb3", // Use utf8mb3 to get a consistent default collation across MySQL versions
"create table src(idc varchar(20), val int, primary key(idc)) character set utf8mb4 collate utf8mb4_general_ci", // Use general_ci so that we have the same behavior across 5.7 and 8.0
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc varchar(20), val int, primary key(idc))", vrepldb),
})
Expand Down Expand Up @@ -282,7 +282,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
"/update _vt.vreplication set state='Copying'",
// Copy mode.
"insert into dst(idc,val) values ('a',1)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"a"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"a"}'.*`,
// Copy-catchup mode.
`/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`,
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
Expand All @@ -292,11 +292,11 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
// upd1 := expect.
upd1 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('B',3)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"B"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"B"}'.*`,
))
upd2 := expect.Then(qh.Eventually(
"insert into dst(idc,val) values ('c',2)",
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:33 flags:20483} rows:{lengths:1 values:"c"}'.*`,
`/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"idc" type:VARCHAR charset:45 flags:20483} rows:{lengths:1 values:"c"}'.*`,
))
upd1.Then(upd2.Eventually())
return upd2
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
lastPK := getLastPKFromQR(uvs.plans[tableName].tablePK.Lastpk)
filter := uvs.plans[tableName].rule.Filter

log.Infof("Starting copyTable for %s, PK %v", tableName, lastPK)
log.Infof("Starting copyTable for %s, Filter: %s, LastPK: %v", tableName, filter, lastPK)
uvs.sendTestEvent(fmt.Sprintf("Copy Start %s", tableName))

err := uvs.vse.StreamRows(ctx, filter, lastPK, func(rows *binlogdatapb.VStreamRowsResponse) error {
Expand Down
21 changes: 17 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ type Plan struct {
// of the table.
Filters []Filter

// Predicates in the Filter query that we can push down to MySQL
// to reduce the returned rows we need to filter in the VStreamer
// during the copy phase. This will contain any valid expressions
// in the Filter's WHERE clause with the exception of the
// in_keyrange() function which is a filter that must be applied
// by the VStreamer (it's not a valid MySQL function). Note that
// the Filter cannot contain any MySQL functions because the
// VStreamer cannot filter binlog events using them.
whereExprsToPushDown []sqlparser.Expr

// Convert any integer values seen in the binlog events for ENUM or SET
// columns to the string values. The map is keyed on the column number, with
// the value being the map of ordinal values to string values.
Expand Down Expand Up @@ -564,6 +574,7 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if where == nil {
return nil
}
// Only a series of AND expressions are supported.
exprs := splitAndExpression(nil, where.Expr)
for _, expr := range exprs {
switch expr := expr.(type) {
Expand Down Expand Up @@ -601,10 +612,6 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
// StrVal is varbinary, we do not support varchar since we would have to implement all collation types
if val.Type != sqlparser.IntVal && val.Type != sqlparser.StrVal {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
pv, err := evalengine.Translate(val, &evalengine.Config{
Collation: plan.env.CollationEnv().DefaultConnectionCharset(),
Environment: plan.env,
Expand All @@ -622,7 +629,11 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
ColNum: colnum,
Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.FuncExpr:
// We cannot filter binlog events in VStreamer using MySQL functions so
// we only allow the in_keyrange() function, which is VStreamer specific.
if !expr.Name.EqualString("in_keyrange") {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand All @@ -648,6 +659,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
Opcode: IsNotNull,
ColNum: colnum,
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand Down
Loading

0 comments on commit 4c27ea8

Please sign in to comment.