Skip to content

Commit

Permalink
Fix DDL split events (#960)
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 authored Feb 6, 2025
1 parent 6e545e6 commit 93cc0d6
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 36 deletions.
10 changes: 7 additions & 3 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1906,24 +1906,27 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{heartbeatpb.DDLSpan.TableID},
}
querys, err := SplitQueries(rawEvent.Query)
querys, err := commonEvent.SplitQueries(rawEvent.Query)
if err != nil {
log.Panic("split queries failed", zap.Error(err))
}
var addNames, dropNames []commonEvent.SchemaTableName
allFiltered := true
resultQuerys := make([]string, 0)
if len(querys) != len(rawEvent.MultipleTableInfos) {
log.Panic("rename tables length is not equal table infos", zap.Any("querys", querys), zap.Any("tableInfos", rawEvent.MultipleTableInfos))
}
for i, tableInfo := range rawEvent.MultipleTableInfos {
ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaNames[i], rawEvent.PrevTableNames[i], tableInfo)
ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaNames[i], tableInfo.Name.O, tableInfo)
if ignorePrevTable && ignoreCurrentTable {
continue
}
resultQuerys = append(resultQuerys, querys[i])
allFiltered = false
if isPartitionTable(rawEvent.TableInfo) {
allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
if !ignorePrevTable {
resultQuerys = append(resultQuerys, querys[i])
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, allPhysicalIDs...)
if !ignoreCurrentTable {
// check whether schema change
Expand Down Expand Up @@ -1965,6 +1968,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
}
} else {
if !ignorePrevTable {
resultQuerys = append(resultQuerys, querys[i])
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, tableInfo.ID)
if !ignoreCurrentTable {
if rawEvent.PrevSchemaIDs[i] != rawEvent.CurrentSchemaIDs[i] {
Expand Down Expand Up @@ -2040,7 +2044,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
if allFiltered {
return commonEvent.DDLEvent{}, false
}
querys, err := SplitQueries(rawEvent.Query)
querys, err := commonEvent.SplitQueries(rawEvent.Query)
if err != nil {
log.Panic("split queries failed", zap.Error(err))
}
Expand Down
30 changes: 0 additions & 30 deletions logservice/schemastore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,6 @@ import (
"go.uber.org/zap"
)

// SplitQueries parses queries, which may contain several SQL statements, and
// returns one restored SQL string per statement, each terminated by ";".
// Parser warnings are logged and otherwise ignored. A parse or restore error
// is returned traced, together with a nil slice. When the input contains no
// statements the returned slice is nil.
func SplitQueries(queries string) ([]string, error) {
	sqlParser := parser.New()
	stmts, warns, err := sqlParser.ParseSQL(queries)
	for i := range warns {
		log.Warn("parse sql warnning", zap.Error(warns[i]))
	}
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Deliberately left nil (not pre-sized) so that an input without any
	// statement yields a nil slice.
	var out []string
	for _, stmt := range stmts {
		var buf strings.Builder
		restoreCtx := &format.RestoreCtx{
			Flags: format.DefaultRestoreFlags,
			In:    &buf,
		}
		if restoreErr := stmt.Restore(restoreCtx); restoreErr != nil {
			return nil, errors.Trace(restoreErr)
		}
		// Restore writes the statement without a trailing semicolon, so one is
		// appended here to make every returned statement complete.
		buf.WriteByte(';')
		out = append(out, buf.String())
	}

	return out, nil
}

// transform ddl query based on sql mode.
func transformDDLJobQuery(job *model.Job) (string, error) {
p := parser.New()
Expand Down
27 changes: 24 additions & 3 deletions pkg/common/event/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package event
import (
"encoding/binary"
"encoding/json"
"strings"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/apperror"
Expand Down Expand Up @@ -153,8 +152,10 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
}
case model.ActionCreateTables:
events := make([]*DDLEvent, 0, len(d.TableNameChange.AddName))
// TODO: don't use ; to split query, please use parser
queries := strings.Split(d.Query, ";")
queries, err := SplitQueries(d.Query)
if err != nil {
log.Panic("split queries failed", zap.Error(err))
}
if len(queries) != len(d.TableNameChange.AddName) {
log.Panic("queries length should be equal to addName length", zap.String("query", d.Query), zap.Any("addName", d.TableNameChange.AddName))
}
Expand All @@ -169,6 +170,26 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
})
}
return events
case model.ActionRenameTables:
events := make([]*DDLEvent, 0, len(d.TableNameChange.DropName))
queries, err := SplitQueries(d.Query)
if err != nil {
log.Panic("split queries failed", zap.Error(err))
}
if len(queries) != len(d.TableNameChange.DropName) {
log.Panic("queries length should be equal to dropName length", zap.String("query", d.Query), zap.Any("dropName", d.TableNameChange.DropName))
}
for i, schemaAndTable := range d.TableNameChange.DropName {
events = append(events, &DDLEvent{
Version: d.Version,
Type: d.Type,
PrevSchemaName: schemaAndTable.SchemaName,
PrevTableName: schemaAndTable.TableName,
Query: queries[i],
FinishedTs: d.FinishedTs,
})
}
return events
default:
}
return []*DDLEvent{d}
Expand Down
140 changes: 140 additions & 0 deletions pkg/common/event/ddl_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -64,3 +65,142 @@ func TestDDLEvent(t *testing.T) {
require.Equal(t, ddlEvent.FinishedTs, reverseEvent.FinishedTs)
require.Equal(t, ddlEvent.Err.Error(), reverseEvent.Err.Error())
}

