From f2d34c82e01f1ccaaa0254f65b1dc74e18fdffff Mon Sep 17 00:00:00 2001 From: jbeemster Date: Mon, 23 Jan 2023 10:10:07 +1100 Subject: [PATCH] rename --- ...configuration_transformations_docs_test.go | 8 +- ...owplow_collector_payload_thrift_to_json.go | 83 ++++++++++++++++++ ...nowplow_collector_payload_thrift_to_raw.go | 76 ----------------- ...owplow_json_to_collector_payload_thrift.go | 84 +++++++++++++++++++ ...nowplow_raw_to_collector_payload_thrift.go | 84 ------------------- .../transformconfig/transform_config.go | 4 +- .../collectorpayload/collector_payload.go | 19 ++++- 7 files changed, 191 insertions(+), 167 deletions(-) create mode 100644 pkg/transform/snowplow_collector_payload_thrift_to_json.go delete mode 100644 pkg/transform/snowplow_collector_payload_thrift_to_raw.go create mode 100644 pkg/transform/snowplow_json_to_collector_payload_thrift.go delete mode 100644 pkg/transform/snowplow_raw_to_collector_payload_thrift.go diff --git a/docs/configuration_transformations_docs_test.go b/docs/configuration_transformations_docs_test.go index 8ced239d..52703729 100644 --- a/docs/configuration_transformations_docs_test.go +++ b/docs/configuration_transformations_docs_test.go @@ -141,10 +141,10 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) { configObject = &transform.SetPkConfig{} case "spEnrichedToJson": configObject = &transform.EnrichedToJSONConfig{} - case "spCollectorPayloadThriftToRaw": - configObject = &transform.CollectorPayloadThriftToRawConfig{} - case "spRawToCollectorPayloadThrift": - configObject = &transform.RawToCollectorPayloadThriftConfig{} + case "spCollectorPayloadThriftToJSON": + configObject = &transform.CollectorPayloadThriftToJSONConfig{} + case "spJSONToCollectorPayloadThrift": + configObject = &transform.JSONToCollectorPayloadThriftConfig{} case "js": configObject = &engine.JSEngineConfig{} case "lua": diff --git a/pkg/transform/snowplow_collector_payload_thrift_to_json.go 
b/pkg/transform/snowplow_collector_payload_thrift_to_json.go new file mode 100644 index 00000000..b4bef34c --- /dev/null +++ b/pkg/transform/snowplow_collector_payload_thrift_to_json.go @@ -0,0 +1,83 @@ +// +// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Snowplow Community License Version 1.0, +// and you may not use this file except in compliance with the Snowplow Community License Version 1.0. +// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + +package transform + +import ( + "context" + "errors" + + "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/models" + + collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload" +) + +// CollectorPayloadThriftToJSONConfig is a configuration object for the spCollectorPayloadThriftToJSON transformation +type CollectorPayloadThriftToJSONConfig struct { +} + +type collectorPayloadThriftToJSONAdapter func(i interface{}) (interface{}, error) + +// Create implements the ComponentCreator interface. +func (f collectorPayloadThriftToJSONAdapter) Create(i interface{}) (interface{}, error) { + return f(i) +} + +// ProvideDefault implements the ComponentConfigurable interface +func (f collectorPayloadThriftToJSONAdapter) ProvideDefault() (interface{}, error) { + // Provide defaults + cfg := &CollectorPayloadThriftToJSONConfig{} + + return cfg, nil +} + +// adapterGenerator returns a spCollectorPayloadThriftToJSON transformation adapter. 
+func collectorPayloadThriftToJSONAdapterGenerator(f func(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error)) collectorPayloadThriftToJSONAdapter { + return func(i interface{}) (interface{}, error) { + cfg, ok := i.(*CollectorPayloadThriftToJSONConfig) + if !ok { + return nil, errors.New("invalid input, expected collectorPayloadThriftToJSONConfig") + } + + return f(cfg) + } +} + +// collectorPayloadThriftToJSONConfigFunction returns an spCollectorPayloadThriftToJSON transformation function, from a CollectorPayloadThriftToJSONConfig. +func collectorPayloadThriftToJSONConfigFunction(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error) { + return SpCollectorPayloadThriftToJSON, nil +} + +// CollectorPayloadThriftToJSONConfigPair is a configuration pair for the spCollectorPayloadThriftToJSON transformation +var CollectorPayloadThriftToJSONConfigPair = config.ConfigurationPair{ + Name: "spCollectorPayloadThriftToJSON", + Handle: collectorPayloadThriftToJSONAdapterGenerator(collectorPayloadThriftToJSONConfigFunction), +} + +// SpCollectorPayloadThriftToJSON is a specific transformation implementation to transform a Thrift encoded Collector Payload +// to a JSON string representation. 
+func SpCollectorPayloadThriftToJSON(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + ctx := context.Background() + + // Deserialize the Collector Payload to a struct + res, deserializeErr := collectorpayload.BinaryDeserializer(ctx, message.Data) + if deserializeErr != nil { + message.SetError(deserializeErr) + return nil, nil, message, nil + } + + // Re-encode as a JSON string to be able to leverage it downstream + resJSON, jsonErr := collectorpayload.ToJSON(res) + if jsonErr != nil { + message.SetError(jsonErr) + return nil, nil, message, nil + } + + message.Data = resJSON + return message, nil, nil, intermediateState +} diff --git a/pkg/transform/snowplow_collector_payload_thrift_to_raw.go b/pkg/transform/snowplow_collector_payload_thrift_to_raw.go deleted file mode 100644 index fddfccfb..00000000 --- a/pkg/transform/snowplow_collector_payload_thrift_to_raw.go +++ /dev/null @@ -1,76 +0,0 @@ -// -// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. -// -// This program is licensed to you under the Snowplow Community License Version 1.0, -// and you may not use this file except in compliance with the Snowplow Community License Version 1.0. -// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 - -package transform - -import ( - "context" - "errors" - "fmt" - - "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/models" - - collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload" -) - -// CollectorPayloadThriftToRawConfig is a configuration object for the spCollectorPayloadThriftToRaw transformation -type CollectorPayloadThriftToRawConfig struct { -} - -type collectorPayloadThriftToRawAdapter func(i interface{}) (interface{}, error) - -// Create implements the ComponentCreator interface. 
-func (f collectorPayloadThriftToRawAdapter) Create(i interface{}) (interface{}, error) { - return f(i) -} - -// ProvideDefault implements the ComponentConfigurable interface -func (f collectorPayloadThriftToRawAdapter) ProvideDefault() (interface{}, error) { - // Provide defaults - cfg := &CollectorPayloadThriftToRawConfig{} - - return cfg, nil -} - -// adapterGenerator returns a spCollectorPayloadThriftToRaw transformation adapter. -func collectorPayloadThriftToRawAdapterGenerator(f func(c *CollectorPayloadThriftToRawConfig) (TransformationFunction, error)) collectorPayloadThriftToRawAdapter { - return func(i interface{}) (interface{}, error) { - cfg, ok := i.(*CollectorPayloadThriftToRawConfig) - if !ok { - return nil, errors.New("invalid input, expected collectorPayloadThriftToRawConfig") - } - - return f(cfg) - } -} - -// collectorPayloadThriftToRawConfigFunction returns an spCollectorPayloadThriftToRaw transformation function, from an collectorPayloadThriftToRawConfig. -func collectorPayloadThriftToRawConfigFunction(c *CollectorPayloadThriftToRawConfig) (TransformationFunction, error) { - return SpCollectorPayloadThriftToRaw, nil -} - -// CollectorPayloadThriftToRawConfigPair is a configuration pair for the spCollectorPayloadThriftToRaw transformation -var CollectorPayloadThriftToRawConfigPair = config.ConfigurationPair{ - Name: "spCollectorPayloadThriftToRaw", - Handle: collectorPayloadThriftToRawAdapterGenerator(collectorPayloadThriftToRawConfigFunction), -} - -// SpCollectorPayloadThriftToRaw is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload -// so that it can be pushed directly into the egress stream of a Collector. 
-func SpCollectorPayloadThriftToRaw(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { - ctx := context.Background() - - res, deserializeErr := collectorpayload.BinaryDeserializer(ctx, message.Data) - if deserializeErr != nil { - message.SetError(deserializeErr) - return nil, nil, message, nil - } - - message.Data = []byte(fmt.Sprintf("%#v", res)) - return message, nil, nil, intermediateState -} diff --git a/pkg/transform/snowplow_json_to_collector_payload_thrift.go b/pkg/transform/snowplow_json_to_collector_payload_thrift.go new file mode 100644 index 00000000..f0ddda21 --- /dev/null +++ b/pkg/transform/snowplow_json_to_collector_payload_thrift.go @@ -0,0 +1,84 @@ +// +// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Snowplow Community License Version 1.0, +// and you may not use this file except in compliance with the Snowplow Community License Version 1.0. +// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + +package transform + +import ( + "context" + "encoding/json" + "errors" + + "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/models" + + collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload" + collectorpayloadmodel1 "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload/gen-go/model1" +) + +// JSONToCollectorPayloadThriftConfig is a configuration object for the spJSONToCollectorPayloadThrift transformation +type JSONToCollectorPayloadThriftConfig struct { +} + +type jsonToCollectorPayloadThriftAdapter func(i interface{}) (interface{}, error) + +// Create implements the ComponentCreator interface. 
+func (f jsonToCollectorPayloadThriftAdapter) Create(i interface{}) (interface{}, error) { + return f(i) +} + +// ProvideDefault implements the ComponentConfigurable interface +func (f jsonToCollectorPayloadThriftAdapter) ProvideDefault() (interface{}, error) { + // Provide defaults + cfg := &JSONToCollectorPayloadThriftConfig{} + + return cfg, nil +} + +// adapterGenerator returns a spJSONToCollectorPayloadThrift transformation adapter. +func jsonToCollectorPayloadThriftAdapterGenerator(f func(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error)) jsonToCollectorPayloadThriftAdapter { + return func(i interface{}) (interface{}, error) { + cfg, ok := i.(*JSONToCollectorPayloadThriftConfig) + if !ok { + return nil, errors.New("invalid input, expected jsonToCollectorPayloadThriftConfig") + } + + return f(cfg) + } +} + +// jsonToCollectorPayloadThriftConfigFunction returns an spJSONToCollectorPayloadThrift transformation function, from a JSONToCollectorPayloadThriftConfig. +func jsonToCollectorPayloadThriftConfigFunction(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error) { + return SpJSONToCollectorPayloadThrift, nil +} + +// JSONToCollectorPayloadThriftConfigPair is a configuration pair for the spJSONToCollectorPayloadThrift transformation +var JSONToCollectorPayloadThriftConfigPair = config.ConfigurationPair{ + Name: "spJSONToCollectorPayloadThrift", + Handle: jsonToCollectorPayloadThriftAdapterGenerator(jsonToCollectorPayloadThriftConfigFunction), +} + +// SpJSONToCollectorPayloadThrift is a specific transformation implementation to transform a JSON encoded message into a valid Thrift encoded Collector Payload +// so that it can be pushed directly into the egress stream of a Collector. 
+func SpJSONToCollectorPayloadThrift(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + var p *collectorpayloadmodel1.CollectorPayload + unmarshallErr := json.Unmarshal(message.Data, &p) + if unmarshallErr != nil { + message.SetError(unmarshallErr) + return nil, nil, message, nil + } + + ctx := context.Background() + + res, serializeErr := collectorpayload.BinarySerializer(ctx, p) + if serializeErr != nil { + message.SetError(serializeErr) + return nil, nil, message, nil + } + + message.Data = res + return message, nil, nil, intermediateState +} diff --git a/pkg/transform/snowplow_raw_to_collector_payload_thrift.go b/pkg/transform/snowplow_raw_to_collector_payload_thrift.go deleted file mode 100644 index b05addb3..00000000 --- a/pkg/transform/snowplow_raw_to_collector_payload_thrift.go +++ /dev/null @@ -1,84 +0,0 @@ -// -// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. -// -// This program is licensed to you under the Snowplow Community License Version 1.0, -// and you may not use this file except in compliance with the Snowplow Community License Version 1.0. -// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 - -package transform - -import ( - "context" - "errors" - "encoding/json" - - "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/models" - - collectorpayloadmodel1 "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload/gen-go/model1" - collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload" -) - -// RawToCollectorPayloadThriftConfig is a configuration object for the spRawToCollectorPayloadThrift transformation -type RawToCollectorPayloadThriftConfig struct { -} - -type rawToCollectorPayloadThriftAdapter func(i interface{}) (interface{}, error) - -// Create implements the ComponentCreator interface. 
-func (f rawToCollectorPayloadThriftAdapter) Create(i interface{}) (interface{}, error) { - return f(i) -} - -// ProvideDefault implements the ComponentConfigurable interface -func (f rawToCollectorPayloadThriftAdapter) ProvideDefault() (interface{}, error) { - // Provide defaults - cfg := &RawToCollectorPayloadThriftConfig{} - - return cfg, nil -} - -// adapterGenerator returns a spRawToCollectorPayloadThrift transformation adapter. -func rawToCollectorPayloadThriftAdapterGenerator(f func(c *RawToCollectorPayloadThriftConfig) (TransformationFunction, error)) rawToCollectorPayloadThriftAdapter { - return func(i interface{}) (interface{}, error) { - cfg, ok := i.(*RawToCollectorPayloadThriftConfig) - if !ok { - return nil, errors.New("invalid input, expected rawToCollectorPayloadThriftConfig") - } - - return f(cfg) - } -} - -// rawToCollectorPayloadThriftConfigFunction returns an spRawToCollectorPayloadThrift transformation function, from an rawToCollectorPayloadThriftConfig. -func rawToCollectorPayloadThriftConfigFunction(c *RawToCollectorPayloadThriftConfig) (TransformationFunction, error) { - return SpRawToCollectorPayloadThrift, nil -} - -// RawToCollectorPayloadThriftConfigPair is a configuration pair for the spRawToCollectorPayloadThrift transformation -var RawToCollectorPayloadThriftConfigPair = config.ConfigurationPair{ - Name: "spRawToCollectorPayloadThrift", - Handle: rawToCollectorPayloadThriftAdapterGenerator(rawToCollectorPayloadThriftConfigFunction), -} - -// SpRawToCollectorPayloadThrift is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload -// so that it can be pushed directly into the egress stream of a Collector. 
-func SpRawToCollectorPayloadThrift(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { - var p *collectorpayloadmodel1.CollectorPayload - unmarshallErr := json.Unmarshal(message.Data, &p) - if unmarshallErr != nil { - message.SetError(unmarshallErr) - return nil, nil, message, nil - } - - ctx := context.Background() - - res, serializeErr := collectorpayload.BinarySerializer(ctx, p) - if serializeErr != nil { - message.SetError(serializeErr) - return nil, nil, message, nil - } - - message.Data = res - return message, nil, nil, intermediateState -} diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/transformconfig/transform_config.go index d9a25990..e7806c1f 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/transformconfig/transform_config.go @@ -23,8 +23,8 @@ var SupportedTransformations = []config.ConfigurationPair{ filter.ContextFilterConfigPair, transform.SetPkConfigPair, transform.EnrichedToJSONConfigPair, - transform.CollectorPayloadThriftToRawConfigPair, - transform.RawToCollectorPayloadThriftConfigPair, + transform.CollectorPayloadThriftToJSONConfigPair, + transform.JSONToCollectorPayloadThriftConfigPair, engine.LuaConfigPair, engine.JSConfigPair, } diff --git a/third_party/snowplow/collectorpayload/collector_payload.go b/third_party/snowplow/collectorpayload/collector_payload.go index a54784e7..4f3fb468 100644 --- a/third_party/snowplow/collectorpayload/collector_payload.go +++ b/third_party/snowplow/collectorpayload/collector_payload.go @@ -9,6 +9,8 @@ package collectorpayload import ( "context" + "encoding/base64" + "encoding/json" thrift "github.com/apache/thrift/lib/go/thrift" @@ -30,6 +32,16 @@ func BinarySerializer(ctx context.Context, collectorPayload *model1.CollectorPay // BinaryDeserializer deserializes a CollectorPayload byte array back to a struct func BinaryDeserializer(ctx context.Context, collectorPayloadBytes 
[]byte) (*model1.CollectorPayload, error) { + var inputBytes []byte + + // Attempt to decode from base64 as most payloads will arrive with the thrift string re-encoded + base64DecodedCollectorPayload, base64Err := base64.StdEncoding.DecodeString(string(collectorPayloadBytes)) + if base64Err != nil { + inputBytes = collectorPayloadBytes + } else { + inputBytes = []byte(base64DecodedCollectorPayload) + } + t := thrift.NewTMemoryBufferLen(1024) p := thrift.NewTBinaryProtocolFactoryDefault().GetProtocol(t) @@ -39,7 +51,12 @@ func BinaryDeserializer(ctx context.Context, collectorPayloadBytes []byte) (*mod } collectorPayload := model1.NewCollectorPayload() - err := deserializer.Read(ctx, collectorPayload, collectorPayloadBytes) + err := deserializer.Read(ctx, collectorPayload, inputBytes) return collectorPayload, err } + +// ToJSON converts the collector payload struct to a JSON representation for simpler portability +func ToJSON(collectorPayload *model1.CollectorPayload) ([]byte, error) { + return json.Marshal(collectorPayload) +}