diff --git a/pkg/models/message.go b/pkg/models/message.go index 653e081a..feecb8d8 100644 --- a/pkg/models/message.go +++ b/pkg/models/message.go @@ -29,6 +29,9 @@ type Message struct { // any cleanup process for the source is actioned AckFunc func() + // Metadata holds the message's metadata + Metadata map[string]interface{} + // If the message is invalid it can be decorated with an error // message for logging and reporting err error diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index 62a31abf..caa3b481 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -185,6 +185,12 @@ func (eht *EventHubTarget) process(messages []*models.Message) (*models.TargetWr if eht.setEHPartitionKey { ehEvent.PartitionKey = &msg.PartitionKey } + // add the message's metadata to the event + if msg.Metadata != nil { + for key, val := range msg.Metadata { + ehEvent.Set(key, val) + } + } ehBatch[i] = ehEvent } diff --git a/pkg/target/eventhub_test.go b/pkg/target/eventhub_test.go index 611c5c18..069b0875 100644 --- a/pkg/target/eventhub_test.go +++ b/pkg/target/eventhub_test.go @@ -297,6 +297,10 @@ func TestWriteSuccess(t *testing.T) { // Set the partition key all to the same value to ensure that batching behaviour is down to chunking rather than EH client batching (which we test elsewhere) for _, msg := range messages { msg.PartitionKey = "testPK" + // also set metadata + msg.Metadata = map[string]interface{}{ + "key1": "value1", + } } var twres *models.TargetWriteResult @@ -307,10 +311,12 @@ func TestWriteSuccess(t *testing.T) { }() res := getResults(m.results, 1*time.Second) - // Check that we got correct amonut of batches + // Check that we got correct amount of batches assert.Equal(5, len(res)) // Check that we acked correct amount of times assert.Equal(int64(100), ackOps) + assert.Equal("value1", twres.Sent[0].Metadata["key1"]) + // Check that we got no error and the TargetWriteResult is as expected. assert.Nil(err) assert.Equal(100, len(twres.Sent)) diff --git a/pkg/transform/snowplow_enriched_add_metadata.go b/pkg/transform/snowplow_enriched_add_metadata.go new file mode 100644 index 00000000..e8912d3d --- /dev/null +++ b/pkg/transform/snowplow_enriched_add_metadata.go @@ -0,0 +1,39 @@ +// PROPRIETARY AND CONFIDENTIAL +// +// Unauthorized copying of this file via any medium is strictly prohibited. +// +// Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved. + +package transform + +import ( + "fmt" + + "github.com/snowplow-devops/stream-replicator/pkg/models" +) + +// NewSpEnrichedAddMetadataFunction returns a TransformationFunction which adds metadata to a message from a field within a Snowplow enriched event +func NewSpEnrichedAddMetadataFunction(key, field string) TransformationFunction { + return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + // Evaluate intermediateState to parsedEvent + parsedMessage, parseErr := IntermediateAsSpEnrichedParsed(intermediateState, message) + if parseErr != nil { + message.SetError(parseErr) + return nil, nil, message, nil + } + + value, err := parsedMessage.GetValue(field) + if err != nil { + message.SetError(err) + return nil, nil, message, nil + } + if message.Metadata == nil { + message.Metadata = map[string]interface{}{ + key: fmt.Sprintf("%v", value), + } + } else { + message.Metadata[key] = fmt.Sprintf("%v", value) + } + return message, nil, nil, parsedMessage + } +} diff --git a/pkg/transform/snowplow_enriched_add_metadata_test.go b/pkg/transform/snowplow_enriched_add_metadata_test.go new file mode 100644 index 00000000..54228aa1 --- /dev/null +++ b/pkg/transform/snowplow_enriched_add_metadata_test.go @@ -0,0 +1,63 @@ +// PROPRIETARY AND CONFIDENTIAL +// +// Unauthorized copying of this file via any medium is strictly prohibited. +// +// Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved. + +package transform + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/snowplow-devops/stream-replicator/pkg/models" +) + +func TestNewSpEnrichedAddMetadata(t *testing.T) { + assert := assert.New(t) + + var messageGood = models.Message{ + Data: snowplowTsv3, + PartitionKey: "some-key", + } + + var messageBad = models.Message{ + Data: nonSnowplowString, + PartitionKey: "some-key4", + } + + aidAddMetadata := NewSpEnrichedAddMetadataFunction("test-key", "app_id") + + res, _, fail, intermediate := aidAddMetadata(&messageGood, nil) + + assert.Equal("test-data3", res.Metadata["test-key"]) + assert.Equal(spTsv3Parsed, intermediate) + assert.Nil(fail) + + res, _, fail, intermediate = aidAddMetadata(&messageBad, nil) + + assert.Nil(res) + assert.Nil(intermediate) + assert.NotNil(fail) + assert.NotNil(fail.GetError()) + if fail.GetError() != nil { + assert.Equal("Cannot parse tsv event - wrong number of fields provided: 4", fail.GetError().Error()) + } + + ctstampAddMetadata := NewSpEnrichedAddMetadataFunction("test-key", "collector_tstamp") + + tstampRes, _, fail, intermediate := ctstampAddMetadata(&messageGood, nil) + + assert.Equal("2019-05-10 14:40:29.576 +0000 UTC", tstampRes.Metadata["test-key"]) + assert.Equal(spTsv3Parsed, intermediate) + assert.Nil(fail) + + pgurlportAddMetadata := NewSpEnrichedAddMetadataFunction("test-key", "page_urlport") + + intRes, _, fail, intermediate := pgurlportAddMetadata(&messageGood, nil) + + assert.Equal("80", intRes.Metadata["test-key"]) + assert.Equal(spTsv3Parsed, intermediate) + assert.Nil(fail) +} diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/transformconfig/transform_config.go index fd4d8ee2..5c4c4968 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/transformconfig/transform_config.go @@ -29,6 +29,7 @@ type Transformation struct { AtomicField string `hcl:"atomic_field,optional"` Regex string `hcl:"regex,optional"` RegexTimeout int `hcl:"regex_timeout,optional"` + MetadataKey string `hcl:"metadata_key,optional"` // for JS and Lua transformations SourceB64 string `hcl:"source_b64,optional"` TimeoutSec int `hcl:"timeout_sec,optional"` @@ -98,6 +99,13 @@ func ValidateTransformations(transformations []*Transformation) []error { } } continue + case "spEnrichedAddMetadata": + if transformation.MetadataKey == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedAddMetadata, empty key`, idx)) + } + if transformation.AtomicField == `` { + validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedAddMetadata, empty field`, idx)) + } case "spEnrichedFilterContext": if transformation.ContextFullName == `` { validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, empty context full name`, idx)) diff --git a/pkg/transform/transformconfig/transform_config_test.go b/pkg/transform/transformconfig/transform_config_test.go index c42c243e..1151ac0f 100644 --- a/pkg/transform/transformconfig/transform_config_test.go +++ b/pkg/transform/transformconfig/transform_config_test.go @@ -231,6 +231,30 @@ function notMain(x) { }}, ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterContext, empty context full name"), fmt.Errorf("validation error #0 spEnrichedFilterContext, empty custom field path")}, }, + { + Name: "spEnrichedAddMetadata success", + Transformations: []*Transformation{{ + Name: "spEnrichedAddMetadata", + AtomicField: "app_id", + MetadataKey: "some-key", + }}, + }, + { + Name: "spEnrichedAddMetadata empty atomic field", + Transformations: []*Transformation{{ + Name: "spEnrichedAddMetadata", + MetadataKey: "some-key", + }}, + ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedAddMetadata, empty field")}, + }, + { + Name: "spEnrichedAddMetadata empty metadata key", + Transformations: []*Transformation{{ + Name: "spEnrichedAddMetadata", + AtomicField: "app_id", + }}, + ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedAddMetadata, empty key")}, + }, { Name: "spEnrichedFilterContext empty regex", Transformations: []*Transformation{{