From d688908a98c9de6daebaa8f8fa2d0ff56f45ac7b Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 11 Sep 2024 10:52:16 +0100 Subject: [PATCH] Add jq function for epoch in seconds (#367) * Rename `epoch` to `epochMillis` * Add second-granularity epoch function Including a test of chaining jq commands --- pkg/transform/jq.go | 19 ++++++++- pkg/transform/jq_test.go | 87 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/pkg/transform/jq.go b/pkg/transform/jq.go index 0d6f729b..228b983f 100644 --- a/pkg/transform/jq.go +++ b/pkg/transform/jq.go @@ -111,6 +111,8 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { return nil, fmt.Errorf("error parsing jq command: %s", err) } + // epoch converts a time.Time to an epoch in seconds, as integer type. + // It must be an integer in order to chain with jq-native time functions withEpochFunction := gojq.WithFunction("epoch", 0, 1, func(a1 any, a2 []any) any { if a1 == nil { return nil @@ -122,10 +124,25 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { return errors.New("Not a valid time input to 'epoch' function") } + return int(validTime.Unix()) + }) + + // epochMillis converts a time.Time to an epoch in milliseconds + withEpochMillisFunction := gojq.WithFunction("epochMillis", 0, 1, func(a1 any, a2 []any) any { + if a1 == nil { + return nil + } + + validTime, ok := a1.(time.Time) + + if !ok { + return errors.New("Not a valid time input to 'epochMillis' function") + } + return validTime.UnixMilli() }) - code, err := gojq.Compile(query, withEpochFunction) + code, err := gojq.Compile(query, withEpochMillisFunction, withEpochFunction) if err != nil { return nil, fmt.Errorf("error compiling jq query: %s", err) } diff --git a/pkg/transform/jq_test.go b/pkg/transform/jq_test.go index 62a31e53..f9178bfb 100644 --- a/pkg/transform/jq_test.go +++ b/pkg/transform/jq_test.go @@ -33,6 +33,25 @@ func TestJQRunFunction_SpMode_true(t *testing.T) { ExpInterState interface{} Error error }{ + { + Scenario: "test_timestamp_to_epochMillis", + JQCommand: `{ foo: .collector_tstamp | epochMillis }`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":1557499235972}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, { Scenario: "test_timestamp_to_epoch", JQCommand: `{ foo: .collector_tstamp | epoch }`, @@ -43,7 +62,26 @@ func TestJQRunFunction_SpMode_true(t *testing.T) { InputInterState: nil, Expected: map[string]*models.Message{ "success": { - Data: []byte(`{"foo":1557499235972}`), + Data: []byte(`{"foo":1557499235}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, + { + Scenario: "test_timestamp_to_epoch_chained", + JQCommand: `{ foo: .collector_tstamp | epoch | todateiso8601 }`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":"2019-05-10T14:40:35Z"}`), PartitionKey: "some-key", }, "filtered": nil, @@ -211,6 +249,30 @@ func TestJQRunFunction_SpMode_false(t *testing.T) { ExpInterState: nil, Error: nil, }, + { + Scenario: "epochMillis_on_nullable", + JQCommand: ` + { + explicit_null: .explicit | epochMillis, + no_such_field: .nonexistent | epochMillis, + non_null: .non_null + }`, + InputMsg: &models.Message{ + Data: []byte(`{"explicit": null, "non_null": "hello"}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"non_null":"hello"}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, { Scenario: "epoch_on_nullable", JQCommand: ` @@ -480,6 +542,29 @@ func TestJQRunFunction_errors(t *testing.T) { ExpInterState: nil, Error: errors.New("jq query got no output"), }, + { + Scenario: "epochMillis_on_non_time_type", + JQConfig: &JQMapperConfig{ + JQCommand: `.str | epochMillis`, + RunTimeoutMs: 100, + SpMode: false, + }, + InputMsg: &models.Message{ + Data: []byte(`{"str": "value"}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: []byte(`{"str": "value"}`), + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("Not a valid time input to 'epochMillis' function"), + }, { Scenario: "epoch_on_non_time_type", JQConfig: &JQMapperConfig{