From 951819d4be0d5c0a5d3b3bffe14609d713637e80 Mon Sep 17 00:00:00 2001 From: Dj Gilcrease <d.gilcrease@f5.com> Date: Fri, 12 Oct 2018 11:15:08 -0700 Subject: [PATCH 01/12] Allow setting a struct field as both an influx tag and field --- decode.go | 8 ++++---- decode_test.go | 6 ++---- encode.go | 23 ++++++++++++----------- tag.go | 32 +++++++++++++++++++------------- tag_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 75 insertions(+), 32 deletions(-) create mode 100644 tag_test.go diff --git a/decode.go b/decode.go index a91df01..c35ae35 100644 --- a/decode.go +++ b/decode.go @@ -72,8 +72,8 @@ func decode(columns []string, values [][]interface{}, result interface{}) error continue } - tag = getInfluxFieldTagName(tag) - i, ok := colIndex[tag] + fieldData := getInfluxFieldTagData(f.String(), tag) + i, ok := colIndex[fieldData.fieldName] if !ok { continue @@ -86,12 +86,12 @@ func decode(columns []string, values [][]interface{}, result interface{}) error if f.Type() == reflect.TypeOf(time.Time{}) { timeS, ok := vIn[i].(string) if !ok { - e := errors.New("Time input is not string") + e := errors.New("time input is not string") errs = appendErrors(errs, e) } else { time, err := time.Parse(time.RFC3339, timeS) if err != nil { - e := errors.New("Error parsing time") + e := errors.New("error parsing time") errs = appendErrors(errs, e) } else { vIn[i] = time diff --git a/decode_test.go b/decode_test.go index a30fbc2..cb670fc 100644 --- a/decode_test.go +++ b/decode_test.go @@ -88,10 +88,8 @@ func TestDecodeMissingColumn(t *testing.T) { decoded := []DecodeType{} err := decode(columns, values, &decoded) - if err == nil { - t.Error("Expected error decoding: ", err) - } else { - fmt.Println("Got expected error: ", err) + if err != nil { + t.Error("UnExpected error decoding: ", columns, values, &decoded) } if !reflect.DeepEqual(expected, decoded) { diff --git a/encode.go b/encode.go index 8ac928a..22fd9e7 100644 --- a/encode.go +++ b/encode.go @@ -2,12 +2,12 @@ package influxdbhelper import ( "errors" + "fmt" "reflect" "time" ) -func encode(d interface{}) (t time.Time, tags map[string]string, - fields map[string]interface{}, err error) { +func encode(d interface{}) (t time.Time, tags map[string]string, fields map[string]interface{}, err error) { tags = make(map[string]string) fields = make(map[string]interface{}) dValue := reflect.ValueOf(d) @@ -18,25 +18,26 @@ func encode(d interface{}) (t time.Time, tags map[string]string, for i := 0; i < dValue.NumField(); i++ { f := dValue.Field(i) + fieldName := dValue.Type().Field(i).Name fieldTag := dValue.Type().Field(i).Tag.Get("influx") + fieldData := getInfluxFieldTagData(fieldName, fieldTag) - isTag := isInfluxTag(fieldTag) - name := getInfluxFieldTagName(fieldTag) - - if name == "-" { + if fieldData.fieldName == "-" { continue } - if name == "time" { + if fieldData.fieldName == "time" { // TODO error checking t = f.Interface().(time.Time) continue } - if isTag { - tags[name] = f.String() - } else { - fields[name] = f.Interface() + if fieldData.isTag { + tags[fieldData.fieldName] = fmt.Sprintf("%v", f) + } + + if fieldData.isField { + fields[fieldData.fieldName] = f.Interface() } } diff --git a/tag.go b/tag.go index c8ced08..813d8bd 100644 --- a/tag.go +++ b/tag.go @@ -2,26 +2,32 @@ package influxdbhelper import "strings" -func isInfluxTag(structTag string) bool { +type influxFieldTagData struct { + fieldName string + isTag bool + isField bool +} + +func getInfluxFieldTagData(fieldName, structTag string) (fieldData *influxFieldTagData) { + fieldData = &influxFieldTagData{fieldName: fieldName} parts := strings.Split(structTag, ",") + fieldName, parts = parts[0], parts[1:] + if fieldName != "" { + fieldData.fieldName = fieldName + } for _, part := range parts { if part == "tag" { - return true + fieldData.isTag = true + } + if part == "field" { + fieldData.isField = true } } - return false -} - -func getInfluxFieldTagName(structTag string) string { - parts := strings.Split(structTag, ",") - - for _, part := range parts { - if part != "tag" { - return part - } + if !fieldData.isField && !fieldData.isTag { + fieldData.isField = true } - return "" + return } diff --git a/tag_test.go b/tag_test.go new file mode 100644 index 0000000..dc0355d --- /dev/null +++ b/tag_test.go @@ -0,0 +1,38 @@ +package influxdbhelper + +import "testing" + +func TestTag(t *testing.T) { + data := []struct { + fieldTag string + structFieldName string + fieldName string + isTag bool + isField bool + }{ + {"", "Test", "Test", false, true}, + {"", "Test", "Test", false, true}, + {",tag", "Test", "Test", true, false}, + {",field,tag", "Test", "Test", true, true}, + {",tag,field", "Test", "Test", true, true}, + {",field", "Test", "Test", false, true}, + {"test", "Test", "test", false, true}, + {"test,tag", "Test", "test", true, false}, + {"test,field,tag", "Test", "test", true, true}, + {"test,tag,field", "Test", "test", true, true}, + {"test,field", "Test", "test", false, true}, + } + + for _, testData := range data { + fieldData := getInfluxFieldTagData(testData.structFieldName, testData.fieldTag) + if fieldData.fieldName != testData.fieldName { + t.Errorf("%v != %v", fieldData.fieldName, testData.fieldName) + } + if fieldData.isField != testData.isField { + t.Errorf("%v != %v", fieldData.isField, testData.isField) + } + if fieldData.isTag != testData.isTag { + t.Errorf("%v != %v", fieldData.isTag, testData.isTag) + } + } +} From 740d51bb26a7433ed513aa02c24db6da4545b7b8 Mon Sep 17 00:00:00 2001 From: Dj Gilcrease <d.gilcrease@f5.com> Date: Fri, 12 Oct 2018 11:30:36 -0700 Subject: [PATCH 02/12] Add more tests to encode to cover the new cases --- encode_test.go | 31 +++++++++++++++++++------------ examples/writeread.go | 2 +- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/encode_test.go b/encode_test.go index 1bc53e2..0c01784 100644 --- a/encode_test.go +++ b/encode_test.go @@ -15,36 +15,43 @@ func TestEncodeDataNotStruct(t *testing.T) { func TestEncode(t *testing.T) { type MyType struct { - Time time.Time `influx:"time"` - TagValue string `influx:"tagValue,tag"` - IntValue int `influx:"intValue"` - FloatValue float64 `influx:"floatValue"` - BoolValue bool `influx:"boolValue"` - StringValue string `influx:"stringValue"` - IgnoredValue string `influx:"-"` + Time time.Time `influx:"time"` + TagValue string `influx:"tagValue,tag"` + TagAndFieldValue string `influx:"tagAndFieldValue,tag,field"` + IntValue int `influx:"intValue"` + FloatValue float64 `influx:"floatValue"` + BoolValue bool `influx:"boolValue"` + StringValue string `influx:"stringValue"` + StructFieldName string `influx:""` + IgnoredValue string `influx:"-"` } d := MyType{ time.Now(), "tag-value", + "tag-and-field-value", 10, 10.5, true, "string", + "struct-field", "ignored", } timeExp := d.Time tagsExp := map[string]string{ - "tagValue": "tag-value", + "tagValue": "tag-value", + "tagAndFieldValue": "tag-and-field-value", } fieldsExp := map[string]interface{}{ - "intValue": d.IntValue, - "floatValue": d.FloatValue, - "boolValue": d.BoolValue, - "stringValue": d.StringValue, + "tagAndFieldValue": d.TagAndFieldValue, + "intValue": d.IntValue, + "floatValue": d.FloatValue, + "boolValue": d.BoolValue, + "stringValue": d.StringValue, + "StructFieldName": d.StructFieldName, } tm, tags, fields, err := encode(d) diff --git a/examples/writeread.go b/examples/writeread.go index 81f0e09..b88ec7a 100644 --- a/examples/writeread.go +++ b/examples/writeread.go @@ -48,7 +48,7 @@ type envSample struct { type envSampleRead struct { Time time.Time `influx:"time"` Location string `influx:"location,tag"` - City string `influx:"city,tag"` + City string `influx:"city,tag,field"` Temperature float64 `influx:"temperature"` Humidity float64 `influx:"humidity"` Cycles float64 `influx:"cycles"` From 24ff6935936e0c2799bbde18082521b381663727 Mon Sep 17 00:00:00 2001 From: Dj Gilcrease <d.gilcrease@f5.com> Date: Fri, 12 Oct 2018 16:07:49 -0700 Subject: [PATCH 03/12] Refactor to start v2 * Make the client an interface that also implements the influx client interface * refactor the decode to use the mapstruct decode instead of a fully custom decoder * decode now works on a influxdb models.Row directly * Add a Measurment type that has a required field name of InfluxMeasurement similar to XMLName, xml.Name in structs --- README.md | 9 +- client.go | 136 +++++++++++++++++++++++------ client_test.go | 4 +- decode.go | 164 ++++++++-------------------------- decode_test.go | 166 ++++++++++++++++++++++------------- encode.go | 19 +++- encode_test.go | 32 ++++++- examples/writeread.go | 198 +++++++++++++++++++++--------------------- tag.go | 6 +- 9 files changed, 408 insertions(+), 326 deletions(-) diff --git a/README.md b/README.md index e1490a8..499e471 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ const ( db = "dbhelper" ) -var c *influxdbhelper.Client +var c influxdbhelper.Client func Init() (err error) { c, err = influxdbhelper.NewClient(influxUrl, "", "", "ns") @@ -44,6 +44,7 @@ func Init() (err error) { } type EnvSample struct { + InfluxMeasurement influxdbhelper.Measurement Time time.Time `influx:"time"` Location string `influx:"location,tag"` Temperature float64 `influx:"temperature"` @@ -56,6 +57,7 @@ func generateSampleData() []EnvSample { for i, _ := range ret { ret[i] = EnvSample{ + InfluxMeasurement: "test" Time: time.Now(), Location: "Rm 243", Temperature: 70 + float64(i), @@ -75,8 +77,9 @@ func main() { // write sample data to database samples := generateSampleData() + c = c.UseDB(db) for _, p := range samples { - err := c.WritePoint(db, "test", p) + err := c.WritePoint(p) if err != nil { log.Fatal("Error writing point: ", err) } @@ -86,7 +89,7 @@ func main() { samplesRead := []EnvSample{} q := `SELECT * FROM test ORDER BY time DESC LIMIT 10` - err = c.Query(db, q, &samplesRead) + err = c.UseDB(db).Query(q, &samplesRead) if err != nil { log.Fatal("Query error: ", err) } diff --git a/client.go b/client.go index 35b26f3..4e7d884 100644 --- a/client.go +++ b/client.go @@ -1,11 +1,12 @@ package influxdbhelper import ( + "fmt" "regexp" "strings" "time" - client "github.com/influxdata/influxdb/client/v2" + influxClient "github.com/influxdata/influxdb/client/v2" ) var reRemoveExtraSpace = regexp.MustCompile(`\s\s+`) @@ -18,28 +19,54 @@ func CleanQuery(query string) string { return ret } -// A Client represents an influxdbhelper client connection to +type Client interface { + influxClient.Client + + // UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields + UseDB(db string) Client + + // UseMeasurement sets the measurment to use for WritePoint, and WritePointTagsFields + UseMeasurement(measurement string) Client + + // Query executes an InfluxDb query, and unpacks the result into the + // result data structure. + DecodeQuery(query string, result interface{}) error + + // WritePoint is used to write arbitrary data into InfluxDb. + WritePoint(data interface{}) error + + // WritePointTagsFields is used to write a point specifying tags and fields. + WritePointTagsFields(tags map[string]string, fields map[string]interface{}, t time.Time) error +} + +// A Client represents an influxdbhelper influxClient connection to // an InfluxDb server. -type Client struct { +type helperClient struct { url string - client client.Client + client influxClient.Client precision string + using *helperUsing +} + +type helperUsing struct { + db string + measurement string } -// NewClient returns a new influxdbhelper client given a url, user, +// NewClient returns a new influxdbhelper influxClient given a url, user, // password, and precision strings. // // url is typically something like: http://localhost:8086 // // precision can be ‘h’, ‘m’, ‘s’, ‘ms’, ‘u’, or ‘ns’ and is // used during write operations. -func NewClient(url, user, passwd, precision string) (*Client, error) { - ret := Client{ +func NewClient(url, user, passwd, precision string) (Client, error) { + ret := &helperClient{ url: url, precision: precision, } - client, err := client.NewHTTPClient(client.HTTPConfig{ + client, err := influxClient.NewHTTPClient(influxClient.HTTPConfig{ Addr: url, Username: user, Password: passwd, @@ -47,15 +74,52 @@ func NewClient(url, user, passwd, precision string) (*Client, error) { ret.client = client - return &ret, err + return ret, err } -// InfluxClient returns the influxdb/client/v2 client if low level -// queries or writes need to be executed. -func (c Client) InfluxClient() client.Client { - return c.client +// Ping checks that status of cluster, and will always return 0 time and no +// error for UDP clients. +func (c *helperClient) Ping(timeout time.Duration) (time.Duration, string, error) { + return c.client.Ping(timeout) } +// Write takes a BatchPoints object and writes all Points to InfluxDB. +func (c *helperClient) Write(bp influxClient.BatchPoints) error { + return c.client.Write(bp) +} + +// Query makes an InfluxDB Query on the database. This will fail if using +// the UDP client. +func (c *helperClient) Query(q influxClient.Query) (*influxClient.Response, error) { + return c.client.Query(q) +} + +// Close releases any resources a Client may be using. +func (c *helperClient) Close() error { + return c.client.Close() +} + +// UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields +func (c *helperClient) UseDB(db string) Client { + if c.using == nil { + c.using = &helperUsing{} + } + + c.using.db = db + return c +} + +// UseMeasurement sets the DB to use for Query, WritePoint, and WritePointTagsFields +func (c *helperClient) UseMeasurement(measurement string) Client { + if c.using == nil { + c.using = &helperUsing{} + } + + c.using.measurement = measurement + return c +} + + // Query executes an InfluxDb query, and unpacks the result into the // result data structure. // @@ -67,16 +131,21 @@ func (c Client) InfluxClient() client.Client { // and InfluxDb field/tag names typically start with a lower case letter. // The struct field tag can be set to '-' which indicates this field // should be ignored. -func (c Client) Query(db, cmd string, result interface{}) (err error) { - query := client.Query{ - Command: cmd, - Database: db, +func (c *helperClient) DecodeQuery(q string, result interface{}) (err error) { + if c.using == nil { + return fmt.Errorf("no db set for query") + } + + query := influxClient.Query{ + Command: q, + Database: c.using.db, Chunked: false, ChunkSize: 100, } - var response *client.Response + var response *influxClient.Response response, err = c.client.Query(query) + c.using = nil if response.Error() != nil { return response.Error() @@ -92,8 +161,7 @@ func (c Client) Query(db, cmd string, result interface{}) (err error) { } series := results[0].Series[0] - - err = decode(series.Columns, series.Values, result) + err = decode(series, result) return } @@ -105,20 +173,33 @@ func (c Client) Query(db, cmd string, result interface{}) (err error) { // struct field should be an InfluxDb tag (vs field). A tag of '-' indicates // the struct field should be ignored. A struct field of Time is required and // is used for the time of the sample. -func (c Client) WritePoint(db, measurement string, data interface{}) error { - t, tags, fields, err := encode(data) +func (c *helperClient) WritePoint(data interface{}) error { + if c.using == nil { + return fmt.Errorf("no db set for query") + } + + t, tags, fields, measurement, err := encode(data) + + if c.using.measurement == "" { + c.using.measurement = measurement + } if err != nil { return err } - return c.WritePointTagsFields(db, measurement, tags, fields, t) + return c.WritePointTagsFields( tags, fields, t) } + // WritePointTagsFields is used to write a point specifying tags and fields. -func (c Client) WritePointTagsFields(db, measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error { - bp, err := client.NewBatchPoints(client.BatchPointsConfig{ - Database: db, +func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[string]interface{}, t time.Time) (err error) { + if c.using == nil { + return fmt.Errorf("no db set for query") + } + + bp, err := influxClient.NewBatchPoints(influxClient.BatchPointsConfig{ + Database: c.using.db, Precision: c.precision, }) @@ -126,7 +207,8 @@ func (c Client) WritePointTagsFields(db, measurement string, tags map[string]str return err } - pt, err := client.NewPoint(measurement, tags, fields, t) + pt, err := influxClient.NewPoint(c.using.measurement, tags, fields, t) + c.using = nil if err != nil { return err diff --git a/client_test.go b/client_test.go index d5b6981..27299c1 100644 --- a/client_test.go +++ b/client_test.go @@ -23,7 +23,7 @@ func ExampleClient_WritePoint() { Id: "12432as32", } - c.WritePoint("myDb", "test", s) + c.UseDB("myDb").UseMeasurement("test").WritePoint(s) } func ExampleClient_Query() { @@ -41,7 +41,7 @@ func ExampleClient_Query() { q := `SELECT * FROM test ORDER BY time DESC LIMIT 10` - c.Query("myDb", q, &samplesRead) + c.UseDB("myDb").DecodeQuery(q, &samplesRead) // samplesRead is now populated with data from InfluxDb } diff --git a/decode.go b/decode.go index c35ae35..1e55d1a 100644 --- a/decode.go +++ b/decode.go @@ -1,150 +1,56 @@ package influxdbhelper import ( - "encoding/json" - "errors" - "fmt" "reflect" - "strconv" "time" + + influxModels "github.com/influxdata/influxdb/models" + "github.com/mitchellh/mapstructure" ) + + // Decode is used to process data returned by an InfluxDb query and uses reflection // to transform it into an array of structs of type result. // // This function is used internally by the Query function. -func decode(columns []string, values [][]interface{}, result interface{}) error { - colIndex := map[string]int{} - for i, col := range columns { - colIndex[col] = i - } - - resultV := reflect.ValueOf(result) - if resultV.Kind() != reflect.Ptr { - return errors.New("result must be ptr") - } - - resultSlice := resultV.Elem() - - if !resultSlice.CanAddr() { - return errors.New("result must be addressable (a pointer)") - } - - if resultSlice.Kind() != reflect.Slice { - return errors.New("result must be ptr to slice") - } - - resultStruct := resultSlice.Type().Elem() - if resultStruct.Kind() != reflect.Struct { - return errors.New("result must be slice of structs") - } - - numFields := resultStruct.NumField() - resultStructFields := []reflect.StructField{} - resultStructTags := []string{} - - for i := 0; i < numFields; i++ { - f := resultStruct.Field(i) - resultStructFields = append(resultStructFields, - f) - resultStructTags = append(resultStructTags, - f.Tag.Get("influx")) - } - - // not sure why we need to do this, but we need to Set resultSlice - // at the end of this function for things to work - resultSliceRet := resultSlice - - // Accumulate any errors - errs := make([]string, 0) - - typeError := false - - for _, vIn := range values { - vOut := reflect.Indirect(reflect.New(resultStruct)) - valueCount := 0 - for i := 0; i < vOut.NumField(); i++ { - f := vOut.Field(i) - // FIXME, not sure how to get the tags - // from vOut - tag := resultStructTags[i] - if tag == "-" { - continue - } - - fieldData := getInfluxFieldTagData(f.String(), tag) - i, ok := colIndex[fieldData.fieldName] - - if !ok { - continue - } - - if vIn[i] == nil { - continue - } - - if f.Type() == reflect.TypeOf(time.Time{}) { - timeS, ok := vIn[i].(string) - if !ok { - e := errors.New("time input is not string") - errs = appendErrors(errs, e) - } else { - time, err := time.Parse(time.RFC3339, timeS) - if err != nil { - e := errors.New("error parsing time") - errs = appendErrors(errs, e) - } else { - vIn[i] = time - } - } - } - - if reflect.TypeOf(vIn[i]) == reflect.TypeOf(json.Number("1")) { - if f.Type() == reflect.TypeOf(1.0) { - vInJSONNum, _ := vIn[i].(json.Number) - vInFloat, err := strconv.ParseFloat(string(vInJSONNum), 64) - if err != nil { - es := "error converting json.Number" - errs = appendErrors(errs, errors.New(es)) - } - vIn[i] = vInFloat - } else { - vInJSONNum, _ := vIn[i].(json.Number) - vInFloat, err := strconv.Atoi(string(vInJSONNum)) - if err != nil { - es := "error converting json.Number" - errs = appendErrors(errs, errors.New(es)) - } - vIn[i] = vInFloat - } +func decode(influxResult influxModels.Row, result interface{}) error { + influxData := make([]map[string]interface{}, 0) + + for _, v := range influxResult.Values { + r := make(map[string]interface{}) + for i, c := range influxResult.Columns { + if len(v) >= i+1 { + r[c] = v[i] } + } + for tag, val := range influxResult.Tags { + r[tag] = val + } + r["InfluxMeasurement"] = influxResult.Name - if reflect.TypeOf(vIn[i]) != f.Type() { - if !typeError { - es := fmt.Sprintf("Type mismatch on decode of %v: %v != %v", - vIn[i], - reflect.TypeOf(vIn[i]).String(), - f.Type().String()) + influxData = append(influxData, r) + } - errs = appendErrors(errs, errors.New(es)) - typeError = true - } - continue + config := &mapstructure.DecoderConfig{ + Metadata: nil, + Result: result, + TagName: "influx", + WeaklyTypedInput: false, + ZeroFields: false, + DecodeHook: func(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + if t == reflect.TypeOf(time.Time{}) && f == reflect.TypeOf("") { + return time.Parse(time.RFC3339, data.(string)) } - f.Set(reflect.ValueOf(vIn[i])) - valueCount = 1 - } - if valueCount > 0 { - resultSliceRet = reflect.Append(resultSliceRet, vOut) - } + return data, nil + }, } - resultSlice.Set(resultSliceRet) - - if len(errs) > 0 { - return &Error{errs} + decoder, err := mapstructure.NewDecoder(config) + if err != nil { + return err } - return nil + return decoder.Decode(influxData) } diff --git a/decode_test.go b/decode_test.go index cb670fc..c7eb97e 100644 --- a/decode_test.go +++ b/decode_test.go @@ -2,27 +2,28 @@ package influxdbhelper import ( "encoding/json" - "fmt" "math" "reflect" "strconv" "testing" "time" + + influxModels "github.com/influxdata/influxdb/models" ) func TestDecode(t *testing.T) { - columns := []string{ - "tagValue", - "intValue", - "floatValue", - "boolValue", - "stringValue", + data := influxModels.Row{ + Name: "bla", + Columns: []string{ + "intValue", + "floatValue", + "boolValue", + "stringValue", + }, + Values: make([][]interface{}, 0), + Tags: map[string]string{"tagValue": "tag-value"}, } - _ = columns - - values := [][]interface{}{} - type DecodeType struct { TagValue string `influx:"tagValue,tag"` IntValue int `influx:"intValue"` @@ -45,7 +46,6 @@ func TestDecode(t *testing.T) { } vI := []interface{}{ - v.TagValue, v.IntValue, v.FloatValue, v.BoolValue, @@ -53,13 +53,13 @@ func TestDecode(t *testing.T) { } expected = append(expected, v) - values = append(values, vI) + data.Values = append(data.Values, vI) } decoded := []DecodeType{} - err := decode(columns, values, &decoded) + err := decode(data, &decoded) if err != nil { t.Error("Error decoding: ", err) } @@ -70,26 +70,27 @@ func TestDecode(t *testing.T) { } func TestDecodeMissingColumn(t *testing.T) { - columns := []string{ - "val1", + data := influxModels.Row{ + Name: "bla", + Columns: []string{ + "val1", + }, + Values: make([][]interface{}, 0), + Tags: map[string]string{}, } - _ = columns - type DecodeType struct { Val1 int `influx:"val1"` Val2 int `influx:"val2"` } expected := []DecodeType{{1, 0}} - - values := [][]interface{}{{1}} - + data.Values = append(data.Values, []interface{}{1}) decoded := []DecodeType{} + err := decode(data, &decoded) - err := decode(columns, values, &decoded) if err != nil { - t.Error("UnExpected error decoding: ", columns, values, &decoded) + t.Error("UnExpected error decoding: ", data, &decoded) } if !reflect.DeepEqual(expected, decoded) { @@ -98,60 +99,62 @@ func TestDecodeMissingColumn(t *testing.T) { } func TestDecodeWrongType(t *testing.T) { - columns := []string{ - "val1", "val2", + data := influxModels.Row{ + Name: "bla", + Columns: []string{ + "val1", + "val2", + }, + Values: make([][]interface{}, 0), + Tags: map[string]string{}, } - _ = columns - type DecodeType struct { Val1 int `influx:"val1"` Val2 float64 `influx:"val2"` } - expected := []DecodeType{} - - values := [][]interface{}{{1.0, 2}} - + expected := []DecodeType{{1, 2.0}} + data.Values = append(data.Values, []interface{}{1.0, 2}) decoded := []DecodeType{} - - err := decode(columns, values, &decoded) - if err == nil { - t.Error("Expected error decoding: ", err) - } else { - fmt.Println("Got expected error: ", err) + err := decode(data, &decoded) + if err != nil { + t.Error("Unexpected error decoding: ", err, data, decoded) } if !reflect.DeepEqual(expected, decoded) { - t.Error("decoded value is not right") + t.Error("decoded value is not right", expected, decoded) } } + func TestDecodeTime(t *testing.T) { - columns := []string{ - "time", "value", + data := influxModels.Row{ + Name: "bla", + Columns: []string{ + "time", + "value", + }, + Values: make([][]interface{}, 0), + Tags: map[string]string{}, } - _ = columns - type DecodeType struct { Time time.Time `influx:"time"` Value float64 `influx:"value"` } timeS := "2018-06-14T21:47:11Z" - time, err := time.Parse(time.RFC3339, timeS) + ti, err := time.Parse(time.RFC3339, timeS) if err != nil { t.Error("error parsing expected time: ", err) } - expected := []DecodeType{{time, 2.0}} - - values := [][]interface{}{{timeS, 2.0}} - + expected := []DecodeType{{ti, 2.0}} + data.Values = append(data.Values, []interface{}{timeS, 2.0}) decoded := []DecodeType{} + err = decode(data, &decoded) - err = decode(columns, values, &decoded) if err != nil { t.Error("Error decoding: ", err) } @@ -162,24 +165,26 @@ func TestDecodeTime(t *testing.T) { } func TestDecodeJsonNumber(t *testing.T) { - columns := []string{ - "val1", "val2", + data := influxModels.Row{ + Name: "bla", + Columns: []string{ + "val1", + "val2", + }, + Values: make([][]interface{}, 0), + Tags: map[string]string{}, } - _ = columns - type DecodeType struct { Val1 int `influx:"val1"` Val2 float64 `influx:"val2"` } expected := []DecodeType{{1, 2.0}} - - values := [][]interface{}{{json.Number("1"), json.Number("2.0")}} - + data.Values = append(data.Values, []interface{}{json.Number("1"), json.Number("2.0")}) decoded := []DecodeType{} + err := decode(data, &decoded) - err := decode(columns, values, &decoded) if err != nil { t.Error("Error decoding: ", err) } @@ -190,24 +195,61 @@ func TestDecodeJsonNumber(t *testing.T) { } func TestDecodeUnsedStructValue(t *testing.T) { - columns := []string{ - "val1", "val2", + data := influxModels.Row{ + Name: "bla", + Columns: []string{ + "val1", + "val2", + }, + Values: make([][]interface{}, 0), + Tags: map[string]string{}, } - _ = columns - type DecodeType struct { Val1 int `influx:"val1"` Val2 float64 `influx:"-"` } expected := []DecodeType{{1, 0}} + data.Values = append(data.Values, []interface{}{1, 1.1}) + decoded := []DecodeType{} + err := decode(data, &decoded) + + if err != nil { + t.Error("Error decoding: ", err) + } + + if !reflect.DeepEqual(expected, decoded) { + t.Error("decoded value is not right") + } +} - values := [][]interface{}{{1}} +func TestDecodeMeasure(t *testing.T) { + data := influxModels.Row{ + Name: "bla", + Columns: []string{ + "val1", + "val2", + }, + Values: make([][]interface{}, 0), + Tags: map[string]string{}, + } + + type DecodeType struct { + InfluxMeasurement Measurement + Val1 int `influx:"val1"` + Val2 float64 `influx:"-"` + } + expected := []DecodeType{{"bla", 1, 0}} + data.Values = append(data.Values, []interface{}{1, 1.1}) decoded := []DecodeType{} + err := decode(data, &decoded) + + if decoded[0].InfluxMeasurement != expected[0].InfluxMeasurement { + t.Error("Decoded Wrong measure") + } - err := decode(columns, values, &decoded) if err != nil { t.Error("Error decoding: ", err) } diff --git a/encode.go b/encode.go index 22fd9e7..4a6ca59 100644 --- a/encode.go +++ b/encode.go @@ -7,10 +7,15 @@ import ( "time" ) -func encode(d interface{}) (t time.Time, tags map[string]string, fields map[string]interface{}, err error) { +func encode(d interface{}) (t time.Time, tags map[string]string, fields map[string]interface{}, measurement string, err error) { tags = make(map[string]string) fields = make(map[string]interface{}) dValue := reflect.ValueOf(d) + + if dValue.Kind() == reflect.Ptr { + dValue = reflect.Indirect(dValue) + } + if dValue.Kind() != reflect.Struct { err = errors.New("data must be a struct") return @@ -18,9 +23,13 @@ func encode(d interface{}) (t time.Time, tags map[string]string, fields map[stri for i := 0; i < dValue.NumField(); i++ { f := dValue.Field(i) - fieldName := dValue.Type().Field(i).Name + structFieldName := dValue.Type().Field(i).Name + if structFieldName == "InfluxMeasurement" { + measurement = f.String() + continue + } fieldTag := dValue.Type().Field(i).Tag.Get("influx") - fieldData := getInfluxFieldTagData(fieldName, fieldTag) + fieldData := getInfluxFieldTagData(structFieldName, fieldTag) if fieldData.fieldName == "-" { continue @@ -41,5 +50,9 @@ func encode(d interface{}) (t time.Time, tags map[string]string, fields map[stri } } + if measurement == "" { + measurement = dValue.Type().Name() + } + return } diff --git a/encode_test.go b/encode_test.go index 0c01784..0878baa 100644 --- a/encode_test.go +++ b/encode_test.go @@ -7,14 +7,33 @@ import ( ) func TestEncodeDataNotStruct(t *testing.T) { - _, _, _, err := encode([]int{1, 2, 3}) + _, _, _, _, err := encode([]int{1, 2, 3}) if err == nil { t.Error("Expected error") } } +func TestEncodeSetsMesurment(t *testing.T) { + type MyType struct { + Val string `influx:"val"` + } + + d := &MyType{"test-data"} + _, _, _, mesurement, err := encode(d) + + if err != nil { + t.Error("Error encoding: ", err) + } + + if mesurement != "MyType" { + t.Errorf("%v != %v", mesurement, "MyType") + } + +} + func TestEncode(t *testing.T) { type MyType struct { + InfluxMeasurement Measurement Time time.Time `influx:"time"` TagValue string `influx:"tagValue,tag"` TagAndFieldValue string `influx:"tagAndFieldValue,tag,field"` @@ -27,6 +46,7 @@ func TestEncode(t *testing.T) { } d := MyType{ + "test", time.Now(), "tag-value", "tag-and-field-value", @@ -54,12 +74,20 @@ func TestEncode(t *testing.T) { "StructFieldName": d.StructFieldName, } - tm, tags, fields, err := encode(d) + tm, tags, fields, mesurement, err := encode(d) if err != nil { t.Error("Error encoding: ", err) } + if mesurement != d.InfluxMeasurement { + t.Errorf("%v != %v", mesurement, d.InfluxMeasurement) + } + + if _, ok := fields["InfluxMeasurement"]; ok { + t.Errorf("Found InfluxMeasurement in the fields!") + } + if !tm.Equal(timeExp) { t.Error("Time does not match") } diff --git a/examples/writeread.go b/examples/writeread.go index b88ec7a..927b6c9 100644 --- a/examples/writeread.go +++ b/examples/writeread.go @@ -1,98 +1,102 @@ package main - -import ( - "log" - "time" - - "github.com/cbrake/influxdbhelper" - client "github.com/influxdata/influxdb/client/v2" -) - -const ( - // Static connection configuration - influxURL = "http://localhost:8086" - db = "dbhelper" -) - -var c *influxdbhelper.Client - -// Init initializes the database connection -func Init() (err error) { - c, err = influxdbhelper.NewClient(influxURL, "", "", "ns") - if err != nil { - return - } - // Create MM database if it doesn't already exist - q := client.NewQuery("CREATE DATABASE "+db, "", "") - res, err := c.InfluxClient().Query(q) - if err != nil { - return err - } - if res.Error() != nil { - return res.Error() - } - log.Println("dbhelper db initialized") - return nil -} - -type envSample struct { - Time time.Time `influx:"time"` - Location string `influx:"location,tag"` - Temperature float64 `influx:"temperature"` - Humidity float64 `influx:"humidity"` - ID string `influx:"-"` -} - -// we populate a few more fields when reading back -// date to verify unused fields are handled correctly -type envSampleRead struct { - Time time.Time `influx:"time"` - Location string `influx:"location,tag"` - City string `influx:"city,tag,field"` - Temperature float64 `influx:"temperature"` - Humidity float64 `influx:"humidity"` - Cycles float64 `influx:"cycles"` - ID string `influx:"-"` -} - -func generateSampleData() []envSample { - ret := make([]envSample, 10) - - for i := range ret { - ret[i] = envSample{ - Time: time.Now(), - Location: "Rm 243", - Temperature: 70 + float64(i), - Humidity: 60 - float64(i), - ID: "12432as32", - } - } - - return ret -} - -func main() { - err := Init() - if err != nil { - log.Fatal("Failed to initialize db") - } - - // write sample data to database - samples := generateSampleData() - for _, p := range samples { - err := c.WritePoint(db, "test", p) - if err != nil { - log.Fatal("Error writing point: ", err) - } - } - - // query data from db - samplesRead := []envSampleRead{} - - q := `SELECT * FROM test ORDER BY time DESC LIMIT 10` - err = c.Query(db, q, &samplesRead) - if err != nil { - log.Fatal("Query error: ", err) - } - log.Printf("Samples read: %+v\n", samplesRead) -} +// +//import ( +// "log" +// "time" +// +// "github.com/cbrake/influxdbhelper" +// client "github.com/influxdata/influxdb/client/v2" +//) +// +//const ( +// // Static connection configuration +// influxURL = "http://localhost:8086" +// db = "dbhelper" +//) +// +//var c influxdbhelper.Client +// +//// Init initializes the database connection +//func Init() (err error) { +// c, err = influxdbhelper.NewClient(influxURL, "", "", "ns") +// if err != nil { +// return +// } +// // Create MM database if it doesn't already exist +// q := client.NewQuery("CREATE DATABASE "+db, "", "") +// res, err := c.InfluxClient().Query(q) +// if err != nil { +// return err +// } +// if res.Error() != nil { +// return res.Error() +// } +// log.Println("dbhelper db initialized") +// return nil +//} +// +//type envSample struct { +// InfluxMeasurement influxdbhelper.Measurement +// Time time.Time `influx:"time"` +// Location string `influx:"location,tag"` +// Temperature float64 `influx:"temperature"` +// Humidity float64 `influx:"humidity"` +// ID string `influx:"-"` +//} +// +//// we populate a few more fields when reading back +//// date to verify unused fields are handled correctly +//type envSampleRead struct { +// InfluxMeasurement influxdbhelper.Measurement +// Time time.Time `influx:"time"` +// Location string `influx:"location,tag"` +// City string `influx:"city,tag,field"` +// Temperature float64 `influx:"temperature"` +// Humidity float64 `influx:"humidity"` +// Cycles float64 `influx:"cycles"` +// ID string `influx:"-"` +//} +// +//func generateSampleData() []envSample { +// ret := make([]envSample, 10) +// +// for i := range ret { +// ret[i] = envSample{ +// InfluxMeasurement: "test", +// Time: time.Now(), +// Location: "Rm 243", +// Temperature: 70 + float64(i), +// Humidity: 60 - float64(i), +// ID: "12432as32", +// } +// } +// +// return ret +//} +// +//func main() { +// err := Init() +// if err != nil { +// log.Fatal("Failed to initialize db") +// } +// +// // write sample data to database +// samples := generateSampleData() +// c = c.UseDB(db) +// for _, p := range samples { +// err := c.WritePoint(p) +// if err != nil { +// log.Fatal("Error writing point: ", err) +// } +// } +// +// // query data from db +// samplesRead := []envSampleRead{} +// +// q := `SELECT * FROM test ORDER BY time DESC LIMIT 10` +// err = c.UseDB(db).Query(q, &samplesRead) +// if err != nil { +// log.Fatal("Query error: ", err) +// } +// log.Printf("Samples read: %+v\n", samplesRead) +//} diff --git a/tag.go b/tag.go index 813d8bd..8a08828 100644 --- a/tag.go +++ b/tag.go @@ -1,6 +1,10 @@ package influxdbhelper -import "strings" +import ( + "strings" +) + +type Measurement = string type influxFieldTagData struct { fieldName string From 767ba662f421b99d6d9c93374cfb1e29f0d4a5bf Mon Sep 17 00:00:00 2001 From: Dj Gilcrease <d.gilcrease@f5.com> Date: Mon, 15 Oct 2018 12:52:10 -0700 Subject: [PATCH 04/12] Fix an issue that would now allow calling UseDB outside a loop --- client.go | 2 - encode_test.go | 6 +- examples/writeread.go | 202 +++++++++++++++++++++--------------------- 3 files changed, 104 insertions(+), 106 deletions(-) diff --git a/client.go b/client.go index 4e7d884..aa3f398 100644 --- a/client.go +++ b/client.go @@ -145,7 +145,6 @@ func (c *helperClient) DecodeQuery(q string, result interface{}) (err error) { var response *influxClient.Response response, err = c.client.Query(query) - c.using = nil if response.Error() != nil { return response.Error() @@ -208,7 +207,6 @@ func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[s } pt, err := influxClient.NewPoint(c.using.measurement, tags, fields, t) - c.using = nil if err != nil { return err diff --git a/encode_test.go b/encode_test.go index 0878baa..2efd0b2 100644 --- a/encode_test.go +++ b/encode_test.go @@ -74,14 +74,14 @@ func TestEncode(t *testing.T) { "StructFieldName": d.StructFieldName, } - tm, tags, fields, mesurement, err := encode(d) + tm, tags, fields, measurement, err := encode(d) if err != nil { t.Error("Error encoding: ", err) } - if mesurement != d.InfluxMeasurement { - t.Errorf("%v != %v", mesurement, d.InfluxMeasurement) + if measurement != d.InfluxMeasurement { + t.Errorf("%v != %v", measurement, d.InfluxMeasurement) } if _, ok := fields["InfluxMeasurement"]; ok { diff --git a/examples/writeread.go b/examples/writeread.go index 927b6c9..dc36a5d 100644 --- a/examples/writeread.go +++ b/examples/writeread.go @@ -1,102 +1,102 @@ package main -// -//import ( -// "log" -// "time" -// -// "github.com/cbrake/influxdbhelper" -// client "github.com/influxdata/influxdb/client/v2" -//) -// -//const ( -// // Static connection configuration -// influxURL = "http://localhost:8086" -// db = "dbhelper" -//) -// -//var c influxdbhelper.Client -// -//// Init initializes the database connection -//func Init() (err error) { -// c, err = influxdbhelper.NewClient(influxURL, "", "", "ns") -// if err != nil { -// return -// } -// // Create MM database if it doesn't already exist -// q := client.NewQuery("CREATE DATABASE "+db, "", "") -// res, err := c.InfluxClient().Query(q) -// if err != nil { -// return err -// } -// if res.Error() != nil { -// return res.Error() -// } -// log.Println("dbhelper db initialized") -// return nil -//} -// -//type envSample struct { -// InfluxMeasurement influxdbhelper.Measurement -// Time time.Time `influx:"time"` -// Location string `influx:"location,tag"` -// Temperature float64 `influx:"temperature"` -// Humidity float64 `influx:"humidity"` -// ID string `influx:"-"` -//} -// -//// we populate a few more fields when reading back -//// date to verify unused fields are handled correctly -//type envSampleRead struct { -// InfluxMeasurement influxdbhelper.Measurement -// Time time.Time `influx:"time"` -// Location string `influx:"location,tag"` -// City string `influx:"city,tag,field"` -// Temperature float64 `influx:"temperature"` -// Humidity float64 `influx:"humidity"` -// Cycles float64 `influx:"cycles"` -// ID string `influx:"-"` -//} -// -//func generateSampleData() []envSample { -// ret := make([]envSample, 10) -// -// for i := range ret { -// ret[i] = envSample{ -// InfluxMeasurement: "test", -// Time: time.Now(), -// Location: "Rm 243", -// Temperature: 70 + float64(i), -// Humidity: 60 - float64(i), -// ID: "12432as32", -// } -// } -// -// return ret -//} -// -//func main() { -// err := Init() -// if err != nil { -// log.Fatal("Failed to initialize db") -// } -// -// // write sample data to database -// samples := generateSampleData() -// c = c.UseDB(db) -// for _, p := range samples { -// err := c.WritePoint(p) -// if err != nil { -// log.Fatal("Error writing point: ", err) -// } -// } -// -// // query data from db -// samplesRead := []envSampleRead{} -// -// q := `SELECT * FROM test ORDER BY time DESC LIMIT 10` -// err = c.UseDB(db).Query(q, &samplesRead) -// if err != nil { -// log.Fatal("Query error: ", err) -// } -// log.Printf("Samples read: %+v\n", samplesRead) -//} + +import ( + "log" + "time" + + "github.com/cbrake/influxdbhelper" + client "github.com/influxdata/influxdb/client/v2" +) + +const ( + // Static connection configuration + influxURL = "http://localhost:8086" + db = "dbhelper" +) + +var c influxdbhelper.Client + +// Init initializes the database connection +func Init() (err error) { + c, err = influxdbhelper.NewClient(influxURL, "", "", "ns") + if err != nil { + return + } + // Create MM database if it doesn't already exist + q := client.NewQuery("CREATE DATABASE "+db, "", "") + res, err := c.Query(q) + if err != nil { + return err + } + if res.Error() != nil { + return res.Error() + } + log.Println("dbhelper db initialized") + return nil +} + +type envSample struct { + InfluxMeasurement influxdbhelper.Measurement + Time time.Time `influx:"time"` + Location string `influx:"location,tag"` + Temperature float64 `influx:"temperature"` + Humidity float64 `influx:"humidity"` + ID string `influx:"-"` +} + +// we populate a few more fields when reading back +// date to verify unused fields are handled correctly +type envSampleRead struct { + InfluxMeasurement influxdbhelper.Measurement + Time time.Time `influx:"time"` + Location string `influx:"location,tag"` + City string `influx:"city,tag,field"` + Temperature float64 `influx:"temperature"` + Humidity float64 `influx:"humidity"` + Cycles float64 `influx:"cycles"` + ID string `influx:"-"` +} + +func generateSampleData() []envSample { + ret := make([]envSample, 10) + + for i := range ret { + ret[i] = envSample{ + InfluxMeasurement: "test", + Time: time.Now(), + Location: "Rm 243", + Temperature: 70 + float64(i), + Humidity: 60 - float64(i), + ID: "12432as32", + } + } + + return ret +} + +func main() { + err := Init() + if err != nil { + log.Fatal("Failed to initialize db") + } + + // write sample data to database + samples := generateSampleData() + c = c.UseDB(db) + for _, p := range samples { + err := c.WritePoint(p) + if err != nil { + log.Fatal("Error writing point: ", err) + } + } + + // query data from db + samplesRead := []envSampleRead{} + + q := `SELECT * FROM test ORDER BY time DESC LIMIT 10` + err = c.UseDB(db).DecodeQuery(q, &samplesRead) + if err != nil { + log.Fatal("Query error: ", err) + } + log.Printf("Samples read: %+v\n", samplesRead) +} From 9c4f19629695c2ce1f7335204e133179d37dcdab Mon Sep 17 00:00:00 2001 From: Dj Gilcrease <d.gilcrease@f5.com> Date: Mon, 15 Oct 2018 14:01:58 -0700 Subject: [PATCH 05/12] Cleanup how the Use* feature works --- client.go | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/client.go b/client.go index aa3f398..c274e0a 100644 --- a/client.go +++ b/client.go @@ -48,9 +48,14 @@ type helperClient struct { using *helperUsing } +type usingValue struct { + value string + retain bool +} + type helperUsing struct { - db string - measurement string + db *usingValue + measurement *usingValue } // NewClient returns a new influxdbhelper influxClient given a url, user, @@ -105,7 +110,7 @@ func (c *helperClient) UseDB(db string) Client { c.using = &helperUsing{} } - c.using.db = db + c.using.db = &usingValue{db, true} return c } @@ -115,7 +120,7 @@ func (c *helperClient) UseMeasurement(measurement string) Client { c.using = &helperUsing{} } - c.using.measurement = measurement + c.using.measurement = &usingValue{measurement, true} return c } @@ -132,19 +137,22 @@ func (c *helperClient) UseMeasurement(measurement string) Client { // The struct field tag can be set to '-' which indicates this field // should be ignored. func (c *helperClient) DecodeQuery(q string, result interface{}) (err error) { - if c.using == nil { + if c.using == nil || c.using.db == nil { return fmt.Errorf("no db set for query") } query := influxClient.Query{ Command: q, - Database: c.using.db, + Database: c.using.db.value, Chunked: false, ChunkSize: 100, } var response *influxClient.Response response, err = c.client.Query(query) + if !c.using.db.retain { + c.using.db = nil + } if response.Error() != nil { return response.Error() @@ -173,14 +181,14 @@ func (c *helperClient) DecodeQuery(q string, result interface{}) (err error) { // the struct field should be ignored. A struct field of Time is required and // is used for the time of the sample. func (c *helperClient) WritePoint(data interface{}) error { - if c.using == nil { + if c.using == nil || c.using.db == nil { return fmt.Errorf("no db set for query") } t, tags, fields, measurement, err := encode(data) - if c.using.measurement == "" { - c.using.measurement = measurement + if c.using.measurement == nil { + c.using.measurement = &usingValue{measurement, false} } if err != nil { @@ -193,12 +201,16 @@ func (c *helperClient) WritePoint(data interface{}) error { // WritePointTagsFields is used to write a point specifying tags and fields. func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[string]interface{}, t time.Time) (err error) { - if c.using == nil { + if c.using == nil || c.using.db == nil { return fmt.Errorf("no db set for query") } + if c.using.measurement == nil { + return fmt.Errorf("no measurement set for query") + } + bp, err := influxClient.NewBatchPoints(influxClient.BatchPointsConfig{ - Database: c.using.db, + Database: c.using.db.value, Precision: c.precision, }) @@ -206,7 +218,13 @@ func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[s return err } - pt, err := influxClient.NewPoint(c.using.measurement, tags, fields, t) + pt, err := influxClient.NewPoint(c.using.measurement.value, tags, fields, t) + if !c.using.db.retain { + c.using.db = nil + } + if !c.using.measurement.retain { + c.using.measurement = nil + } if err != nil { return err From 8d4a7d928ec3da46ef44e2e63f5b4fce3940f888 Mon Sep 17 00:00:00 2001 From: Dj Gilcrease <d.gilcrease@f5.com> Date: Mon, 15 Oct 2018 14:22:23 -0700 Subject: [PATCH 06/12] Allow specifying the time field instead of hard coding to 'time' --- client.go | 18 ++++++++++++++++-- encode.go | 8 ++++++-- encode_test.go | 29 ++++++++++++++++++++++++----- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index c274e0a..294254d 100644 --- a/client.go +++ b/client.go @@ -56,6 +56,7 @@ type usingValue struct { type helperUsing struct { db *usingValue measurement *usingValue + timeField *usingValue } // NewClient returns a new influxdbhelper influxClient given a url, user, @@ -124,6 +125,16 @@ func (c *helperClient) UseMeasurement(measurement string) Client { return c } +// UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields +func (c *helperClient) UseTimeField(fieldName string) Client { + if c.using == nil { + c.using = &helperUsing{} + } + + c.using.timeField = &usingValue{fieldName, true} + return c +} + // Query executes an InfluxDb query, and unpacks the result into the // result data structure. @@ -185,7 +196,7 @@ func (c *helperClient) WritePoint(data interface{}) error { return fmt.Errorf("no db set for query") } - t, tags, fields, measurement, err := encode(data) + t, tags, fields, measurement, err := encode(data, c.using.timeField) if c.using.measurement == nil { c.using.measurement = &usingValue{measurement, false} @@ -195,7 +206,7 @@ func (c *helperClient) WritePoint(data interface{}) error { return err } - return c.WritePointTagsFields( tags, fields, t) + return c.WritePointTagsFields(tags, fields, t) } @@ -225,6 +236,9 @@ func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[s if !c.using.measurement.retain { c.using.measurement = nil } + if !c.using.timeField.retain { + c.using.timeField = nil + } if err != nil { return err diff --git a/encode.go b/encode.go index 4a6ca59..1cb0261 100644 --- a/encode.go +++ b/encode.go @@ -7,7 +7,7 @@ import ( "time" ) -func encode(d interface{}) (t time.Time, tags map[string]string, fields map[string]interface{}, measurement string, err error) { +func encode(d interface{}, timeField *usingValue) (t time.Time, tags map[string]string, fields map[string]interface{}, measurement string, err error) { tags = make(map[string]string) fields = make(map[string]interface{}) dValue := reflect.ValueOf(d) @@ -21,6 +21,10 @@ func encode(d interface{}) (t time.Time, tags map[string]string, fields map[stri return } + if timeField == nil { + timeField = &usingValue{"time", false} + } + for i := 0; i < dValue.NumField(); i++ { f := dValue.Field(i) structFieldName := dValue.Type().Field(i).Name @@ -35,7 +39,7 @@ func encode(d interface{}) (t time.Time, tags map[string]string, fields map[stri continue } - if fieldData.fieldName == "time" { + if fieldData.fieldName == timeField.value { // TODO error checking t = f.Interface().(time.Time) continue diff --git a/encode_test.go b/encode_test.go index 2efd0b2..de4ec11 100644 --- a/encode_test.go +++ b/encode_test.go @@ -7,7 +7,7 @@ import ( ) func TestEncodeDataNotStruct(t *testing.T) { - _, _, _, _, err := encode([]int{1, 2, 3}) + _, _, _, _, err := encode([]int{1, 2, 3}, nil) if err == nil { t.Error("Expected error") } @@ -19,16 +19,35 @@ func TestEncodeSetsMesurment(t *testing.T) { } d := &MyType{"test-data"} - _, _, _, mesurement, err := encode(d) + _, _, _, measurement, err := encode(d, nil) if err != nil { t.Error("Error encoding: ", err) } - if mesurement != "MyType" { - t.Errorf("%v != %v", mesurement, "MyType") + if measurement != "MyType" { + t.Errorf("%v != %v", measurement, "MyType") } +} + +func TestEncodeUsesTimeField(t *testing.T) { + type MyType struct { + MyTimeField time.Time `influx:"my_time_field"` + Val string `influx:"val"` + } + + td, _ := time.Parse(time.RFC822, "27 Oct 78 15:04 PST") + d := &MyType{td,"test-data"} + tv, _, _, _, err := encode(d, &usingValue{"my_time_field", false}) + + if tv != td { + t.Error("Did not properly use the time field specified") + } + + if err != nil { + t.Error("Error encoding: ", err) + } } func TestEncode(t *testing.T) { @@ -74,7 +93,7 @@ func TestEncode(t *testing.T) { "StructFieldName": d.StructFieldName, } - tm, tags, fields, measurement, err := encode(d) + tm, tags, fields, measurement, err := encode(d, nil) if err != nil { t.Error("Error encoding: ", err) From 874fa071a2e6b586e1d5f9bc46930889e7f61767 Mon Sep 17 00:00:00 2001 From: Dj Gilcrease <d.gilcrease@f5.com> Date: Mon, 15 Oct 2018 14:38:16 -0700 Subject: [PATCH 07/12] Add UseTimeField to the interface --- client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client.go b/client.go index 294254d..05288ca 100644 --- a/client.go +++ b/client.go @@ -25,9 +25,12 @@ type Client interface { // UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields UseDB(db string) Client - // UseMeasurement sets the measurment to use for WritePoint, and WritePointTagsFields + // UseMeasurement sets the measurement to use for WritePoint, and WritePointTagsFields UseMeasurement(measurement string) Client + // UseTimeField sets the time field to use for WritePoint, and WritePointTagsFields + UseTimeField(fieldName string) Client + // Query executes an InfluxDb query, and unpacks the result into the // result data structure. DecodeQuery(query string, result interface{}) error From 5b2d949b6f7e458b4e925bb913dc312d7e5fb63c Mon Sep 17 00:00:00 2001 From: Cliff Brake <cbrake@bec-systems.com> Date: Thu, 25 Oct 2018 06:39:31 -0400 Subject: [PATCH 08/12] linting fixes --- client.go | 12 +++++------- client_test.go | 6 +++--- examples/writeread.go | 40 ++++++++++++++++++++-------------------- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/client.go b/client.go index 05288ca..e130cf0 100644 --- a/client.go +++ b/client.go @@ -19,6 +19,8 @@ func CleanQuery(query string) string { return ret } +// A Client represents an influxdbhelper influxClient connection to +// an InfluxDb server. type Client interface { influxClient.Client @@ -42,8 +44,6 @@ type Client interface { WritePointTagsFields(tags map[string]string, fields map[string]interface{}, t time.Time) error } -// A Client represents an influxdbhelper influxClient connection to -// an InfluxDb server. type helperClient struct { url string client influxClient.Client @@ -52,14 +52,14 @@ type helperClient struct { } type usingValue struct { - value string + value string retain bool } type helperUsing struct { - db *usingValue + db *usingValue measurement *usingValue - timeField *usingValue + timeField *usingValue } // NewClient returns a new influxdbhelper influxClient given a url, user, @@ -138,7 +138,6 @@ func (c *helperClient) UseTimeField(fieldName string) Client { return c } - // Query executes an InfluxDb query, and unpacks the result into the // result data structure. // @@ -212,7 +211,6 @@ func (c *helperClient) WritePoint(data interface{}) error { return c.WritePointTagsFields(tags, fields, t) } - // WritePointTagsFields is used to write a point specifying tags and fields. func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[string]interface{}, t time.Time) (err error) { if c.using == nil || c.using.db == nil { diff --git a/client_test.go b/client_test.go index 27299c1..22c2a5c 100644 --- a/client_test.go +++ b/client_test.go @@ -12,7 +12,7 @@ func ExampleClient_WritePoint() { Location string `influx:"location,tag"` Temperature float64 `influx:"temperature"` Humidity float64 `influx:"humidity"` - Id string `influx:"-"` + ID string `influx:"-"` } s := EnvSample{ @@ -20,7 +20,7 @@ func ExampleClient_WritePoint() { Location: "Rm 243", Temperature: 70.0, Humidity: 60.0, - Id: "12432as32", + ID: "12432as32", } c.UseDB("myDb").UseMeasurement("test").WritePoint(s) @@ -34,7 +34,7 @@ func ExampleClient_Query() { Location string `influx:"location,tag"` Temperature float64 `influx:"temperature"` Humidity float64 `influx:"humidity"` - Id string `influx:"-"` + ID string `influx:"-"` } samplesRead := []EnvSample{} diff --git a/examples/writeread.go b/examples/writeread.go index dc36a5d..95c1f18 100644 --- a/examples/writeread.go +++ b/examples/writeread.go @@ -22,7 +22,7 @@ func Init() (err error) { if err != nil { return } - // Create MM database if it doesn't already exist + // Create test database if it doesn't already exist q := client.NewQuery("CREATE DATABASE "+db, "", "") res, err := c.Query(q) if err != nil { @@ -36,25 +36,25 @@ func Init() (err error) { } type envSample struct { - InfluxMeasurement influxdbhelper.Measurement - Time time.Time `influx:"time"` - Location string `influx:"location,tag"` - Temperature float64 `influx:"temperature"` - Humidity float64 `influx:"humidity"` - ID string `influx:"-"` + InfluxMeasurement influxdbhelper.Measurement + Time time.Time `influx:"time"` + Location string `influx:"location,tag"` + Temperature float64 `influx:"temperature"` + Humidity float64 `influx:"humidity"` + ID string `influx:"-"` } // we populate a few more fields when reading back // date to verify unused fields are handled correctly type envSampleRead struct { - InfluxMeasurement influxdbhelper.Measurement - Time time.Time `influx:"time"` - Location string `influx:"location,tag"` - City string `influx:"city,tag,field"` - Temperature float64 `influx:"temperature"` - Humidity float64 `influx:"humidity"` - Cycles float64 `influx:"cycles"` - ID string `influx:"-"` + InfluxMeasurement influxdbhelper.Measurement + Time time.Time `influx:"time"` + Location string `influx:"location,tag"` + City string `influx:"city,tag,field"` + Temperature float64 `influx:"temperature"` + Humidity float64 `influx:"humidity"` + Cycles float64 `influx:"cycles"` + ID string `influx:"-"` } func generateSampleData() []envSample { @@ -63,11 +63,11 @@ func generateSampleData() []envSample { for i := range ret { ret[i] = envSample{ InfluxMeasurement: "test", - Time: time.Now(), - Location: "Rm 243", - Temperature: 70 + float64(i), - Humidity: 60 - float64(i), - ID: "12432as32", + Time: time.Now(), + Location: "Rm 243", + Temperature: 70 + float64(i), + Humidity: 60 - float64(i), + ID: "12432as32", } } From 40bf4c3f6c0b825e10303b3e5996dee2eafd5cbd Mon Sep 17 00:00:00 2001 From: Cliff Brake <cbrake@bec-systems.com> Date: Thu, 25 Oct 2018 06:44:03 -0400 Subject: [PATCH 09/12] autoformatting and cleanup --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 499e471..9d04ee8 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ func Init() (err error) { if err != nil { return } - // Create MM database if it doesn't already exist + // Create database if it doesn't already exist q := client.NewQuery("CREATE DATABASE "+db, "", "") res, err := c.InfluxClient().Query(q) if err != nil { @@ -137,14 +137,14 @@ libraries that do similiar things, I would be very interested in learning about Todo: -* [x] handle larger query datasets (multiple series, etc) -* [x] add write capability (directly write Go structs into influxdb) -* [x] add godoc documentation -* [ ] decode/encode val0, val1, val2 fields in influx to Go array -* [ ] use Go struct field tags to help build SELECT statement -* [ ] optimize query for performace (pre-allocate slices, etc) -* [ ] come up with a better name (indecode, etc) -* [ ] finish error checking +- [x] handle larger query datasets (multiple series, etc) +- [x] add write capability (directly write Go structs into influxdb) +- [x] add godoc documentation +- [ ] decode/encode val0, val1, val2 fields in influx to Go array +- [ ] use Go struct field tags to help build SELECT statement +- [ ] optimize query for performace (pre-allocate slices, etc) +- [ ] come up with a better name (indecode, etc) +- [ ] finish error checking Review/Pull requests welcome! From 098b74c554073b3c2f96f54a0fa2071136a0b997 Mon Sep 17 00:00:00 2001 From: Cliff Brake <cbrake@bec-systems.com> Date: Thu, 25 Oct 2018 06:44:30 -0400 Subject: [PATCH 10/12] linting and autoformatting cleanup --- decode_test.go | 19 +++++++++---------- tag.go | 1 + 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/decode_test.go b/decode_test.go index c7eb97e..8b861b9 100644 --- a/decode_test.go +++ b/decode_test.go @@ -21,7 +21,7 @@ func TestDecode(t *testing.T) { "stringValue", }, Values: make([][]interface{}, 0), - Tags: map[string]string{"tagValue": "tag-value"}, + Tags: map[string]string{"tagValue": "tag-value"}, } type DecodeType struct { @@ -76,7 +76,7 @@ func TestDecodeMissingColumn(t *testing.T) { "val1", }, Values: make([][]interface{}, 0), - Tags: map[string]string{}, + Tags: map[string]string{}, } type DecodeType struct { @@ -106,7 +106,7 @@ func TestDecodeWrongType(t *testing.T) { "val2", }, Values: make([][]interface{}, 0), - Tags: map[string]string{}, + Tags: map[string]string{}, } type DecodeType struct { @@ -127,7 +127,6 @@ func TestDecodeWrongType(t *testing.T) { } } - func TestDecodeTime(t *testing.T) { data := influxModels.Row{ Name: "bla", @@ -136,7 +135,7 @@ func TestDecodeTime(t *testing.T) { "value", }, Values: make([][]interface{}, 0), - Tags: map[string]string{}, + Tags: map[string]string{}, } type DecodeType struct { @@ -172,7 +171,7 @@ func TestDecodeJsonNumber(t *testing.T) { "val2", }, Values: make([][]interface{}, 0), - Tags: map[string]string{}, + Tags: map[string]string{}, } type DecodeType struct { @@ -202,7 +201,7 @@ func TestDecodeUnsedStructValue(t *testing.T) { "val2", }, Values: make([][]interface{}, 0), - Tags: map[string]string{}, + Tags: map[string]string{}, } type DecodeType struct { @@ -232,13 +231,13 @@ func TestDecodeMeasure(t *testing.T) { "val2", }, Values: make([][]interface{}, 0), - Tags: map[string]string{}, + Tags: map[string]string{}, } type DecodeType struct { InfluxMeasurement Measurement - Val1 int `influx:"val1"` - Val2 float64 `influx:"-"` + Val1 int `influx:"val1"` + Val2 float64 `influx:"-"` } expected := []DecodeType{{"bla", 1, 0}} diff --git a/tag.go b/tag.go index 8a08828..c9528e0 100644 --- a/tag.go +++ b/tag.go @@ -4,6 +4,7 @@ import ( "strings" ) +// Measurement is a type that defines the influx db measurement. type Measurement = string type influxFieldTagData struct { From a9a2363a4dc9adc967d162d2d00f659137e4423a Mon Sep 17 00:00:00 2001 From: Cliff Brake <cbrake@bec-systems.com> Date: Thu, 25 Oct 2018 14:18:36 -0400 Subject: [PATCH 11/12] add check for null timeField --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index e130cf0..fc4101d 100644 --- a/client.go +++ b/client.go @@ -237,7 +237,7 @@ func (c *helperClient) WritePointTagsFields(tags map[string]string, fields map[s if !c.using.measurement.retain { c.using.measurement = nil } - if !c.using.timeField.retain { + if c.using.timeField != nil && !c.using.timeField.retain { c.using.timeField = nil } From 1678fdd8c5dccaf0975548ec46847415136ac456 Mon Sep 17 00:00:00 2001 From: Cliff Brake <cbrake@bec-systems.com> Date: Thu, 25 Oct 2018 14:49:31 -0400 Subject: [PATCH 12/12] document client functions a little more --- client.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index fc4101d..1bdfb9e 100644 --- a/client.go +++ b/client.go @@ -24,13 +24,18 @@ func CleanQuery(query string) string { type Client interface { influxClient.Client - // UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields + // UseDB sets the DB to use for Query, WritePoint, and WritePointTagsFields. + // This field must be set before WritePoint... calls. UseDB(db string) Client - // UseMeasurement sets the measurement to use for WritePoint, and WritePointTagsFields + // UseMeasurement sets the measurement to use for WritePoint, and WritePointTagsFields. + // If this is not set, a struct field with named InfluxMeasurement is required + // in the write data. The data passed in this call has priority over data fields in + // writes. UseMeasurement(measurement string) Client - // UseTimeField sets the time field to use for WritePoint, and WritePointTagsFields + // UseTimeField sets the time field to use for WritePoint, and WritePointTagsFields. This + // call is optional, and a data struct field with a `influx:"time"` tag can also be used. UseTimeField(fieldName string) Client // Query executes an InfluxDb query, and unpacks the result into the