diff --git a/.chloggen/regex-replace.yaml b/.chloggen/regex-replace.yaml new file mode 100644 index 0000000000000..9854e3d9a1c2b --- /dev/null +++ b/.chloggen/regex-replace.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add 'regex_replace' operator + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37443] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/adapter/register.go b/pkg/stanza/adapter/register.go index 426e456decfae..76362dd829de6 100644 --- a/pkg/stanza/adapter/register.go +++ b/pkg/stanza/adapter/register.go @@ -26,6 +26,7 @@ import ( _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/move" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/noop" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine" + _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/regexreplace" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/remove" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/retain" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/router" diff --git a/pkg/stanza/docs/operators/README.md b/pkg/stanza/docs/operators/README.md index a79058c277674..dfde70ecb8af6 100644 --- a/pkg/stanza/docs/operators/README.md +++ b/pkg/stanza/docs/operators/README.md @@ -41,6 +41,7 @@ General purpose: - [move](./move.md) - [noop](./noop.md) - [recombine](./recombine.md) +- [regex_replace](./regex_replace.md) - [remove](./remove.md) - [retain](./retain.md) - [router](./router.md) diff --git a/pkg/stanza/docs/operators/regex_replace.md b/pkg/stanza/docs/operators/regex_replace.md new file mode 100644 index 0000000000000..644e1885b9161 --- /dev/null +++ b/pkg/stanza/docs/operators/regex_replace.md @@ -0,0 +1,142 @@ +## `regex_replace` operator + +The `regex_replace` operator parses the string-typed field selected by `field` with the given user-defined or well-known regular expression. +Optionally, it replaces the matched string. + +#### Regex Syntax + +This operator makes use of [Go regular expression](https://github.com/google/re2/wiki/Syntax). 
When writing a regex, consider using a tool such as [regex101](https://regex101.com/?flavor=golang). + +### Configuration Fields + +| Field | Default | Description | +| --- | --- | --- | +| `id` | `regex_replace` | A unique identifier for the operator. | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | +| `field` | required | The [field](../types/field.md) on which to perform the replacement. Must be a string. | +| `regex` | `regex` or `regex_name` required | A [Go regular expression](https://github.com/google/re2/wiki/Syntax). | +| `regex_name` | `regex` or `regex_name` required | A well-known regex to use. See below for a list of possible values. | +| `replace_with` | optional | The string that replaces each match. May reference capture groups (e.g. `${1}`). If unset, each match is removed. | +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). | +| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | + +#### Well-known regular expressions + +| Name | Description | +| --- | --- | +| `ansi_control_sequences` | ANSI "Control Sequence Introducer (CSI)" escape codes starting with `ESC [` | + +### Example Configurations + +#### Collapse spaces + +Configuration: +```yaml +- type: regex_replace + regex: " +" + replace_with: " " + field: body +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "attributes": { }, + "body": "Hello      World" +} +``` + + + +```json +{ + "resource": { }, + "attributes": { }, + "body": "Hello World" +} +``` + +
+ +#### Match and replace with groups + +Configuration: +```yaml +- type: regex_replace + regex: "{(.*?)}" + replace_with: "${1}" + field: body +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "attributes": { }, + "body": "{a}{bb}{ccc}" +} +``` + + + +```json +{ + "resource": { }, + "attributes": { }, + "body": "abbccc" +} +``` + +
+ +#### Remove all ANSI color escape codes from the body + +Configuration: +```yaml +- type: regex_replace + regex_name: ansi_control_sequences + field: body +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "attributes": { }, + "body": "\x1b[31mred\x1b[0m" +} +``` + + + +```json +{ + "resource": { }, + "attributes": { }, + "body": "red" +} +``` + +
diff --git a/pkg/stanza/operator/transformer/regexreplace/config.go b/pkg/stanza/operator/transformer/regexreplace/config.go new file mode 100644 index 0000000000000..55f8fa55f0f9d --- /dev/null +++ b/pkg/stanza/operator/transformer/regexreplace/config.go @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package regexreplace // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/regexreplace" + +import ( + "fmt" + "regexp" + + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" +) + +const operatorType = "regex_replace" + +// ansiCsiEscapeRegex matches ANSI CSI escape sequences (ESC [ ...); derived from https://en.wikipedia.org/wiki/ANSI_escape_code#CSIsection +var ansiCsiEscapeRegex = regexp.MustCompile(`\x1B\[[\x30-\x3F]*[\x20-\x2F]*[\x40-\x7E]`) + +func init() { + operator.Register(operatorType, func() operator.Builder { return NewConfig() }) +} + +// NewConfig creates a new regex_replace config with default values +func NewConfig() *Config { + return NewConfigWithID(operatorType) +} + +// NewConfigWithID creates a new regex_replace config with default values and the given operator ID +func NewConfigWithID(operatorID string) *Config { + return &Config{ + TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType), + } +} + +// Config is the configuration of a regex_replace operator. 
+type Config struct { + helper.TransformerConfig `mapstructure:",squash"` + RegexName string `mapstructure:"regex_name"` + Regex string `mapstructure:"regex"` + ReplaceWith string `mapstructure:"replace_with"` + Field entry.Field `mapstructure:"field"` +} + +func (c *Config) getRegexp() (*regexp.Regexp, error) { + if (c.RegexName == "") == (c.Regex == "") { + return nil, fmt.Errorf("either regex or regex_name must be set") + } + + switch c.RegexName { + case "ansi_control_sequences": + return ansiCsiEscapeRegex, nil + case "": + return regexp.Compile(c.Regex) + default: + return nil, fmt.Errorf("regex_name %s is unknown", c.RegexName) + } +} + +// Build will build a regex_replace operator. +func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) { + transformerOperator, err := c.TransformerConfig.Build(set) + if err != nil { + return nil, err + } + + regexp, err := c.getRegexp() + if err != nil { + return nil, err + } + + return &Transformer{ + TransformerOperator: transformerOperator, + field: c.Field, + regexp: regexp, + replaceWith: c.ReplaceWith, + }, nil +} diff --git a/pkg/stanza/operator/transformer/regexreplace/config_test.go b/pkg/stanza/operator/transformer/regexreplace/config_test.go new file mode 100644 index 0000000000000..e569ffddd9f3c --- /dev/null +++ b/pkg/stanza/operator/transformer/regexreplace/config_test.go @@ -0,0 +1,151 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package regexreplace + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" +) + +// test unmarshalling of values into config struct +func TestUnmarshal(t *testing.T) { + operatortest.ConfigUnmarshalTests{ + DefaultConfig: NewConfig(), + TestsFile: 
filepath.Join(".", "testdata", "config.yaml"), + Tests: []operatortest.ConfigUnmarshalTest{ + { + Name: "some_regex_replace", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField("nested") + cfg.Regex = "a" + cfg.ReplaceWith = "b" + return cfg + }(), + }, + { + Name: "ansi_control_sequences_body", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField("nested") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + }, + { + Name: "ansi_control_sequences_single_attribute", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewAttributeField("key") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + }, + { + Name: "ansi_control_sequences_single_resource", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewResourceField("key") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + }, + { + Name: "ansi_control_sequences_nested_body", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField("one", "two") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + }, + { + Name: "ansi_control_sequences_nested_attribute", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewAttributeField("one", "two") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + }, + { + Name: "ansi_control_sequences_nested_resource", + Expect: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewResourceField("one", "two") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + }, + }, + }.Run(t) +} + +type invalidConfigCase struct { + name string + cfg *Config + expectErr string +} + +func TestInvalidConfig(t *testing.T) { + cases := []invalidConfigCase{ + { + name: "neither_regex_nor_regexname", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "" + cfg.Regex = "" + return cfg + }(), + expectErr: "either regex or regex_name must be set", + }, + { + 
name: "both_regex_and_regexname", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + cfg.Regex = ".*" + return cfg + }(), + expectErr: "either regex or regex_name must be set", + }, + { + name: "unknown_regex_name", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "i_do_not_exist" + return cfg + }(), + expectErr: "regex_name i_do_not_exist is unknown", + }, + { + name: "invalid_regex", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.Regex = ")" + return cfg + }(), + expectErr: "error parsing regexp: unexpected ): `)`", + }, + } + for _, tc := range cases { + t.Run("InvalidConfig/"+tc.name, func(t *testing.T) { + cfg := tc.cfg + cfg.OutputIDs = []string{"fake"} + cfg.OnError = "send" + set := componenttest.NewNopTelemetrySettings() + _, err := cfg.Build(set) + require.Equal(t, tc.expectErr, err.Error()) + }) + } +} diff --git a/pkg/stanza/operator/transformer/regexreplace/package_test.go b/pkg/stanza/operator/transformer/regexreplace/package_test.go new file mode 100644 index 0000000000000..127f3f5efc80c --- /dev/null +++ b/pkg/stanza/operator/transformer/regexreplace/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package regexreplace + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/pkg/stanza/operator/transformer/regexreplace/testdata/config.yaml b/pkg/stanza/operator/transformer/regexreplace/testdata/config.yaml new file mode 100644 index 0000000000000..6514c8d30184f --- /dev/null +++ b/pkg/stanza/operator/transformer/regexreplace/testdata/config.yaml @@ -0,0 +1,29 @@ +some_regex_replace: + type: regex_replace + field: body.nested + regex: a + replace_with: b +ansi_control_sequences_body: + type: regex_replace + regex_name: ansi_control_sequences + field: 
body.nested +ansi_control_sequences_nested_attribute: + type: regex_replace + regex_name: ansi_control_sequences + field: attributes.one.two +ansi_control_sequences_nested_body: + type: regex_replace + regex_name: ansi_control_sequences + field: body.one.two +ansi_control_sequences_nested_resource: + type: regex_replace + regex_name: ansi_control_sequences + field: resource.one.two +ansi_control_sequences_single_attribute: + type: regex_replace + regex_name: ansi_control_sequences + field: attributes.key +ansi_control_sequences_single_resource: + type: regex_replace + regex_name: ansi_control_sequences + field: resource.key diff --git a/pkg/stanza/operator/transformer/regexreplace/transformer.go b/pkg/stanza/operator/transformer/regexreplace/transformer.go new file mode 100644 index 0000000000000..22fc0bfbc5d81 --- /dev/null +++ b/pkg/stanza/operator/transformer/regexreplace/transformer.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package regexreplace // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/regexreplace" + +import ( + "context" + "fmt" + "regexp" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" +) + +// Transformer is an operator that performs a regex-replace on a string field. 
+type Transformer struct { + helper.TransformerOperator + field entry.Field + regexp *regexp.Regexp + replaceWith string +} + +func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error { + return t.ProcessBatchWith(ctx, entries, t.Process) +} + +func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error { + return t.ProcessWith(ctx, entry, t.replace) +} + +func (t *Transformer) replace(e *entry.Entry) error { + value, ok := t.field.Get(e) + if !ok { + return nil + } + + switch v := value.(type) { + case string: + s := t.regexp.ReplaceAllString(v, t.replaceWith) + return t.field.Set(e, s) + default: + return fmt.Errorf("type %T cannot be handled", value) + } +} diff --git a/pkg/stanza/operator/transformer/regexreplace/transformer_test.go b/pkg/stanza/operator/transformer/regexreplace/transformer_test.go new file mode 100644 index 0000000000000..014e669194ca4 --- /dev/null +++ b/pkg/stanza/operator/transformer/regexreplace/transformer_test.go @@ -0,0 +1,301 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package regexreplace + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" +) + +type testCase struct { + name string + cfg *Config + input func() *entry.Entry + output func() *entry.Entry + expectErr string +} + +func TestBuildAndProcess(t *testing.T) { + now := time.Now() + newTestEntry := func() *entry.Entry { + e := entry.New() + e.ObservedTimestamp = now + e.Timestamp = time.Unix(1586632809, 0) + return e + } + + cases := []testCase{ + { + name: "simple_regex_replace", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + 
cfg.Regex = "_+" + cfg.ReplaceWith = "," + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "a__b__c" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "a,b,c" + return e + }, + }, + { + name: "group_regex_replace", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.Regex = "{(.)}" + cfg.ReplaceWith = "${1}" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "{a}{b}{c}" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "abc" + return e + }, + }, + { + name: "no_match", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "asdf" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "asdf" + return e + }, + }, + { + name: "no_color_code", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "\x1b[m" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "" + return e + }, + }, + { + name: "single_color_code", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "\x1b[31m" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "" + return e + }, + }, + { + name: "multiple_color_codes", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "\x1b[31;1;4m" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + 
e.Body = "" + return e + }, + }, + { + name: "multiple_escapes", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "\x1b[31mred\x1b[0m" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "red" + return e + }, + }, + { + name: "preserve_other_text", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "start \x1b[31mred\x1b[0m end" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "start red end" + return e + }, + }, + { + name: "nonstandard_uppercase_m", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = "\x1b[31M" + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = "" + return e + }, + }, + { + name: "invalid_type", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewBodyField() + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Body = 123 + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Body = 123 + return e + }, + expectErr: "type int cannot be handled", + }, + { + name: "attribute", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewAttributeField("foo") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Attributes = map[string]any{ + "foo": "\x1b[31mred\x1b[0m", + } + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Attributes = map[string]any{ + "foo": "red", + } + return e + }, + }, + { + name: 
"missing_field", + cfg: func() *Config { + cfg := NewConfig() + cfg.Field = entry.NewAttributeField("bar") + cfg.RegexName = "ansi_control_sequences" + return cfg + }(), + input: func() *entry.Entry { + e := newTestEntry() + e.Attributes = map[string]any{ + "foo": "\x1b[31mred\x1b[0m", + } + return e + }, + output: func() *entry.Entry { + e := newTestEntry() + e.Attributes = map[string]any{ + "foo": "\x1b[31mred\x1b[0m", + } + return e + }, + }, + } + for _, tc := range cases { + t.Run("BuildandProcess/"+tc.name, func(t *testing.T) { + cfg := tc.cfg + cfg.OutputIDs = []string{"fake"} + cfg.OnError = "send" + set := componenttest.NewNopTelemetrySettings() + op, err := cfg.Build(set) + require.NoError(t, err) + + unqouteOp := op.(*Transformer) + fake := testutil.NewFakeOutput(t) + require.NoError(t, unqouteOp.SetOutputs([]operator.Operator{fake})) + val := tc.input() + err = unqouteOp.Process(context.Background(), val) + if tc.expectErr != "" { + require.Equal(t, tc.expectErr, err.Error()) + } else { + require.NoError(t, err) + } + + // Expect entry to pass through even if error, due to OnError = "send" + fake.ExpectEntry(t, tc.output()) + }) + } +}