Skip to content

Commit

Permalink
Merge pull request #292 from YenchangChan/main
Browse files Browse the repository at this point in the history
distributed ddl queue
  • Loading branch information
YenchangChan authored Mar 21, 2024
2 parents f581646 + 5aae795 commit 378c870
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 21 deletions.
2 changes: 1 addition & 1 deletion ckconfig/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func profiles(userProfiles []model.Profile, info HostInfo) map[string]interface{
defaultProfile["max_query_size"] = 1073741824
defaultProfile["distributed_aggregation_memory_efficient"] = 1
defaultProfile["joined_subquery_requires_alias"] = 0
defaultProfile["distributed_ddl_task_timeout"] = 15
defaultProfile["distributed_ddl_task_timeout"] = 60
defaultProfile["allow_drop_detached"] = 1
defaultProfile["use_uncompressed_cache"] = 0
defaultProfile["max_execution_time"] = 3600 // 1 hour
Expand Down
2 changes: 2 additions & 0 deletions common/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"net"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -92,6 +93,7 @@ func ConnectClickHouse(host string, database string, opt model.ConnetOption) (*C
}
}
conn := Conn{
addr: net.JoinHostPort(host, fmt.Sprint(opt.Port)),
protocol: opt.Protocol,
ctx: context.Background(),
}
Expand Down
29 changes: 29 additions & 0 deletions common/ck_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ func (c *ColumnType) ScanType() reflect.Type {
}
}

type Row struct {
proto clickhouse.Protocol
r1 *sql.Row
r2 driver.Row
}

func (r *Row) Scan(dest ...any) error {
if r.proto == clickhouse.HTTP {
return r.r1.Scan(dest...)
} else {
return r.r2.Scan(dest...)
}
}

