Skip to content

Commit

Permalink
Configurable time layout in epoch/epochMillis JQ functions
Browse files Browse the repository at this point in the history
Before this commit `epoch`-like functions required `time.Time` type on input.
It worked well for atomic Snowplow fields like `collector_tstamp`.

It becomes problematic when some nested context fields representing time
are passed. Such fields don't use `time.Time`, they are plain strings.

This commit makes `epoch`/`epochMillis` functions more flexible:

* When input is `time.Time` - just use it as is (atomic fields).
* When input is a string - try to parse it as `time.Time`.
* When input is something different - return an error.

In case of a string there are various time layouts that could be used
for parsing. That's why `epoch`/`epochMillis` also accept additional
 string parameter representing time layout. Layout must be valid [GO
 layout](https://pkg.go.dev/time#pkg-constants). Default value is `2006-01-02T15:04:05.999Z`.
  • Loading branch information
pondzix committed Nov 7, 2024
1 parent f55fc5c commit 3e4f51e
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 12 deletions.
49 changes: 40 additions & 9 deletions pkg/transform/jq_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ func GojqTransformationFunction(command string, timeoutMs int, spMode bool, jqOu
return nil
}

validTime, ok := a1.(time.Time)

if !ok {
return errors.New("Not a valid time input to 'epoch' function")
validTime, err := parseTime(a1, a2)
if err != nil {
return err
}

return int(validTime.Unix())
Expand All @@ -59,12 +58,10 @@ func GojqTransformationFunction(command string, timeoutMs int, spMode bool, jqOu
return nil
}

validTime, ok := a1.(time.Time)

if !ok {
return errors.New("Not a valid time input to 'epochMillis' function")
validTime, err := parseTime(a1, a2)
if err != nil {
return err
}

return validTime.UnixMilli()
})

Expand All @@ -76,6 +73,40 @@ func GojqTransformationFunction(command string, timeoutMs int, spMode bool, jqOu
return runFunction(code, timeoutMs, spMode, jqOutputHandler), nil
}

func parseTime(input any, params []any) (time.Time, error) {
switch v := input.(type) {
case string:
timeLayout, err := parseTimeLayout(params)
if err != nil {
return time.Time{}, err
}

validTime, err := time.Parse(timeLayout, v)
if err != nil {
return time.Time{}, fmt.Errorf("Could not parse input - '%s' using provided time layout - '%s'", v, timeLayout)
}
return validTime, nil
case time.Time:
return v, nil
default:
return time.Time{}, fmt.Errorf("Not a valid time input to 'epochMillis' function - '%v'; expected string or time.Time", input)
}
}

func parseTimeLayout(params []any) (string, error) {
if len(params) == 0 {
return "2006-01-02T15:04:05.999Z", nil
} else if len(params) == 1 {
str, ok := params[0].(string)
if !ok {
return "", fmt.Errorf("Function argument is invalid '%v'; expected string", params[0])
}
return str, nil
} else {
return "", fmt.Errorf("Too many function arguments - %d; expected 1", len(params))
}
}

func runFunction(jqcode *gojq.Code, timeoutMs int, spMode bool, jqOutputHandler JqOutputHandler) TransformationFunction {
return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
input, parsedEvent, err := mkJQInput(message, interState, spMode)
Expand Down
65 changes: 63 additions & 2 deletions pkg/transform/jq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ func TestJQRunFunction_SpMode_true(t *testing.T) {
ExpInterState: nil,
Error: nil,
},
{
Scenario: "test_timestamp_to_epochMillis_context",
JQCommand: `{ sessionId: .contexts_com_snowplowanalytics_snowplow_client_session_1[0].firstEventTimestamp | epochMillis }`,
InputMsg: &models.Message{
Data: SnowplowTsv1,
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"sessionId":1730129267100}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "test_timestamp_to_epoch",
JQCommand: `{ foo: .collector_tstamp | epoch }`,
Expand Down Expand Up @@ -273,6 +292,25 @@ func TestJQRunFunction_SpMode_false(t *testing.T) {
ExpInterState: nil,
Error: nil,
},
{
Scenario: "epochMillis_custom_timelayout",
JQCommand: `{ sessionId: .time | epochMillis("2006-01-02 15:04:05.999")}`,
InputMsg: &models.Message{
Data: []byte(`{"time": "2024-10-28 15:27:47.100"}`),
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": {
Data: []byte(`{"sessionId":1730129267100}`),
PartitionKey: "some-key",
},
"filtered": nil,
"failed": nil,
},
ExpInterState: nil,
Error: nil,
},
{
Scenario: "epoch_on_nullable",
JQCommand: `
Expand Down Expand Up @@ -563,7 +601,7 @@ func TestJQRunFunction_errors(t *testing.T) {
},
},
ExpInterState: nil,
Error: errors.New("Not a valid time input to 'epochMillis' function"),
Error: errors.New("Could not parse input - 'value' using provided time layout - '2006-01-02T15:04:05.999Z'"),
},
{
Scenario: "epoch_on_non_time_type",
Expand All @@ -586,7 +624,30 @@ func TestJQRunFunction_errors(t *testing.T) {
},
},
ExpInterState: nil,
Error: errors.New("Not a valid time input to 'epoch' function"),
Error: errors.New("Could not parse input - 'value' using provided time layout - '2006-01-02T15:04:05.999Z'"),
},
{
Scenario: "epochMillis_with_not_matching_timelayout",
JQConfig: &JQMapperConfig{
JQCommand: `{ sessionId: .time | epochMillis("2006-01-02 15:04:05") }`,
RunTimeoutMs: 100,
SpMode: false,
},
InputMsg: &models.Message{
Data: []byte(`{"time": "2024-10-28T15:27:47.100"}`),
PartitionKey: "some-key",
},
InputInterState: nil,
Expected: map[string]*models.Message{
"success": nil,
"filtered": nil,
"failed": {
Data: []byte(`{"time": "2024-10-28T15:27:47.100"}`),
PartitionKey: "some-key",
},
},
ExpInterState: nil,
Error: errors.New("Could not parse input - '2024-10-28T15:27:47.100' using provided time layout - '2006-01-02 15:04:05'"),
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/transform/transform_test_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// SnowplowTsv1 is test data
var SnowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user<built-in function input> 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 0}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 1}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 2}},{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `)
var SnowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user<built-in function input> 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 0}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 1}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 2}},{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}},{"schema":"iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-2","data":{"firstEventTimestamp":"2024-10-28T15:27:47.100Z"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `)

// SpTsv1Parsed is test data
var SpTsv1Parsed, _ = analytics.ParseEvent(string(SnowplowTsv1))
Expand Down

0 comments on commit 3e4f51e

Please sign in to comment.