// TestSplitQueries is a table-driven test for SplitQueries.
//
// Every case passes `input` to SplitQueries and then checks either that an
// error is returned (expectedError == true) or that no error is returned and
// the result equals `expected`. The expected strings show the restored form of
// each statement: identifiers wrapped in backticks, string literals prefixed
// with _UTF8MB4, SQL comments absent, and a trailing ";" on every statement.
func TestSplitQueries(t *testing.T) {
tests := []struct {
// name is the sub-test name passed to t.Run.
name string
// input is the raw SQL text handed to SplitQueries.
input string
// expected is the exact slice SplitQueries must return when no error is expected.
expected []string
// expectedError marks cases in which SplitQueries must return an error.
expectedError bool
}{
// No statements at all: the expected result is a nil slice.
{
name: "Empty input",
input: "",
expected: []string(nil),
expectedError: false,
},
// A missing trailing semicolon in the input is added in the output.
{
name: "Single query without trailing semicolon",
input: "CREATE TABLE test (id INT)",
expected: []string{"CREATE TABLE `test` (`id` INT);"},
expectedError: false,
},
// An existing trailing semicolon must not be duplicated.
{
name: "Single query with trailing semicolon",
input: "CREATE TABLE test (id INT);",
expected: []string{"CREATE TABLE `test` (`id` INT);"},
expectedError: false,
},
// Several statements are returned one per element, in input order.
{
name: "Multiple queries with trailing semicolons",
input: `
CREATE TABLE test1 (id INT);
CREATE TABLE test2 (name VARCHAR(20));
INSERT INTO test1 VALUES (1);
`,
expected: []string{
"CREATE TABLE `test1` (`id` INT);",
"CREATE TABLE `test2` (`name` VARCHAR(20));",
"INSERT INTO `test1` VALUES (1);",
},
expectedError: false,
},
// Semicolons inside a string literal must not split the statement.
{
name: "Query with semicolons inside column values",
input: `
CREATE TABLE test (name VARCHAR(50));
INSERT INTO test VALUES ('This; is; a test');
`,
expected: []string{
"CREATE TABLE `test` (`name` VARCHAR(50));",
"INSERT INTO `test` VALUES (_UTF8MB4'This; is; a test');",
},
expectedError: false,
},
// Doubled single quotes inside a string literal are kept as-is in the output.
{
name: "Query with escaped quotes inside strings",
input: `
CREATE TABLE test (name VARCHAR(50));
INSERT INTO test VALUES ('This ''is'' a test');
`,
expected: []string{
"CREATE TABLE `test` (`name` VARCHAR(50));",
"INSERT INTO `test` VALUES (_UTF8MB4'This ''is'' a test');",
},
expectedError: false,
},
// A semicolon inside a function-call argument must not split the statement.
{
name: "Nested queries or functions with semicolons",
input: `
CREATE TABLE test (id INT, name VARCHAR(50));
INSERT INTO test VALUES (1, CONCAT('Name;', 'Test'));
`,
expected: []string{
"CREATE TABLE `test` (`id` INT,`name` VARCHAR(50));",
"INSERT INTO `test` VALUES (1,CONCAT(_UTF8MB4'Name;', _UTF8MB4'Test'));",
},
expectedError: false,
},
// Unbalanced parenthesis: only the presence of an error is checked.
{
name: "Malformed SQL query",
input: "CREATE TABLE test (id INT;",
expected: nil,
expectedError: true,
},
// SQL-looking text inside a string literal stays part of that literal.
{
name: "SQL injection edge case",
input: `
CREATE TABLE users (id INT, name VARCHAR(50));
INSERT INTO users VALUES (1, 'test; DROP TABLE users; --');
`,
expected: []string{
"CREATE TABLE `users` (`id` INT,`name` VARCHAR(50));",
"INSERT INTO `users` VALUES (1,_UTF8MB4'test; DROP TABLE users; --');",
},
expectedError: false,
},
// Line comments and block comments in the input do not appear in the output.
{
name: "Complex queries with comments",
input: `
-- This is a comment
CREATE TABLE test (id INT); -- Inline comment
/* Multi-line
comment */
INSERT INTO test VALUES (1);
`,
expected: []string{
"CREATE TABLE `test` (`id` INT);",
"INSERT INTO `test` VALUES (1);",
},
expectedError: false,
},
// Surrounding whitespace and newlines in the input do not appear in the output.
{
name: "Queries with whitespace and newlines",
input: `
CREATE TABLE test (id INT);
INSERT INTO test VALUES (1);
`,
expected: []string{
"CREATE TABLE `test` (`id` INT);",
"INSERT INTO `test` VALUES (1);",
},
expectedError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := SplitQueries(tt.input)
if tt.expectedError {
// For error cases the returned slice is intentionally not inspected.
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expected, result)
}
})
}
}
41 changes: 41 additions & 0 deletions pkg/common/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ import (
"github.com/pingcap/tidb/pkg/kv"
timeta "github.com/pingcap/tidb/pkg/meta"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/format"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
// NOTE: Do not remove the `test_driver` import.
// For details, refer to: https://github.com/pingcap/parser/issues/43
_ "github.com/pingcap/tidb/pkg/parser/test_driver"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -248,3 +254,38 @@ func (s *EventTestHelper) Close() {
func toTableInfosKey(schema, table string) string {
return schema + "." + table
}

// SplitQueries takes a string containing multiple SQL statements and splits them into individual SQL statements.
// This function is designed for scenarios like batch creation of tables, where multiple `CREATE TABLE` statements
// might be combined into a single query string.
// SplitQueries breaks a query string holding one or more SQL statements (for
// example a batch of `CREATE TABLE` statements) into individual statements.
// Each statement is restored from its parsed AST and terminated with ";".
// Parser warnings are only logged. Any parse or restore failure is wrapped in
// ErrTiDBUnexpectedJobMeta and returned with a nil slice. An input without
// statements yields a nil slice.
func SplitQueries(queries string) ([]string, error) {
	// The parser is not thread-safe, so a fresh instance is created on every
	// call instead of being shared; construction is cheap.
	stmts, warns, err := parser.New().ParseSQL(queries)
	for _, warn := range warns {
		log.Warn("parse sql warnning", zap.Error(warn))
	}
	if err != nil {
		return nil, errors.WrapError(errors.ErrTiDBUnexpectedJobMeta, err)
	}

	// Kept as a nil slice (no pre-allocation) so that callers and tests that
	// compare against nil for empty input keep working.
	var split []string
	for _, stmt := range stmts {
		var restored strings.Builder
		restoreCtx := &format.RestoreCtx{
			Flags: format.DefaultRestoreFlags,
			In:    &restored,
		}
		if restoreErr := stmt.Restore(restoreCtx); restoreErr != nil {
			return nil, errors.WrapError(errors.ErrTiDBUnexpectedJobMeta, restoreErr)
		}
		// Restore produces the SQL text without a trailing semicolon; append
		// one so that each returned statement is complete on its own.
		restored.WriteByte(';')
		split = append(split, restored.String())
	}

	return split, nil
}

0 comments on commit 93cc0d6

Please sign in to comment.