Skip to content

Commit

Permalink
Add JQ filter
Browse files Browse the repository at this point in the history
Adding JQ filter, which allows us to...filter messages based on a configured JQ command. Similar to already existing JQ mapper, but JQ filter requires output of a command to have boolean type.

As there is some shared logic between the mapper and the new filter, I extracted common stuff to `jq_common.go` module.
  • Loading branch information
pondzix committed Nov 6, 2024
1 parent 466ad1b commit 4fb64f0
Show file tree
Hide file tree
Showing 20 changed files with 441 additions and 158 deletions.
8 changes: 6 additions & 2 deletions assets/docs/configuration/overview-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,20 @@ stats_receiver {
buffer_sec = 20
}

// log level configuration (default: "info")
# log level configuration (default: "info")
log_level = "info"

// Specifies how failed writes to the target should be retried, depending on an error type
# Specifies how failed writes to the target should be retried, depending on an error type
retry {
transient {
# Initial delay (before first retry) for transient errors
delay_ms = 1000

# Maximum number of retries for transient errors
max_attempts = 5
}
setup {
# Initial delay (before first retry) for setup errors
delay_ms = 20000
}
}
Expand Down
4 changes: 4 additions & 0 deletions assets/docs/configuration/retry-example.hcl
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
retry {
transient {
# Initial delay (before first retry) for transient errors
delay_ms = 5000

# Maximum number of retries for transient errors
max_attempts = 10
}
setup {
# Initial delay (before first retry) for setup errors
delay_ms = 30000
}
}
3 changes: 2 additions & 1 deletion assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ target {
# Optional path to the file containing template which is used to build HTTP request based on a batch of input data
template_file = "myTemplate.file"

# 2 invalid + 1 setup error rules
# Optional HTTP response rules which are used to match HTTP response code/body and categorize it as either invalid data or target setup error.
# For example, we can have 2 invalid + 1 setup error rules:
response_rules {
# This one is a match when...
invalid {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"secret_key": "{{ env "SECRET_KEY" }}",
"data": [
{{ range $i, $data := . }} {{if $i}},{{end}} {{ prettyPrint .foo }} {{ end }}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ prettyPrint (index . 0)}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
transform {
use "jq" {
# Full JQ command which will be used to transform input data.
jq_command = <<JQEOT
{
my_app_id: .app_id,
Expand All @@ -9,7 +10,11 @@ transform {
}
JQEOT

# Optional. Timeout for execution of the script, in milliseconds.
timeout_ms = 800

# Optional, may be used when the input is a Snowplow enriched TSV.
# This will transform the data so that the root '.' JQ field contains JSON object representation of the event - with keys as returned by the Snowplow Analytics SDK.
snowplow_mode = true
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
transform {
use "jq" {
# Full JQ command which will be used to transform input data.
jq_command = "[.]"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
transform {
use "jqFilter" {
# Full JQ command which will be used to filter input data. The output must be boolean. If 'false', data is then discarded.
jq_command = "has(\"app_id\")"

# Optional. Timeout for execution of the script, in milliseconds.
timeout_ms = 800

# Optional, may be used when the input is a Snowplow enriched TSV.
# This will transform the data so that the root '.' JQ field contains JSON object representation of the event - with keys as returned by the Snowplow Analytics SDK.
snowplow_mode = true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
transform {
use "jqFilter" {
# Full JQ command which will be used to filter input data. The output must be boolean. If 'false', data is then discarded.
jq_command = "has(\"app_id\")"
}
}
4 changes: 3 additions & 1 deletion docs/configuration_transformations_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func TestBuiltinTransformationDocumentation(t *testing.T) {
transformationsToTest := []string{"base64Decode", "base64Encode", "jq"}
transformationsToTest := []string{"base64Decode", "base64Encode", "jq", "jqFilter"}

for _, tfm := range transformationsToTest {

Expand Down Expand Up @@ -161,6 +161,8 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) {
configObject = &engine.JSEngineConfig{}
case "jq":
configObject = &transform.JQMapperConfig{}
case "jqFilter":
configObject = &filter.JQFilterConfig{}
default:
assert.Fail(fmt.Sprint("Source not recognised: ", use.Name))
}
Expand Down
1 change: 0 additions & 1 deletion pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,6 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu

if findMatchingRule(response, ht.responseRules.Invalid) != nil {
for _, msg := range goodMsgs {
// can we use response body as an error message for invalid data?
msg.SetError(errors.New(response.Body))
}

Expand Down
79 changes: 79 additions & 0 deletions pkg/transform/filter/jq_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package filter

import (
"errors"
"fmt"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/transform"
)

// JQFilterConfig represents the configuration for the JQ filter transformation
type JQFilterConfig struct {
JQCommand string `hcl:"jq_command"`
RunTimeoutMs int `hcl:"timeout_ms,optional"`
SpMode bool `hcl:"snowplow_mode,optional"`
}

// JQFilterConfigPair is a configuration pair for the jq filter transformation
var JQFilterConfigPair = config.ConfigurationPair{
Name: "jqFilter",
Handle: jqFilterAdapterGenerator(jqFilterConfigFunction),
}

func jqFilterConfigFunction(cfg *JQFilterConfig) (transform.TransformationFunction, error) {
return transform.GojqTransformationFunction(cfg.JQCommand, cfg.RunTimeoutMs, cfg.SpMode, filterOutput)
}

func jqFilterAdapterGenerator(f func(*JQFilterConfig) (transform.TransformationFunction, error)) jqFilterAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*JQFilterConfig)
if !ok {
return nil, errors.New("invalid input, expected JQFilterConfig")
}

return f(cfg)
}
}

// This is where actual filtering is implemented, based on a JQ command output.
func filterOutput(jqOutput transform.JqCommandOutput) transform.TransformationFunction {
return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
shouldKeepMessage, isBoolean := jqOutput.(bool)

if !isBoolean {
message.SetError(fmt.Errorf("jq filter returned '%v'; expected boolean", jqOutput))
return nil, nil, message, nil
}

if !shouldKeepMessage {
return nil, message, nil, nil
}

return message, nil, nil, interState
}
}

type jqFilterAdapter func(i interface{}) (interface{}, error)

func (f jqFilterAdapter) ProvideDefault() (interface{}, error) {
return &JQFilterConfig{
RunTimeoutMs: 100,
}, nil
}

func (f jqFilterAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}
136 changes: 136 additions & 0 deletions pkg/transform/filter/jq_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package filter

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/transform"
)

func TestJQFilter_SpMode_true_keep(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("app_id")`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(dropped)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowTsv1), string(kept.Data))
}

func TestJQFilter_SpMode_true_drop(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("non_existent_key")`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(kept)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowTsv1), string(dropped.Data))
}

func TestJQFilter_SpMode_false_keep(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowJSON1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("app_id")`, RunTimeoutMs: 100, SpMode: false}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(dropped)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowJSON1), string(kept.Data))
}

func TestJQFilter_SpMode_false_drop(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowJSON1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `has("non_existent_key")`, RunTimeoutMs: 100, SpMode: false}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(kept)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowJSON1), string(dropped.Data))
}

func TestJQFilter_epoch(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `.collector_tstamp | epoch | . < 10`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)
assert.Empty(kept)
assert.Empty(invalid)
assert.Equal(string(transform.SnowplowTsv1), string(dropped.Data))
}

func TestJQFilter_non_boolean_output(t *testing.T) {
assert := assert.New(t)
input := &models.Message{
Data: transform.SnowplowTsv1,
PartitionKey: "some-key",
}

config := &JQFilterConfig{JQCommand: `.collector_tstamp | epoch`, RunTimeoutMs: 100, SpMode: true}
filter := createFilter(t, config)

kept, dropped, invalid, _ := filter(input, nil)

assert.Empty(kept)
assert.Empty(dropped)
assert.Equal("jq filter returned '1557499235'; expected boolean", invalid.GetError().Error())
}

func TestJQFilter_invalid_jq_command(t *testing.T) {
assert := assert.New(t)

config := &JQFilterConfig{JQCommand: `blabla`, RunTimeoutMs: 100, SpMode: true}
filter, err := jqFilterConfigFunction(config)

assert.Nil(filter)
assert.Equal("error compiling jq query: function not defined: blabla/0", err.Error())
}

func createFilter(t *testing.T, config *JQFilterConfig) transform.TransformationFunction {
filter, err := jqFilterConfigFunction(config)
if err != nil {
t.Fatalf("failed to create transformation function with error: %q", err.Error())
}
return filter
}
Loading

0 comments on commit 4fb64f0

Please sign in to comment.