diff --git a/pkg/transform/jq_common.go b/pkg/transform/jq_common.go index 467171fa..8d6c7c95 100644 --- a/pkg/transform/jq_common.go +++ b/pkg/transform/jq_common.go @@ -44,10 +44,9 @@ func GojqTransformationFunction(command string, timeoutMs int, spMode bool, jqOu return nil } - validTime, ok := a1.(time.Time) - - if !ok { - return errors.New("Not a valid time input to 'epoch' function") + validTime, err := parseTime(a1, a2) + if err != nil { + return err } return int(validTime.Unix()) @@ -59,12 +58,10 @@ func GojqTransformationFunction(command string, timeoutMs int, spMode bool, jqOu return nil } - validTime, ok := a1.(time.Time) - - if !ok { - return errors.New("Not a valid time input to 'epochMillis' function") + validTime, err := parseTime(a1, a2) + if err != nil { + return err } - return validTime.UnixMilli() }) @@ -76,6 +73,40 @@ func GojqTransformationFunction(command string, timeoutMs int, spMode bool, jqOu return runFunction(code, timeoutMs, spMode, jqOutputHandler), nil } +func parseTime(input any, params []any) (time.Time, error) { + switch v := input.(type) { + case string: + timeLayout, err := parseTimeLayout(params) + if err != nil { + return time.Time{}, err + } + + validTime, err := time.Parse(timeLayout, v) + if err != nil { + return time.Time{}, fmt.Errorf("Could not parse input - '%s' using provided time layout - '%s'", v, timeLayout) + } + return validTime, nil + case time.Time: + return v, nil + default: + return time.Time{}, fmt.Errorf("Not a valid time input to 'epochMillis' function - '%v'; expected string or time.Time", input) + } +} + +func parseTimeLayout(params []any) (string, error) { + if len(params) == 0 { + return "2006-01-02T15:04:05.999Z", nil + } else if len(params) == 1 { + str, ok := params[0].(string) + if !ok { + return "", fmt.Errorf("Function argument is invalid '%v'; expected string", params[0]) + } + return str, nil + } else { + return "", fmt.Errorf("Too many function arguments - %d; expected 1", len(params)) + } +} + func runFunction(jqcode *gojq.Code, timeoutMs int, spMode bool, jqOutputHandler JqOutputHandler) TransformationFunction { return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { input, parsedEvent, err := mkJQInput(message, interState, spMode) diff --git a/pkg/transform/jq_test.go b/pkg/transform/jq_test.go index a99c53c4..a36c69f5 100644 --- a/pkg/transform/jq_test.go +++ b/pkg/transform/jq_test.go @@ -52,6 +52,25 @@ func TestJQRunFunction_SpMode_true(t *testing.T) { ExpInterState: nil, Error: nil, }, + { + Scenario: "test_timestamp_to_epochMillis_context", + JQCommand: `{ sessionId: .contexts_com_snowplowanalytics_snowplow_client_session_1[0].firstEventTimestamp | epochMillis }`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"sessionId":1730129267100}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, { Scenario: "test_timestamp_to_epoch", JQCommand: `{ foo: .collector_tstamp | epoch }`, @@ -273,6 +292,25 @@ func TestJQRunFunction_SpMode_false(t *testing.T) { ExpInterState: nil, Error: nil, }, + { + Scenario: "epochMillis_custom_timelayout", + JQCommand: `{ sessionId: .time | epochMillis("2006-01-02 15:04:05.999")}`, + InputMsg: &models.Message{ + Data: []byte(`{"time": "2024-10-28 15:27:47.100"}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"sessionId":1730129267100}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, { Scenario: "epoch_on_nullable", JQCommand: ` @@ -563,7 +601,7 @@ func TestJQRunFunction_errors(t *testing.T) { }, }, ExpInterState: nil, - Error: errors.New("Not a valid time input to 'epochMillis' function"), + Error: errors.New("Could not parse input - 'value' using provided time layout - '2006-01-02T15:04:05.999Z'"), }, { Scenario: "epoch_on_non_time_type", @@ -586,7 +624,30 @@ func TestJQRunFunction_errors(t *testing.T) { }, }, ExpInterState: nil, - Error: errors.New("Not a valid time input to 'epoch' function"), + Error: errors.New("Could not parse input - 'value' using provided time layout - '2006-01-02T15:04:05.999Z'"), + }, + { + Scenario: "epochMillis_with_not_matching_timelayout", + JQConfig: &JQMapperConfig{ + JQCommand: `{ sessionId: .time | epochMillis("2006-01-02 15:04:05") }`, + RunTimeoutMs: 100, + SpMode: false, + }, + InputMsg: &models.Message{ + Data: []byte(`{"time": "2024-10-28T15:27:47.100"}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: []byte(`{"time": "2024-10-28T15:27:47.100"}`), + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("Could not parse input - '2024-10-28T15:27:47.100' using provided time layout - '2006-01-02 15:04:05'"), }, } diff --git a/pkg/transform/transform_test_variables.go b/pkg/transform/transform_test_variables.go index baf800b5..f2895394 100644 --- a/pkg/transform/transform_test_variables.go +++ b/pkg/transform/transform_test_variables.go @@ -18,7 +18,7 @@ import ( ) // SnowplowTsv1 is test data -var SnowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 0}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 1}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 2}},{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `) +var SnowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 0}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 1}},{"schema":"iglu:com.acme/justInts/jsonschema/1-0-0", "data":{"integerField": 2}},{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}},{"schema":"iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-2","data":{"firstEventTimestamp":"2024-10-28T15:27:47.100Z"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `) // SpTsv1Parsed is test data var SpTsv1Parsed, _ = analytics.ParseEvent(string(SnowplowTsv1))