Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add vtagte target mysql service #602

Merged
merged 13 commits into from
Dec 16, 2024
17 changes: 16 additions & 1 deletion endtoend/branch/branch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ func printBranchShowMergeBackDDL(rows *sql.Rows) {
}

func TestBranchBasic(t *testing.T) {
for i := 0; i < 5; i++ {
testBranchBasic(t)
}
}

func testBranchBasic(t *testing.T) {
testSourceAndTargetClusterConnection(t)
sourcePrepare()
targetPrepare()
Expand Down Expand Up @@ -478,7 +484,16 @@ func TestBranchBasicWithFailPoint(t *testing.T) {
defer rows.Close()
printBranchDiff(rows)

// branch prepare merge back
// test branch prepare merge back

// first let ExecInTxn always rollback, so the merge back ddl table should be empty, because it is inserted in a txn
framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/engine/VTGateExecuteInTxnRollback", "return(true)")
framework.QueryNoError(t, targetCluster.WescaleDb, getBranchPrepareMergeBackCMD())
rowsShouleBeEmpty := framework.QueryNoError(t, targetCluster.WescaleDb, "select * from mysql.branch_patch")
defer rowsShouleBeEmpty.Close()
assert.Equal(t, false, rowsShouleBeEmpty.Next())
framework.DisableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/engine/VTGateExecuteInTxnRollback")

framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchInsertMergeBackDDLError", "return(true)")
framework.ExecWithErrorContains(t, targetCluster.WescaleDb, "failpoint", getBranchPrepareMergeBackCMD())
expectBranchStatus(t, "origin", "preparing")
Expand Down
1 change: 0 additions & 1 deletion examples/cdc/mirror/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ Copyright ApeCloud, Inc.
Licensed under the Apache v2(found in the LICENSE file in the root directory).
*/


package main

import (
Expand Down
6 changes: 6 additions & 0 deletions go/vt/failpointkey/failpoint_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ var (
Name: "BranchExecuteMergeBackDDLError",
ExampleStr: "return(true)",
}
VTGateExecuteInTxnRollback = FailpointValue{
FullName: "vitess.io/vitess/go/vt/vtgate/engine/VTGateExecuteInTxnRollback",
Name: "VTGateExecuteInTxnRollback",
ExampleStr: "return(true)",
}
)

func init() {
Expand All @@ -108,4 +113,5 @@ func init() {
FailpointTable[BranchApplySnapshotError.FullName] = BranchApplySnapshotError
FailpointTable[BranchInsertMergeBackDDLError.FullName] = BranchInsertMergeBackDDLError
FailpointTable[BranchExecuteMergeBackDDLError.FullName] = BranchExecuteMergeBackDDLError
FailpointTable[VTGateExecuteInTxnRollback.FullName] = VTGateExecuteInTxnRollback
}
10 changes: 6 additions & 4 deletions go/vt/vtgate/branch/branch_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type BranchDiff struct {
}

const (
SelectBatchSize = 5000

// branch meta related

UpsertBranchMetaSQL = `
Expand Down Expand Up @@ -121,7 +123,7 @@ const (

// snapshot related

SelectBranchSnapshotSQL = "select * from mysql.branch_snapshot where Name='%s' order by id"
SelectBranchSnapshotInBatchSQL = "select * from mysql.branch_snapshot where Name='%s' and id > %d order by id asc limit %d"

DeleteBranchSnapshotSQL = "delete from mysql.branch_snapshot where Name='%s'"

Expand All @@ -131,11 +133,11 @@ const (

DeleteBranchMergeBackDDLSQL = "delete from mysql.branch_patch where Name='%s'"

SelectBranchUnmergedDDLSQL = "select * from mysql.branch_patch where Name='%s' and merged = false order by id"
SelectBranchUnmergedDDLInBatchSQL = "select * from mysql.branch_patch where Name='%s' and merged = false and id > %d order by id asc limit %d"

SelectBranchUnmergedDBDDLSQL = "select * from mysql.branch_patch where Name='%s' and merged = false and `table` = '' order by id"
SelectBranchUnmergedDBDDLInBatchSQL = "select * from mysql.branch_patch where Name='%s' and merged = false and `table` = '' and id > %d order by id asc limit %d"

SelectBranchMergeBackDDLSQL = "select * from mysql.branch_patch where Name='%s' order by id"
SelectBranchMergeBackDDLInBatchSQL = "select * from mysql.branch_patch where Name='%s' and id > %d order by id asc limit %d"

InsertBranchMergeBackDDLSQL = "insert into mysql.branch_patch (`Name`, `database`, `table`, `ddl`, `merged`) values ('%s', '%s', '%s', '%s', false)"

Expand Down
73 changes: 40 additions & 33 deletions go/vt/vtgate/branch/branch_service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package branch

import (
"database/sql"
"fmt"
"github.com/pingcap/failpoint"
"strings"
Expand Down Expand Up @@ -378,46 +377,55 @@ func statusIsOneOf(status BranchStatus, statuses []BranchStatus) bool {

func (bs *BranchService) executeMergeBackDDL(name string) error {
// create or drop database first
selectUnmergedDBDDLSQL := getSelectUnmergedDBDDLSQL(name)
rows, err := bs.targetMySQLService.mysqlService.Query(selectUnmergedDBDDLSQL)
if err != nil {
return err
}
defer rows.Close()
err = bs.executeMergeBackDDLOneByOne(rows)
if err != nil {
return err
lastID := 0
for {
selectUnmergedDBDDLSQL := getSelectUnmergedDBDDLInBatchSQL(name, lastID, SelectBatchSize)
rows, err := bs.targetMySQLService.mysqlService.Query(selectUnmergedDBDDLSQL)
if err != nil {
return err
}
err = bs.executeMergeBackDDLOneByOne(rows)
if err != nil {
return err
}
if len(rows) < SelectBatchSize {
break
}
lastID, _ = BytesToInt(rows[len(rows)-1].RowData["id"])
}

// then, execute table ddl
selectMergeBackDDLSQL := getSelectUnmergedDDLSQL(name)

rows2, err := bs.targetMySQLService.mysqlService.Query(selectMergeBackDDLSQL)
if err != nil {
return err
lastID = 0
for {
selectMergeBackDDLSQL := getSelectUnmergedDDLInBatchSQL(name, lastID, SelectBatchSize)
rows2, err := bs.targetMySQLService.mysqlService.Query(selectMergeBackDDLSQL)
if err != nil {
return err
}
err = bs.executeMergeBackDDLOneByOne(rows2)
if err != nil {
return err
}
if len(rows2) < SelectBatchSize {
break
}
lastID, _ = BytesToInt(rows2[len(rows2)-1].RowData["id"])
}
defer rows2.Close()
return bs.executeMergeBackDDLOneByOne(rows2)
return nil
}

// caller should close rows
func (bs *BranchService) executeMergeBackDDLOneByOne(rows *sql.Rows) error {
for rows.Next() {
var (
id int
name string
database string
table string
ddl string
merged bool
)
var err error
if err = rows.Scan(&id, &name, &database, &table, &ddl, &merged); err != nil {
return fmt.Errorf("failed to scan row: %v", err)
}
func (bs *BranchService) executeMergeBackDDLOneByOne(rows Rows) error {
for _, row := range rows {

id, _ := BytesToInt(row.RowData["id"])
table := BytesToString(row.RowData["table"])
database := BytesToString(row.RowData["database"])
ddl := BytesToString(row.RowData["ddl"])

var err error
if table == "" {
// create or drop database ddl
// create or drop database ddl, don't specify database
_, err = bs.sourceMySQLService.mysqlService.Exec("", ddl)
} else {
// todo enhancement: track whether the current ddl to apply has finished or is executing
Expand All @@ -435,7 +443,6 @@ func (bs *BranchService) executeMergeBackDDLOneByOne(rows *sql.Rows) error {
failpoint.Return(fmt.Errorf("error executing merge back ddl by failpoint"))
})
}

return nil
}

Expand Down
101 changes: 38 additions & 63 deletions go/vt/vtgate/branch/common_mysql_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package branch
import "fmt"

type CommonMysqlService struct {
mysqlService *MysqlService
mysqlService MysqlService
}

// GetBranchSchema retrieves CREATE TABLE statements for all tables in databases filtered by `databasesInclude` and `databasesExclude`
Expand All @@ -16,93 +16,68 @@ func (c *CommonMysqlService) GetBranchSchema(databasesInclude, databasesExclude
return nil, fmt.Errorf("no table found")
}

return c.getBranchSchemaInBatches(tableInfos, GetBranchSchemaBatchSize)
return c.getTableSchemaOneByOne(tableInfos)
}

/**********************************************************************************************************************/

// getTableInfos executes the table info query and returns a slice of tableInfo
func (c *CommonMysqlService) getTableInfos(databasesInclude, databasesExclude []string) ([]TableInfo, error) {
query, err := buildTableInfosQuerySQL(databasesInclude, databasesExclude)
if err != nil {
return nil, err
}
var tableInfos []TableInfo

rows, err := c.mysqlService.Query(query)
if err != nil {
return nil, fmt.Errorf("failed to query table information: %v", err)
}
defer rows.Close()
lastSchema := ""
lastTable := ""

var tableInfos []TableInfo
for {
query, err := buildTableInfosQueryInBatchSQL(databasesInclude, databasesExclude, lastSchema, lastTable, SelectBatchSize)
if err != nil {
return nil, err
}

for rows.Next() {
var database, tableName string
if err := rows.Scan(&database, &tableName); err != nil {
return nil, fmt.Errorf("failed to scan query result: %v", err)
rows, err := c.mysqlService.Query(query)
if err != nil {
return nil, fmt.Errorf("failed to query table information: %v", err)
}

for _, row := range rows {
var database, tableName string
database = BytesToString(row.RowData["TABLE_SCHEMA"])
tableName = BytesToString(row.RowData["TABLE_NAME"])

tableInfos = append(tableInfos, TableInfo{database: database, name: tableName})
}
tableInfos = append(tableInfos, TableInfo{database: database, name: tableName})
}

if err = rows.Err(); err != nil {
return nil, fmt.Errorf("error occurred while iterating query results: %v", err)
if len(rows) < SelectBatchSize {
break
}

lastSchema = BytesToString(rows[SelectBatchSize-1].RowData["TABLE_SCHEMA"])
lastTable = BytesToString(rows[SelectBatchSize-1].RowData["TABLE_NAME"])

}

return tableInfos, nil
}

// getBranchSchemaInBatches retrieves CREATE TABLE statements in batches
func (c *CommonMysqlService) getBranchSchemaInBatches(tableInfos []TableInfo, batchSize int) (*BranchSchema, error) {
// get table schema one by one
func (c *CommonMysqlService) getTableSchemaOneByOne(tableInfos []TableInfo) (*BranchSchema, error) {
result := make(map[string]map[string]string)

for i := 0; i < len(tableInfos); i += batchSize {
end := i + batchSize
if end > len(tableInfos) {
end = len(tableInfos)
}
batch := tableInfos[i:end]
for i := 0; i < len(tableInfos); i++ {

combinedQuery := getCombinedShowCreateTableSQL(batch)
query := fmt.Sprintf("SHOW CREATE TABLE `%s`.`%s`;", tableInfos[i].database, tableInfos[i].name)

// Execute the combined query
multiRows, err := c.mysqlService.Query(combinedQuery)
rows, err := c.mysqlService.Query(query)
if err != nil {
return nil, fmt.Errorf("failed to execute combined query: %v", err)
return nil, fmt.Errorf("failed to execute query %v: %v", query, err)
}

// Process each result set in the batch
for j := 0; j < len(batch); j++ {
table := batch[j]
db := table.database
tableName := table.name

// Ensure database map is initialized
if _, exists := result[db]; !exists {
result[db] = make(map[string]string)
}

// Each SHOW CREATE TABLE result has two columns: Table and Create Table
if !multiRows.Next() {
return nil, fmt.Errorf("unexpected end of result sets while processing %s.%s", db, tableName)
}

var tableNameResult, createTableStmt string
if err := multiRows.Scan(&tableNameResult, &createTableStmt); err != nil {
return nil, fmt.Errorf("failed to scan create table result for %s.%s: %v", db, tableName, err)
}

// Store the result
result[db][tableName] = createTableStmt

// Move to next result set, unless it's the last table in the batch
if j < len(batch)-1 {
if !multiRows.NextResultSet() {
return nil, fmt.Errorf("failed to move to next result set after processing %s.%s", db, tableName)
}
for _, row := range rows {
if _, exists := result[tableInfos[i].database]; !exists {
result[tableInfos[i].database] = make(map[string]string)
}
result[tableInfos[i].database][tableInfos[i].name] = BytesToString(row.RowData["Create Table"])
}

multiRows.Close()
}

return &BranchSchema{branchSchema: result}, nil
Expand Down
Loading
Loading