Skip to content

Commit

Permalink
Allow setting properties metadata for EventHubs target (closes #99)
Browse files Browse the repository at this point in the history
  • Loading branch information
TiganeteaRobert committed Aug 23, 2022
1 parent 54c45ba commit d832b17
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/models/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Message struct {
// any cleanup process for the source is actioned
AckFunc func()

// Metadata holds the message's metadata
Metadata map[string]interface{}

// If the message is invalid it can be decorated with an error
// message for logging and reporting
err error
Expand Down
6 changes: 6 additions & 0 deletions pkg/target/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ func (eht *EventHubTarget) process(messages []*models.Message) (*models.TargetWr
if eht.setEHPartitionKey {
ehEvent.PartitionKey = &msg.PartitionKey
}
// add the message's metadata to the event
if msg.Metadata != nil {
for key, val := range msg.Metadata {
ehEvent.Set(key, val)
}
}
ehBatch[i] = ehEvent
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/target/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func TestWriteSuccess(t *testing.T) {
// Set the partition key all to the same value to ensure that batching behaviour is down to chunking rather than EH client batching (which we test elsewhere)
for _, msg := range messages {
msg.PartitionKey = "testPK"
// also set metadata
msg.Metadata = map[string]interface{}{
"key1": "value1",
}
}

var twres *models.TargetWriteResult
Expand All @@ -307,10 +311,12 @@ func TestWriteSuccess(t *testing.T) {
}()
res := getResults(m.results, 1*time.Second)

// Check that we got correct amonut of batches
// Check that we got correct amount of batches
assert.Equal(5, len(res))
// Check that we acked correct amount of times
assert.Equal(int64(100), ackOps)
assert.Equal("value1", twres.Sent[0].Metadata["key1"])

// Check that we got no error and the TargetWriteResult is as expected.
assert.Nil(err)
assert.Equal(100, len(twres.Sent))
Expand Down
39 changes: 39 additions & 0 deletions pkg/transform/snowplow_enriched_add_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// PROPRIETARY AND CONFIDENTIAL
//
// Unauthorized copying of this file via any medium is strictly prohibited.
//
// Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved.

package transform

import (
"fmt"

"github.com/snowplow-devops/stream-replicator/pkg/models"
)

// NewSpEnrichedAddMetadataFunction returns a TransformationFunction which adds metadata to a message from a field within a Snowplow enriched event
func NewSpEnrichedAddMetadataFunction(key, field string) TransformationFunction {
return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
// Evaluate intermediateState to parsedEvent
parsedMessage, parseErr := IntermediateAsSpEnrichedParsed(intermediateState, message)
if parseErr != nil {
message.SetError(parseErr)
return nil, nil, message, nil
}

value, err := parsedMessage.GetValue(field)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}
if message.Metadata == nil {
message.Metadata = map[string]interface{}{
key: fmt.Sprintf("%v", value),
}
} else {
message.Metadata[key] = fmt.Sprintf("%v", value)
}
return message, nil, nil, parsedMessage
}
}
63 changes: 63 additions & 0 deletions pkg/transform/snowplow_enriched_add_metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// PROPRIETARY AND CONFIDENTIAL
//
// Unauthorized copying of this file via any medium is strictly prohibited.
//
// Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved.

package transform

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/snowplow-devops/stream-replicator/pkg/models"
)

func TestNewSpEnrichedAddMetadata(t *testing.T) {
assert := assert.New(t)

var messageGood = models.Message{
Data: snowplowTsv3,
PartitionKey: "some-key",
}

var messageBad = models.Message{
Data: nonSnowplowString,
PartitionKey: "some-key4",
}

aidAddMetadata := NewSpEnrichedAddMetadataFunction("test-key", "app_id")

res, _, fail, intermediate := aidAddMetadata(&messageGood, nil)

assert.Equal("test-data3", res.Metadata["test-key"])
assert.Equal(spTsv3Parsed, intermediate)
assert.Nil(fail)

res, _, fail, intermediate = aidAddMetadata(&messageBad, nil)

assert.Nil(res)
assert.Nil(intermediate)
assert.NotNil(fail)
assert.NotNil(fail.GetError())
if fail.GetError() != nil {
assert.Equal("Cannot parse tsv event - wrong number of fields provided: 4", fail.GetError().Error())
}

ctstampAddMetadata := NewSpEnrichedAddMetadataFunction("test-key", "collector_tstamp")

tstampRes, _, fail, intermediate := ctstampAddMetadata(&messageGood, nil)

assert.Equal("2019-05-10 14:40:29.576 +0000 UTC", tstampRes.Metadata["test-key"])
assert.Equal(spTsv3Parsed, intermediate)
assert.Nil(fail)

pgurlportAddMetadata := NewSpEnrichedAddMetadataFunction("test-key", "page_urlport")

intRes, _, fail, intermediate := pgurlportAddMetadata(&messageGood, nil)

assert.Equal("80", intRes.Metadata["test-key"])
assert.Equal(spTsv3Parsed, intermediate)
assert.Nil(fail)
}
8 changes: 8 additions & 0 deletions pkg/transform/transformconfig/transform_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Transformation struct {
AtomicField string `hcl:"atomic_field,optional"`
Regex string `hcl:"regex,optional"`
RegexTimeout int `hcl:"regex_timeout,optional"`
MetadataKey string `hcl:"metadata_key,optional"`
// for JS and Lua transformations
SourceB64 string `hcl:"source_b64,optional"`
TimeoutSec int `hcl:"timeout_sec,optional"`
Expand Down Expand Up @@ -98,6 +99,13 @@ func ValidateTransformations(transformations []*Transformation) []error {
}
}
continue
case "spEnrichedAddMetadata":
if transformation.MetadataKey == `` {
validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedAddMetadata, empty key`, idx))
}
if transformation.AtomicField == `` {
validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedAddMetadata, empty field`, idx))
}
case "spEnrichedFilterContext":
if transformation.ContextFullName == `` {
validationErrors = append(validationErrors, fmt.Errorf(`validation error #%d spEnrichedFilterContext, empty context full name`, idx))
Expand Down
24 changes: 24 additions & 0 deletions pkg/transform/transformconfig/transform_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,30 @@ function notMain(x) {
}},
ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedFilterContext, empty context full name"), fmt.Errorf("validation error #0 spEnrichedFilterContext, empty custom field path")},
},
{
Name: "spEnrichedAddMetadata success",
Transformations: []*Transformation{{
Name: "spEnrichedAddMetadata",
AtomicField: "app_id",
MetadataKey: "some-key",
}},
},
{
Name: "spEnrichedAddMetadata empty atomic field",
Transformations: []*Transformation{{
Name: "spEnrichedAddMetadata",
MetadataKey: "some-key",
}},
ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedAddMetadata, empty field")},
},
{
Name: "spEnrichedAddMetadata empty metadata key",
Transformations: []*Transformation{{
Name: "spEnrichedAddMetadata",
AtomicField: "app_id",
}},
ExpectedErrs: []error{fmt.Errorf("validation error #0 spEnrichedAddMetadata, empty key")},
},
{
Name: "spEnrichedFilterContext empty regex",
Transformations: []*Transformation{{
Expand Down

0 comments on commit d832b17

Please sign in to comment.