type Rows struct {
protocol clickhouse.Protocol
rs1 *sql.Rows
Expand Down Expand Up @@ -87,6 +101,7 @@ func (r *Rows) ColumnTypes() ([]*ColumnType, error) {
}

type Conn struct {
addr string
protocol clickhouse.Protocol
c driver.Conn
db *sql.DB
Expand All @@ -95,6 +110,7 @@ type Conn struct {

func (c *Conn) Query(query string, args ...any) (*Rows, error) {
var rs Rows
//log.Logger.Debugf("[%s]%s", c.addr, query)
rs.protocol = c.protocol
if c.protocol == clickhouse.HTTP {
rows, err := c.db.Query(query, args...)
Expand All @@ -114,7 +130,20 @@ func (c *Conn) Query(query string, args ...any) (*Rows, error) {
return &rs, nil
}

func (c *Conn) QueryRow(query string, args ...any) *Row {
var row Row
//log.Logger.Debugf("[%s]%s", c.addr, query)
row.proto = c.protocol
if c.protocol == clickhouse.HTTP {
row.r1 = c.db.QueryRow(query, args...)
} else {
row.r2 = c.c.QueryRow(c.ctx, query, args...)
}
return &row
}

func (c *Conn) Exec(query string, args ...any) error {
//log.Logger.Debugf("[%s]%s", c.addr, query)
if c.protocol == clickhouse.HTTP {
_, err := c.db.Exec(query, args...)
return err
Expand Down
45 changes: 43 additions & 2 deletions controller/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2043,15 +2043,16 @@ func (controller *ClickHouseController) GetOpenSessions(c *gin.Context) {
func (controller *ClickHouseController) KillOpenSessions(c *gin.Context) {
clusterName := c.Param(ClickHouseClusterPath)
host := c.Query("host")
queryId := c.Query("query_id")
queryId := c.Query("queryId")
typ := c.Query("type")

conf, err := repository.Ps.GetClusterbyName(clusterName)
if err != nil {
controller.wrapfunc(c, model.E_RECORD_NOT_FOUND, fmt.Sprintf("cluster %s does not exist", clusterName))
return
}

err = clickhouse.KillCkOpenSessions(&conf, host, queryId)
err = clickhouse.KillCkOpenSessions(&conf, host, queryId, typ)
if err != nil {
controller.wrapfunc(c, model.E_DATA_UPDATE_FAILED, err)
return
Expand Down Expand Up @@ -2121,6 +2122,46 @@ func (controller *ClickHouseController) GetSlowSessions(c *gin.Context) {
controller.wrapfunc(c, model.E_SUCCESS, sessions)
}

// @Summary 查询分布式DDL
// @Description 查询分布式DDL
// @version 1.0
// @Security ApiKeyAuth
// @Tags clickhouse
// @Accept json
// @Param clusterName path string true "cluster name" default(test)
// @Failure 200 {string} json "{"code":"5800","msg":"集群不存在","data":""}"
// @Failure 200 {string} json "{"code":"5804","msg":"数据查询失败","data":""}"
// @Success 200 {string} json "{"code":"0000","msg":"ok","data":[{"startTime":1609986493,"queryDuration":145,"query":"select * from dist_sensor_dt_result_online limit 10000","user":"default","queryId":"8aa3de08-92c4-4102-a83d-2f5d88569dab","address":"::1","threads":2}]}"
// @Router /api/v2/ck/ddl_queue/{clusterName} [get]
func (controller *ClickHouseController) GetDistDDLQueue(c *gin.Context) {
clusterName := c.Param(ClickHouseClusterPath)
conf, err := repository.Ps.GetClusterbyName(clusterName)
if err != nil {
controller.wrapfunc(c, model.E_RECORD_NOT_FOUND, fmt.Sprintf("cluster %s does not exist", clusterName))
return
}

var gotError bool
ddlQueue, err := clickhouse.GetDistibutedDDLQueue(&conf)
if err != nil {
gotError = true
err = common.ClikHouseExceptionDecode(err)
var exception *client.Exception
if errors.As(err, &exception) {
if exception.Code == 60 {
// we do not return error when system.query_log is not exist
gotError = false
}
}
}
if gotError {
controller.wrapfunc(c, model.E_DATA_SELECT_FAILED, err)
return
}

controller.wrapfunc(c, model.E_SUCCESS, ddlQueue)
}

// @Summary Ping集群是否健康
// @Description 探测集群是否可以正常对外提供服务
// @version 1.0
Expand Down
35 changes: 35 additions & 0 deletions docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,41 @@ var doc = `{
}
}
},
"/api/v2/ck/ddl_queue/{clusterName}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "查询分布式DDL",
"consumes": [
"application/json"
],
"tags": [
"clickhouse"
],
"summary": "查询分布式DDL",
"parameters": [
{
"type": "string",
"default": "test",
"description": "cluster name",
"name": "clusterName",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}",
"schema": {
"type": "string"
}
}
}
}
},
"/api/v2/ck/destroy/{clusterName}": {
"put": {
"security": [
Expand Down
35 changes: 35 additions & 0 deletions docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,41 @@
}
}
},
"/api/v2/ck/ddl_queue/{clusterName}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "查询分布式DDL",
"consumes": [
"application/json"
],
"tags": [
"clickhouse"
],
"summary": "查询分布式DDL",
"parameters": [
{
"type": "string",
"default": "test",
"description": "cluster name",
"name": "clusterName",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}",
"schema": {
"type": "string"
}
}
}
}
},
"/api/v2/ck/destroy/{clusterName}": {
"put": {
"security": [
Expand Down
23 changes: 23 additions & 0 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,29 @@ paths:
summary: 修改集群配置
tags:
- clickhouse
/api/v2/ck/ddl_queue/{clusterName}:
get:
consumes:
- application/json
description: 查询分布式DDL
parameters:
- default: test
description: cluster name
in: path
name: clusterName
required: true
type: string
responses:
"200":
description: '{"code":"0000","msg":"ok","data":[{"startTime":1609986493,"queryDuration":145,"query":"select
* from dist_sensor_dt_result_online limit 10000","user":"default","queryId":"8aa3de08-92c4-4102-a83d-2f5d88569dab","address":"::1","threads":2}]}'
schema:
type: string
security:
- ApiKeyAuth: []
summary: 查询分布式DDL
tags:
- clickhouse
/api/v2/ck/destroy/{clusterName}:
put:
consumes:
Expand Down
2 changes: 1 addition & 1 deletion frontend
2 changes: 1 addition & 1 deletion model/ck_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type CkSessionInfo struct {
QueryId string `json:"queryId"`
Address string `json:"address"`
Threads int `json:"threads"`
Host string `json:"host"` //sql running in which node
Host string `json:"host"` //sql running in which node
}

type SessionCond struct {
Expand Down
1 change: 1 addition & 0 deletions router/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func InitRouterV1(groupV1 *gin.RouterGroup, config *config.CKManConfig, signal c
groupV1.GET(fmt.Sprintf("/ck/open_sessions/:%s", controller.ClickHouseClusterPath), ckController.GetOpenSessions)
groupV1.PUT(fmt.Sprintf("/ck/open_sessions/:%s", controller.ClickHouseClusterPath), ckController.KillOpenSessions)
groupV1.GET(fmt.Sprintf("/ck/slow_sessions/:%s", controller.ClickHouseClusterPath), ckController.GetSlowSessions)
groupV1.GET(fmt.Sprintf("/ck/ddl_queue/:%s", controller.ClickHouseClusterPath), ckController.GetDistDDLQueue)
groupV1.POST(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.AddNode)
groupV1.DELETE(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.DeleteNode)
groupV1.PUT(fmt.Sprintf("/ck/node/start/:%s", controller.ClickHouseClusterPath), ckController.StartNode)
Expand Down
1 change: 1 addition & 0 deletions router/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func InitRouterV2(groupV2 *gin.RouterGroup, config *config.CKManConfig, signal c
groupV2.GET(fmt.Sprintf("/ck/open-sessions/:%s", controller.ClickHouseClusterPath), ckController.GetOpenSessions)
groupV2.PUT(fmt.Sprintf("/ck/open-sessions/:%s", controller.ClickHouseClusterPath), ckController.KillOpenSessions)
groupV2.GET(fmt.Sprintf("/ck/slow-sessions/:%s", controller.ClickHouseClusterPath), ckController.GetSlowSessions)
groupV2.GET(fmt.Sprintf("/ck/ddl_queue/:%s", controller.ClickHouseClusterPath), ckController.GetDistDDLQueue)
groupV2.POST(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.AddNode)
groupV2.DELETE(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.DeleteNode)
groupV2.PUT(fmt.Sprintf("/ck/node/start/:%s", controller.ClickHouseClusterPath), ckController.StartNode)
Expand Down
Loading

0 comments on commit 378c870

Please sign in to comment.