Skip to content

Commit

Permalink
fix: clickhouse not support alter_sync before v23.3
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed May 24, 2024
1 parent db3a1c1 commit 9d18d7a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 30 deletions.
7 changes: 7 additions & 0 deletions common/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,10 @@ func Execute(conf *model.CKManClickHouseConfig, sql string) error {

return lastErr
}

func WithAlterSync(version string) string {
if CompareClickHouseVersion(version, "23.3") >= 0 {
return "SETTINGS alter_sync = 0"
}
return ""
}
26 changes: 13 additions & 13 deletions service/clickhouse/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,11 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error {
for _, value := range params.Add {
add := ""
if value.After != "" {
add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s AFTER `%s` SETTINGS alter_sync = 0",
params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), value.After)
add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s AFTER `%s` %s",
params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), value.After, common.WithAlterSync(ck.Config.Version))
} else {
add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s SETTINGS alter_sync = 0",
params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "))
add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s %s",
params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), common.WithAlterSync(ck.Config.Version))
}
log.Logger.Debugf(add)
if err := ck.Conn.Exec(add); err != nil {
Expand All @@ -461,8 +461,8 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error {
rows.Close()
}

modify := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY COLUMN IF EXISTS `%s` %s %s SETTINGS alter_sync = 0",
params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "))
modify := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY COLUMN IF EXISTS `%s` %s %s %s",
params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), common.WithAlterSync(ck.Config.Version))
log.Logger.Debugf(modify)
if err := ck.Conn.Exec(modify); err != nil {
return errors.Wrap(err, "")
Expand All @@ -471,8 +471,8 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error {

// delete column
for _, value := range params.Drop {
drop := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` DROP COLUMN IF EXISTS `%s` SETTINGS alter_sync = 0",
params.DB, local, params.Cluster, value)
drop := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` DROP COLUMN IF EXISTS `%s` %s",
params.DB, local, params.Cluster, value, common.WithAlterSync(ck.Config.Version))
log.Logger.Debugf(drop)
if err := ck.Conn.Exec(drop); err != nil {
return errors.Wrap(err, "")
Expand All @@ -484,8 +484,8 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error {
if value.From == "" || value.To == "" {
return errors.Errorf("form %s or to %s must not be empty", value.From, value.To)
}
rename := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` RENAME COLUMN IF EXISTS `%s` TO `%s` SETTINGS alter_sync = 0",
params.DB, local, params.Cluster, value.From, value.To)
rename := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` RENAME COLUMN IF EXISTS `%s` TO `%s` %s",
params.DB, local, params.Cluster, value.From, value.To, common.WithAlterSync(ck.Config.Version))

log.Logger.Debugf(rename)
if err := ck.Conn.Exec(rename); err != nil {
Expand Down Expand Up @@ -590,14 +590,14 @@ func (ck *CkService) AlterTableTTL(req *model.AlterTblsTTLReq) error {
if req.TTLType != "" {
if req.TTLType == model.TTLTypeModify {
if req.TTLExpr != "" {
ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY TTL %s SETTINGS alter_sync = 0", table.Database, local, ck.Config.Cluster, req.TTLExpr)
ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY TTL %s %s", table.Database, local, ck.Config.Cluster, req.TTLExpr, common.WithAlterSync(ck.Config.Version))
log.Logger.Debugf(ttl)
if err := ck.Conn.Exec(ttl); err != nil {
return errors.Wrap(err, "")
}
}
} else if req.TTLType == model.TTLTypeRemove {
ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` REMOVE TTL SETTINGS alter_sync = 0", table.Database, local, ck.Config.Cluster)
ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` REMOVE TTL %s", table.Database, local, ck.Config.Cluster, common.WithAlterSync(ck.Config.Version))
log.Logger.Debugf(ttl)
if err := ck.Conn.Exec(ttl); err != nil {
return errors.Wrap(err, "")
Expand Down Expand Up @@ -1987,7 +1987,7 @@ func MoveExceptToOthers(conf *model.CKManClickHouseConfig, except, target, datab
if err != nil {
return err
}
query = fmt.Sprintf("TRUNCATE TABLE `%s`.`%s` SETTINGS alter_sync = 0", database, table)
query = fmt.Sprintf("TRUNCATE TABLE `%s`.`%s` %s", database, table, common.WithAlterSync(conf.Version))
log.Logger.Debugf("[%s] %s", except, query)
conn = common.GetConnection(except)
err = conn.Exec(query)
Expand Down
44 changes: 27 additions & 17 deletions service/cron/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,17 @@ func syncLogicbyTable(clusters []string, database, localTable string) error {
columnsExpr := strings.Join(columns, ",")
// local table
onCluster := fmt.Sprintf("ON CLUSTER `%s`", cluster)
if err = alterTable(ckService.Conn, database, localTable, onCluster, columnsExpr); err != nil {
if err = alterTable(ckService.Conn, database, localTable, onCluster, columnsExpr, ckService.Config.Version); err != nil {
return err
}

// distributed table
if err = alterTable(ckService.Conn, database, common.ClickHouseDistributedTablePrefix+localTable, onCluster, columnsExpr); err != nil {
if err = alterTable(ckService.Conn, database, common.ClickHouseDistributedTablePrefix+localTable, onCluster, columnsExpr, ckService.Config.Version); err != nil {
return err
}

// logic table
if err = alterTable(ckService.Conn, database, common.ClickHouseDistTableOnLogicPrefix+localTable, onCluster, columnsExpr); err != nil {
if err = alterTable(ckService.Conn, database, common.ClickHouseDistTableOnLogicPrefix+localTable, onCluster, columnsExpr, ckService.Config.Version); err != nil {
return err
}

Expand Down Expand Up @@ -233,13 +233,15 @@ func SyncDistSchema() error {
if err != nil {
continue
}
query := `SELECT
query := fmt.Sprintf(`SELECT
database,
name,
(extractAllGroups(engine_full, '(Distributed\\(\')(.*)\',\\s+\'(.*)\',\\s+\'(.*)\'(.*)')[1])[2] AS cluster,
(extractAllGroups(engine_full, '(Distributed\\(\')(.*)\',\\s+\'(.*)\',\\s+\'(.*)\'(.*)')[1])[4] AS local
FROM system.tables
WHERE match(engine, 'Distributed') AND (database NOT IN ('system', 'information_schema', 'INFORMATION_SCHEMA'))`
WHERE match(engine, 'Distributed') AND (database NOT IN ('system', 'information_schema', 'INFORMATION_SCHEMA'))
AND cluster = '%s'`, conf.Cluster)
log.Logger.Debugf("[%s]%s", conf.Cluster, query)
rows, err := ckService.Conn.Query(query)
if err != nil {
continue
Expand Down Expand Up @@ -293,7 +295,7 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick
if needAlter {
log.Logger.Debugf("need alter table, table %s.%s have different columns on cluster %s", database, localTable, conf.Cluster)
for host, cols := range tableLists {
if err := syncSchema(dbLists[host], allCols, cols, database, localTable, ""); err != nil {
if err := syncSchema(dbLists[host], allCols, cols, database, localTable, "", conf.Version); err != nil {
return err
}
}
Expand All @@ -310,8 +312,8 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick
return errors.Wrap(err, host)
}
onCluster := fmt.Sprintf("ON CLUSTER %s", conf.Cluster)
if err = syncSchema(conn, allCols, distCols, database, distTable, onCluster); err != nil {
return errors.Wrap(err, host)
if err = syncSchema(conn, allCols, distCols, database, distTable, onCluster, conf.Version); err != nil {
return errors.Wrap(err, "dist table")
}

logicTable := common.ClickHouseDistTableOnLogicPrefix + localTable
Expand All @@ -328,8 +330,16 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick
return errors.Wrap(err, host)
}

if err = syncSchema(conn, allCols, logicCols, database, logicTable, onCluster); err != nil {
return errors.Wrap(err, host)
if err = syncSchema(conn, allCols, logicCols, database, logicTable, onCluster, conf.Version); err != nil {
err = common.ClikHouseExceptionDecode(err)
var exception *client.Exception
if errors.As(err, &exception) {
// 逻辑表不存在没关系,不报错
if exception.Code == 60 {
continue
}
}
return errors.Wrap(err, "logic table")
}

}
Expand All @@ -347,11 +357,11 @@ func initCKConns(conf model.CKManClickHouseConfig) (err error) {
return
}

func alterTable(conn *common.Conn, database, table, onCluster, col string) error {
func alterTable(conn *common.Conn, database, table, onCluster, col, version string) error {
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s %s",
database, table, onCluster, col)
if onCluster != "" {
query += "SETTINGS alter_sync = 0"
query += " " + common.WithAlterSync(version)
}
log.Logger.Debug(query)
return conn.Exec(query)
Expand All @@ -375,20 +385,20 @@ func getColumns(conn *common.Conn, database, table string) (common.Map, error) {
return tblMap, nil
}

func syncSchema(conn *common.Conn, allCols, cols common.Map, database, table, oncluster string) error {
func syncSchema(conn *common.Conn, allCols, cols common.Map, database, table, oncluster, version string) error {
needAdds := allCols.Difference(cols).(common.Map)
var columns []string
for k, v := range needAdds {
columns = append(columns, fmt.Sprintf("ADD COLUMN IF NOT EXISTS `%s` %s ", k, v))
}

// 当前节点是全量的列, 无需更新
if len(columns) == 0 {
// 当前节点是全量的列, 无需更新, columns 和allCols相等, 说明有表不存在,也不管了
if len(columns) == 0 || len(columns) == len(allCols) {
return nil
}
// local table
if err := alterTable(conn, database, table, oncluster, strings.Join(columns, ",")); err != nil {
return err
if err := alterTable(conn, database, table, oncluster, strings.Join(columns, ","), version); err != nil {
return errors.Wrapf(err, table)
}
return nil
}

0 comments on commit 9d18d7a

Please sign in to comment.