diff --git a/output/clickhouse_output.go b/output/clickhouse_output.go index 4f2d910c..38a1631f 100644 --- a/output/clickhouse_output.go +++ b/output/clickhouse_output.go @@ -2,7 +2,6 @@ package output import ( "database/sql" - "encoding/json" "fmt" "math/rand" "strings" @@ -31,8 +30,6 @@ type ClickhouseOutput struct { fieldsLength int query string - desc map[string]*rowDesc - defaultValue map[string]interface{} // columnName -> defaultValue bulkChan chan []map[string]interface{} @@ -45,129 +42,6 @@ type ClickhouseOutput struct { wg sync.WaitGroup } -type rowDesc struct { - Name string `json:"name"` - Type string `json:"type"` - DefaultType string `json:"default_type"` - DefaultExpression string `json:"default_expression"` -} - -func (c *ClickhouseOutput) setTableDesc() { - c.desc = make(map[string]*rowDesc) - - query := fmt.Sprintf("desc table %s", c.table) - glog.V(5).Info(query) - - for i := 0; i < c.dbSelector.Size(); i++ { - nextdb := c.dbSelector.Next() - - db := nextdb.(*sql.DB) - - rows, err := db.Query(query) - if err != nil { - glog.Errorf("query %q error: %s", query, err) - continue - } - defer rows.Close() - - columns, err := rows.Columns() - if err != nil { - glog.Fatalf("could not get columns from query `%s`: %s", query, err) - } - glog.V(10).Infof("desc table columns: %v", columns) - - descMap := make(map[string]string) - for _, c := range columns { - descMap[c] = "" - } - - for rows.Next() { - values := make([]interface{}, 0) - for range columns { - var a string - values = append(values, &a) - } - - if err := rows.Scan(values...); err != nil { - glog.Fatalf("scan rows error: %s", err) - } - - descMap := make(map[string]string) - for i, c := range columns { - descMap[c] = *values[i].(*string) - } - - b, err := json.Marshal(descMap) - if err != nil { - glog.Fatalf("marshal desc error: %s", err) - } - - rowDesc := rowDesc{} - err = json.Unmarshal(b, &rowDesc) - if err != nil { - glog.Fatalf("marshal desc error: %s", err) - } - - glog.V(5).Infof("row desc: %#v", rowDesc) - - c.desc[rowDesc.Name] = &rowDesc - } - - return - } -} - -func (c *ClickhouseOutput) checkColumnDefault() { - fields := make(map[string]bool) - for _, f := range c.fields { - fields[f] = true - } - - for column, d := range c.desc { - if _, ok := fields[column]; !ok { - continue - } - - // TODO default expression should be supported - switch d.DefaultType { - case "MATERIALIZED", "ALIAS", "DEFAULT": - glog.Fatal("MATERIALIZED, ALIAS, DEFAULT field not supported") - } - } -} - -func (c *ClickhouseOutput) setColumnDefault() { - c.setTableDesc() - - c.defaultValue = make(map[string]interface{}) - - for columnName, d := range c.desc { - if d.DefaultType != "" { - c.defaultValue[columnName] = d.DefaultExpression - continue - } - switch d.Type { - case "String", "LowCardinality(String)": - c.defaultValue[columnName] = "" - case "Date", "DateTime": - c.defaultValue[columnName] = time.Unix(0, 0) - case "UInt8", "UInt16", "UInt32", "UInt64", "Int8", "Int16", "Int32", "Int64": - c.defaultValue[columnName] = 0 - case "Float32", "Float64": - c.defaultValue[columnName] = 0.0 - case "IPv4": - c.defaultValue[columnName] = "0.0.0.0" - case "IPv6": - c.defaultValue[columnName] = "::" - case "Array(String)": - c.defaultValue[columnName] = []string{} - default: - glog.Errorf("column: %s, type: %s. unsupported column type, ignore", columnName, d.Type) - continue - } - } -} - func (l *MethodLibrary) NewClickhouseOutput(config map[interface{}]interface{}) *ClickhouseOutput { rand.Seed(time.Now().UnixNano()) p := &ClickhouseOutput{ @@ -254,9 +128,6 @@ func (l *MethodLibrary) NewClickhouseOutput(config map[interface{}]interface{}) } p.dbSelector = NewRRHostSelector(dbsI, 3) - p.setColumnDefault() - p.checkColumnDefault() - concurrent := 1 if v, ok := config["concurrent"]; ok { concurrent = v.(int) @@ -332,15 +203,7 @@ func (p *ClickhouseOutput) innerFlush(events []map[string]interface{}) { for _, event := range events { args := make([]interface{}, p.fieldsLength) for i, field := range p.fields { - if v, ok := event[field]; ok && v != nil { - args[i] = v - } else { - if vv, ok := p.defaultValue[field]; ok { - args[i] = vv - } else { // this should not happen - args[i] = "" - } - } + args[i] = event[field] } if _, err := stmt.Exec(args...); err != nil { glog.Errorf("exec clickhouse insert %v error: %s", event, err)