From 115996e7c6cc8746344ccdc17db65f13ab223e3b Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Tue, 10 Sep 2024 18:33:42 +0100 Subject: [PATCH] Add second-granularity epoch function Including a test of chaining jq commands --- pkg/transform/jq.go | 19 ++++++++- pkg/transform/jq_test.go | 87 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/pkg/transform/jq.go b/pkg/transform/jq.go index 10327b0b..228b983f 100644 --- a/pkg/transform/jq.go +++ b/pkg/transform/jq.go @@ -111,6 +111,23 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { return nil, fmt.Errorf("error parsing jq command: %s", err) } + // epoch converts a time.Time to an epoch in seconds, as integer type. + // It must be an integer in order to chain with jq-native time functions + withEpochFunction := gojq.WithFunction("epoch", 0, 1, func(a1 any, a2 []any) any { + if a1 == nil { + return nil + } + + validTime, ok := a1.(time.Time) + + if !ok { + return errors.New("Not a valid time input to 'epoch' function") + } + + return int(validTime.Unix()) + }) + + // epochMillis converts a time.Time to an epoch in milliseconds withEpochMillisFunction := gojq.WithFunction("epochMillis", 0, 1, func(a1 any, a2 []any) any { if a1 == nil { return nil @@ -125,7 +142,7 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { return validTime.UnixMilli() }) - code, err := gojq.Compile(query, withEpochMillisFunction) + code, err := gojq.Compile(query, withEpochMillisFunction, withEpochFunction) if err != nil { return nil, fmt.Errorf("error compiling jq query: %s", err) } diff --git a/pkg/transform/jq_test.go b/pkg/transform/jq_test.go index 3cd77dab..f9178bfb 100644 --- a/pkg/transform/jq_test.go +++ b/pkg/transform/jq_test.go @@ -52,6 +52,44 @@ func TestJQRunFunction_SpMode_true(t *testing.T) { ExpInterState: nil, Error: nil, }, + { + Scenario: "test_timestamp_to_epoch", + JQCommand: `{ foo: .collector_tstamp | epoch }`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":1557499235}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, + { + Scenario: "test_timestamp_to_epoch_chained", + JQCommand: `{ foo: .collector_tstamp | epoch | todateiso8601 }`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":"2019-05-10T14:40:35Z"}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, { Scenario: "happy_path", JQCommand: `{foo: .app_id}`, @@ -218,6 +256,30 @@ func TestJQRunFunction_SpMode_false(t *testing.T) { explicit_null: .explicit | epochMillis, no_such_field: .nonexistent | epochMillis, non_null: .non_null + }`, + InputMsg: &models.Message{ + Data: []byte(`{"explicit": null, "non_null": "hello"}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"non_null":"hello"}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, + { + Scenario: "epoch_on_nullable", + JQCommand: ` + { + explicit_null: .explicit | epoch, + no_such_field: .nonexistent | epoch, + non_null: .non_null }`, InputMsg: &models.Message{ Data: []byte(`{"explicit": null, "non_null": "hello"}`), @@ -481,7 +543,7 @@ func TestJQRunFunction_errors(t *testing.T) { Error: errors.New("jq query got no output"), }, { - Scenario: "epoch_on_non_time_type", + Scenario: "epochMillis_on_non_time_type", JQConfig: &JQMapperConfig{ JQCommand: `.str | epochMillis`, RunTimeoutMs: 100, @@ -503,6 +565,29 @@ func TestJQRunFunction_errors(t *testing.T) { ExpInterState: nil, Error: errors.New("Not a valid time input to 'epochMillis' function"), }, + { + Scenario: "epoch_on_non_time_type", + JQConfig: &JQMapperConfig{ + JQCommand: `.str | epoch`, + RunTimeoutMs: 100, + SpMode: false, + }, + InputMsg: &models.Message{ + Data: []byte(`{"str": "value"}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: []byte(`{"str": "value"}`), + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("Not a valid time input to 'epoch' function"), + }, } for _, tt := range testCases {