Skip to content

Commit

Permalink
Add second-granularity epoch function
Browse files Browse the repository at this point in the history
Including a test of chaining jq commands
  • Loading branch information
colmsnowplow committed Sep 10, 2024
1 parent cd8f9f5 commit 115996e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
19 changes: 18 additions & 1 deletion pkg/transform/jq.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,23 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) {
return nil, fmt.Errorf("error parsing jq command: %s", err)
}

// epoch converts a time.Time to an epoch in seconds, as integer type.
// It must be an integer in order to chain with jq-native time functions
withEpochFunction := gojq.WithFunction("epoch", 0, 1, func(a1 any, a2 []any) any {
if a1 == nil {
return nil
}

validTime, ok := a1.(time.Time)

if !ok {
return errors.New("Not a valid time input to 'epoch' function")
}

return int(validTime.Unix())
})

// epochMillis converts a time.Time to an epoch in milliseconds
withEpochMillisFunction := gojq.WithFunction("epochMillis", 0, 1, func(a1 any, a2 []any) any {
if a1 == nil {
return nil
Expand All @@ -125,7 +142,7 @@ func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) {
return validTime.UnixMilli()
})

code, err := gojq.Compile(query, withEpochMillisFunction)
code, err := gojq.Compile(query, withEpochMillisFunction, withEpochFunction)
if err != nil {
return nil, fmt.Errorf("error compiling jq query: %s", err)
}
Expand Down
87 changes: 86 additions & 1 deletion pkg/transform/jq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,44 @@ func TestJQRunFunction_SpMode_true(t *testing.T) {
ExpInterState: nil,
Error: nil,
},
{
Scenario: "test_timestamp_to_epoch",
JQCommand: `{ foo: .collector_tstamp | epoch }`,
InputMsg: &models.Message{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"foo":1557499235}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "test_timestamp_to_epoch_chained",
JQCommand: `{ foo: .collector_tstamp | epoch | todateiso8601 }`,
InputMsg: &models.Message{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"foo":"2019-05-10T14:40:35Z"}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "happy_path",
JQCommand: `{foo: .app_id}`,
Expand Down Expand Up @@ -218,6 +256,30 @@ func TestJQRunFunction_SpMode_false(t *testing.T) {
explicit_null: .explicit | epochMillis,
no_such_field: .nonexistent | epochMillis,
non_null: .non_null
}`,
InputMsg: &models.Message{
Data: []byte(`{"explicit": null, "non_null": "hello"}`),
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"non_null":"hello"}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "epoch_on_nullable",
JQCommand: `
{
explicit_null: .explicit | epoch,
no_such_field: .nonexistent | epoch,
non_null: .non_null
}`,
InputMsg: &models.Message{
Data: []byte(`{"explicit": null, "non_null": "hello"}`),
Expand Down Expand Up @@ -481,7 +543,7 @@ func TestJQRunFunction_errors(t *testing.T) {
Error: errors.New("jq query got no output"),
},
{
Scenario: "epoch_on_non_time_type",
Scenario: "epochMillis_on_non_time_type",
JQConfig: &JQMapperConfig{
JQCommand: `.str | epochMillis`,
RunTimeoutMs: 100,
Expand All @@ -503,6 +565,29 @@ func TestJQRunFunction_errors(t *testing.T) {
ExpInterState: nil,
Error: errors.New("Not a valid time input to 'epochMillis' function"),
},
{
Scenario: "epoch_on_non_time_type",
JQConfig: &JQMapperConfig{
JQCommand: `.str | epoch`,
RunTimeoutMs: 100,
SpMode: false,
},
InputMsg: &models.Message{
Data: []byte(`{"str": "value"}`),
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": nil,
"filtered": nil,
"failed": {
Data: []byte(`{"str": "value"}`),
PartitionKey: "some-key",
},
},
ExpInterState: nil,
Error: errors.New("Not a valid time input to 'epoch' function"),
},
}

for _, tt := range testCases {
Expand Down

0 comments on commit 115996e

Please sign in to comment.