Skip to content

Commit

Permalink
syncer(dm): add option for explicit auto-id-cache handling in create …
Browse files Browse the repository at this point in the history
…table DDLs
  • Loading branch information
michaelmdeng authored and Michael Deng committed Jan 30, 2025
1 parent ddbfbf2 commit c43a3d9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
4 changes: 4 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ type SyncerConfig struct {
Compact bool `yaml:"compact" toml:"compact" json:"compact"`
MultipleRows bool `yaml:"multiple-rows" toml:"multiple-rows" json:"multiple-rows"`

AutoIDCacheSize uint64 `yaml:"auto-id-cache" toml:"auto-id-cache" json:"auto-id-cache"`

// deprecated
MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"`

Expand Down Expand Up @@ -1175,6 +1177,7 @@ type SyncerConfigForDowngrade struct {
SafeModeDuration string `yaml:"safe-mode-duration,omitempty"`
Compact bool `yaml:"compact,omitempty"`
MultipleRows bool `yaml:"multipleRows,omitempty"`
AutoIDCacheSize uint64 `yaml:"auto-id-cache-size,omitempty"`
}

// NewSyncerConfigsForDowngrade converts SyncerConfig to SyncerConfigForDowngrade.
Expand All @@ -1195,6 +1198,7 @@ func NewSyncerConfigsForDowngrade(syncerConfigs map[string]*SyncerConfig) map[st
EnableANSIQuotes: syncerConfig.EnableANSIQuotes,
Compact: syncerConfig.Compact,
MultipleRows: syncerConfig.MultipleRows,
AutoIDCacheSize: syncerConfig.AutoIDCacheSize,
}
syncerConfigsForDowngrade[configName] = newSyncerConfig
}
Expand Down
37 changes: 37 additions & 0 deletions dm/syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type DDLWorker struct {
tableRouter *regexprrouter.RouteTable
sourceTableNamesFlavor conn.LowerCaseTableNamesFlavor
collationCompatible string
autoIDCacheSize uint64
charsetAndDefaultCollation map[string]string
idAndCollationMap map[int]string
baList *filter.Filter
Expand Down Expand Up @@ -106,6 +107,7 @@ func NewDDLWorker(pLogger *log.Logger, syncer *Syncer) *DDLWorker {
tableRouter: syncer.tableRouter,
sourceTableNamesFlavor: syncer.SourceTableNamesFlavor,
collationCompatible: syncer.cfg.CollationCompatible,
autoIDCacheSize: syncer.cfg.AutoIDCacheSize,
charsetAndDefaultCollation: syncer.charsetAndDefaultCollation,
idAndCollationMap: syncer.idAndCollationMap,
baList: syncer.baList,
Expand Down Expand Up @@ -1322,6 +1324,8 @@ func (ddl *DDLWorker) genDDLInfo(qec *queryEventContext, sql string) (*ddlInfo,
ddl.adjustCollation(ddlInfo, qec.eventStatusVars, ddl.charsetAndDefaultCollation, ddl.idAndCollationMap)
}

ddl.adjustAutoIDCache(ddlInfo)

routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.stmtCache, ddlInfo.targetTables)
ddlInfo.routedDDL = routedDDL
return ddlInfo, err
Expand Down Expand Up @@ -1487,6 +1491,39 @@ ColumnLoop:
}
}

func (ddl *DDLWorker) adjustAutoIDCache(ddlInfo *ddlInfo) {
if ddl.autoIDCacheSize <= 0 {
return
}

if createStmt, ok := ddlInfo.stmtCache.(*ast.CreateTableStmt); ok {
addAutoIDCache := false
for _, col := range createStmt.Cols {
for _, opt := range col.Options {
if opt.Tp == ast.ColumnOptionAutoIncrement {
addAutoIDCache = true
}
}
}

for _, opt := range createStmt.Options {
switch opt.Tp {
case ast.TableOptionAutoIncrement:
addAutoIDCache = true
case ast.TableOptionAutoIdCache:
return
}
}

if addAutoIDCache {
createStmt.Options = append(createStmt.Options, &ast.TableOption{
Tp: ast.TableOptionAutoIdCache,
UintValue: ddl.autoIDCacheSize,
})
}
}
}

type ddlInfo struct {
originDDL string
routedDDL string
Expand Down
60 changes: 60 additions & 0 deletions dm/syncer/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,66 @@ func TestAdjustCollation(t *testing.T) {
}
}

func TestAdjustAutoIDCache(t *testing.T) {
cases := []struct {
sql string
expectedSQL string
cacheSize uint64
}{
{
"CREATE TABLE `test`.`t1` (`id` INT)",
"CREATE TABLE `test`.`t1` (`id` INT)",
0,
},
{
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
0,
},
{
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT) /*T![auto_id_cache] AUTO_ID_CACHE = 1 */",
1,
},
{
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT) /*T![auto_id_cache] AUTO_ID_CACHE = 30 */",
30,
},
}

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestAdjustAutoIDCache")))
p := parser.New()
tab := &filter.Table{
Schema: "test",
Name: "t1",
}

for _, cse := range cases {
syncer := NewSyncer(&config.SubTaskConfig{
SyncerConfig: config.SyncerConfig{
AutoIDCacheSize: cse.cacheSize,
},
}, nil, nil)
syncer.tctx = tctx
ddlWorker := NewDDLWorker(&tctx.Logger, syncer)
ddlInfo := &ddlInfo{
originDDL: cse.sql,
routedDDL: cse.sql,
sourceTables: []*filter.Table{tab},
targetTables: []*filter.Table{tab},
}
stmt, err := p.ParseOneStmt(cse.sql, "", "")
require.NoError(t, err)
require.NotNil(t, stmt)
ddlInfo.stmtCache = stmt
ddlWorker.adjustAutoIDCache(ddlInfo)
routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.stmtCache, ddlInfo.targetTables)
require.NoError(t, err)
require.Equal(t, cse.expectedSQL, routedDDL)
}
}

type mockOnlinePlugin struct {
toFinish map[string]struct{}
}
Expand Down

0 comments on commit c43a3d9

Please sign in to comment.