Skip to content

Commit

Permalink
add: support for DEFAULT keyword for INSERT stmt
Browse files Browse the repository at this point in the history
close #106
  • Loading branch information
childe committed Oct 12, 2020
1 parent 06ba22a commit ba37a70
Showing 1 changed file with 1 addition and 138 deletions.
139 changes: 1 addition & 138 deletions output/clickhouse_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package output

import (
"database/sql"
"encoding/json"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -31,8 +30,6 @@ type ClickhouseOutput struct {

fieldsLength int
query string
desc map[string]*rowDesc
defaultValue map[string]interface{} // columnName -> defaultValue

bulkChan chan []map[string]interface{}

Expand All @@ -45,129 +42,6 @@ type ClickhouseOutput struct {
wg sync.WaitGroup
}

type rowDesc struct {
Name string `json:"name"`
Type string `json:"type"`
DefaultType string `json:"default_type"`
DefaultExpression string `json:"default_expression"`
}

func (c *ClickhouseOutput) setTableDesc() {
c.desc = make(map[string]*rowDesc)

query := fmt.Sprintf("desc table %s", c.table)
glog.V(5).Info(query)

for i := 0; i < c.dbSelector.Size(); i++ {
nextdb := c.dbSelector.Next()

db := nextdb.(*sql.DB)

rows, err := db.Query(query)
if err != nil {
glog.Errorf("query %q error: %s", query, err)
continue
}
defer rows.Close()

columns, err := rows.Columns()
if err != nil {
glog.Fatalf("could not get columns from query `%s`: %s", query, err)
}
glog.V(10).Infof("desc table columns: %v", columns)

descMap := make(map[string]string)
for _, c := range columns {
descMap[c] = ""
}

for rows.Next() {
values := make([]interface{}, 0)
for range columns {
var a string
values = append(values, &a)
}

if err := rows.Scan(values...); err != nil {
glog.Fatalf("scan rows error: %s", err)
}

descMap := make(map[string]string)
for i, c := range columns {
descMap[c] = *values[i].(*string)
}

b, err := json.Marshal(descMap)
if err != nil {
glog.Fatalf("marshal desc error: %s", err)
}

rowDesc := rowDesc{}
err = json.Unmarshal(b, &rowDesc)
if err != nil {
glog.Fatalf("marshal desc error: %s", err)
}

glog.V(5).Infof("row desc: %#v", rowDesc)

c.desc[rowDesc.Name] = &rowDesc
}

return
}
}

func (c *ClickhouseOutput) checkColumnDefault() {
fields := make(map[string]bool)
for _, f := range c.fields {
fields[f] = true
}

for column, d := range c.desc {
if _, ok := fields[column]; !ok {
continue
}

// TODO default expression should be supported
switch d.DefaultType {
case "MATERIALIZED", "ALIAS", "DEFAULT":
glog.Fatal("MATERIALIZED, ALIAS, DEFAULT field not supported")
}
}
}

func (c *ClickhouseOutput) setColumnDefault() {
c.setTableDesc()

c.defaultValue = make(map[string]interface{})

for columnName, d := range c.desc {
if d.DefaultType != "" {
c.defaultValue[columnName] = d.DefaultExpression
continue
}
switch d.Type {
case "String", "LowCardinality(String)":
c.defaultValue[columnName] = ""
case "Date", "DateTime":
c.defaultValue[columnName] = time.Unix(0, 0)
case "UInt8", "UInt16", "UInt32", "UInt64", "Int8", "Int16", "Int32", "Int64":
c.defaultValue[columnName] = 0
case "Float32", "Float64":
c.defaultValue[columnName] = 0.0
case "IPv4":
c.defaultValue[columnName] = "0.0.0.0"
case "IPv6":
c.defaultValue[columnName] = "::"
case "Array(String)":
c.defaultValue[columnName] = []string{}
default:
glog.Errorf("column: %s, type: %s. unsupported column type, ignore", columnName, d.Type)
continue
}
}
}

func (l *MethodLibrary) NewClickhouseOutput(config map[interface{}]interface{}) *ClickhouseOutput {
rand.Seed(time.Now().UnixNano())
p := &ClickhouseOutput{
Expand Down Expand Up @@ -254,9 +128,6 @@ func (l *MethodLibrary) NewClickhouseOutput(config map[interface{}]interface{})
}
p.dbSelector = NewRRHostSelector(dbsI, 3)

p.setColumnDefault()
p.checkColumnDefault()

concurrent := 1
if v, ok := config["concurrent"]; ok {
concurrent = v.(int)
Expand Down Expand Up @@ -332,15 +203,7 @@ func (p *ClickhouseOutput) innerFlush(events []map[string]interface{}) {
for _, event := range events {
args := make([]interface{}, p.fieldsLength)
for i, field := range p.fields {
if v, ok := event[field]; ok && v != nil {
args[i] = v
} else {
if vv, ok := p.defaultValue[field]; ok {
args[i] = vv
} else { // this should not happen
args[i] = ""
}
}
args[i] = event[field]
}
if _, err := stmt.Exec(args...); err != nil {
glog.Errorf("exec clickhouse insert %v error: %s", event, err)
Expand Down

0 comments on commit ba37a70

Please sign in to comment.