diff --git a/.chloggen/add_container_parser.yaml b/.chloggen/add_container_parser.yaml
new file mode 100644
index 000000000000..b6b4406b8f43
--- /dev/null
+++ b/.chloggen/add_container_parser.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: filelogreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add container operator parser
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [31959]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/pkg/stanza/adapter/register.go b/pkg/stanza/adapter/register.go
index 8105ef17d587..426e456decfa 100644
--- a/pkg/stanza/adapter/register.go
+++ b/pkg/stanza/adapter/register.go
@@ -6,6 +6,7 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con
import (
_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/file" // Register parsers and transformers for stanza-based log receivers
_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout"
+ _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container"
_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/csv"
_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/json"
_ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/jsonarray"
diff --git a/pkg/stanza/docs/operators/container.md b/pkg/stanza/docs/operators/container.md
new file mode 100644
index 000000000000..4cc972fbc5ed
--- /dev/null
+++ b/pkg/stanza/docs/operators/container.md
@@ -0,0 +1,238 @@
+## `container` operator
+
+The `container` operator parses logs in `docker`, `cri-o` and `containerd` formats.
+
+### Configuration Fields
+
+| Field | Default | Description |
+|------------------------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `id` | `container` | A unique identifier for the operator. |
+| `format` | `` | The container log format to use if it is known. Users can choose between `docker`, `crio` and `containerd`. If not set, the format will be automatically detected. |
+| `add_metadata_from_filepath` | `true` | Set if k8s metadata should be added from the file path. Requires the `log.file.path` field to be present. |
+| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
+| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. |
+| `parse_to` | `attributes` | The [field](../types/field.md) to which the value will be parsed. |
+| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
+| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |
+| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
+
+
+### Embedded Operations
+
+The `container` parser can be configured to embed certain operations such as the severity parsing. For more information, see [complex parsers](../types/parsers.md#complex-parsers).
+
+### Add metadata from file path
+
+Requires `include_file_path: true` in order for the `log.file.path` field to be available for the operator.
+If that's not possible, users can disable the metadata addition with `add_metadata_from_filepath: false`.
+A file path like `"/var/log/pods/some-ns_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"`,
+will produce the following k8s metadata:
+
+```json
+{
+ "attributes": {
+ "k8s": {
+ "container": {
+ "name": "kube-controller",
+ "restart_count": "1"
+ }, "pod": {
+ "uid": "49cc7c1fd3702c40b2686ea7486091d6",
+ "name": "kube-controller-kind-control-plane"
+ }, "namespace": {
+ "name": "some-ns"
+ }
+ }
+ }
+}
+```
+
+### Example Configurations:
+
+#### Parse the body as docker container log
+
+Configuration:
+```yaml
+- type: container
+ format: docker
+ add_metadata_from_filepath: true
+```
+
+Note: in this example the `format: docker` is optional since formats can be automatically detected as well.
+ `add_metadata_from_filepath` is true by default as well.
+
+
+ Input body Output body
+
+
+
+```json
+{
+ "timestamp": "",
+ "body": "{\"log\":\"INFO: log line here\",\"stream\":\"stdout\",\"time\":\"2029-03-30T08:31:20.545192187Z\"}",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+
+
+
+```json
+{
+ "timestamp": "2024-03-30 08:31:20.545192187 +0000 UTC",
+ "body": "log line here",
+ "attributes": {
+ "time": "2024-03-30T08:31:20.545192187Z",
+ "log.iostream": "stdout",
+ "k8s.pod.name": "kube-controller-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+ "k8s.container.name": "kube-controller",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+ }
+}
+```
+
+
+
+
+
+#### Parse the body as cri-o container log
+
+Configuration:
+```yaml
+- type: container
+```
+
+
+ Input body Output body
+
+
+
+```json
+{
+ "timestamp": "",
+ "body": "2024-04-13T07:59:37.505201169-05:00 stdout F standalone crio line which is awesome",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+
+
+
+```json
+{
+ "timestamp": "2024-04-13 12:59:37.505201169 +0000 UTC",
+ "body": "standalone crio line which is awesome",
+ "attributes": {
+ "time": "2024-04-13T07:59:37.505201169-05:00",
+ "logtag": "F",
+ "log.iostream": "stdout",
+ "k8s.pod.name": "kube-controller-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+ "k8s.container.name": "kube-controller",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+ }
+}
+```
+
+
+
+
+
+#### Parse the body as containerd container log
+
+Configuration:
+```yaml
+- type: container
+```
+
+
+ Input body Output body
+
+
+
+```json
+{
+ "timestamp": "",
+ "body": "2023-06-22T10:27:25.813799277Z stdout F standalone containerd line that is super awesome",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+
+
+
+```json
+{
+ "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC",
+ "body": "standalone containerd line that is super awesome",
+ "attributes": {
+ "time": "2023-06-22T10:27:25.813799277Z",
+ "logtag": "F",
+ "log.iostream": "stdout",
+ "k8s.pod.name": "kube-controller-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+ "k8s.container.name": "kube-controller",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+ }
+}
+```
+
+
+
+
+
+#### Parse the multiline as containerd container log and recombine into a single one
+
+Configuration:
+```yaml
+- type: container
+```
+
+
+ Input body Output body
+
+
+
+```json
+{
+ "timestamp": "",
+ "body": "2023-06-22T10:27:25.813799277Z stdout P multiline containerd line that i",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+},
+{
+ "timestamp": "",
+ "body": "2023-06-22T10:27:25.813799277Z stdout F s super awesomne",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+}
+```
+
+
+
+
+```json
+{
+ "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC",
+ "body": "multiline containerd line that is super awesome",
+ "attributes": {
+ "time": "2023-06-22T10:27:25.813799277Z",
+ "logtag": "F",
+ "log.iostream": "stdout",
+ "k8s.pod.name": "kube-controller-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6",
+ "k8s.container.name": "kube-controller",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"
+ }
+}
+```
+
+
+
+
\ No newline at end of file
diff --git a/pkg/stanza/operator/helper/regexp.go b/pkg/stanza/operator/helper/regexp.go
new file mode 100644
index 000000000000..7306926ced79
--- /dev/null
+++ b/pkg/stanza/operator/helper/regexp.go
@@ -0,0 +1,28 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+
+import (
+ "fmt"
+ "regexp"
+)
+
+func MatchValues(value string, regexp *regexp.Regexp) (map[string]any, error) {
+ matches := regexp.FindStringSubmatch(value)
+ if matches == nil {
+ return nil, fmt.Errorf("regex pattern does not match")
+ }
+
+ parsedValues := map[string]any{}
+ for i, subexp := range regexp.SubexpNames() {
+ if i == 0 {
+ // Skip whole match
+ continue
+ }
+ if subexp != "" {
+ parsedValues[subexp] = matches[i]
+ }
+ }
+ return parsedValues, nil
+}
diff --git a/pkg/stanza/operator/parser/container/config.go b/pkg/stanza/operator/parser/container/config.go
new file mode 100644
index 000000000000..fb6555708182
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/config.go
@@ -0,0 +1,120 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container"
+
+import (
+ "fmt"
+ "sync"
+
+ jsoniter "github.com/json-iterator/go"
+ "go.opentelemetry.io/collector/component"
+
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine"
+)
+
+const operatorType = "container"
+
+func init() {
+ operator.Register(operatorType, func() operator.Builder { return NewConfig() })
+}
+
+// NewConfig creates a new JSON parser config with default values
+func NewConfig() *Config {
+ return NewConfigWithID(operatorType)
+}
+
+// NewConfigWithID creates a new JSON parser config with default values
+func NewConfigWithID(operatorID string) *Config {
+ return &Config{
+ ParserConfig: helper.NewParserConfig(operatorID, operatorType),
+ Format: "",
+ AddMetadataFromFilePath: true,
+ }
+}
+
+// Config is the configuration of a Container parser operator.
+type Config struct {
+ helper.ParserConfig `mapstructure:",squash"`
+
+ Format string `mapstructure:"format"`
+ AddMetadataFromFilePath bool `mapstructure:"add_metadata_from_filepath"`
+}
+
+// Build will build a Container parser operator.
+func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) {
+ parserOperator, err := c.ParserConfig.Build(set)
+ if err != nil {
+ return nil, err
+ }
+
+ cLogEmitter := helper.NewLogEmitter(set.Logger.Sugar())
+ recombineParser, err := createRecombine(set, cLogEmitter)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create internal recombine config: %w", err)
+ }
+
+ wg := sync.WaitGroup{}
+
+ if c.Format != "" {
+ switch c.Format {
+ case dockerFormat, crioFormat, containerdFormat:
+ default:
+ return &Parser{}, errors.NewError(
+ "operator config has an invalid `format` field.",
+ "ensure that the `format` field is set to one of `docker`, `crio`, `containerd`.",
+ "format", c.OnError,
+ )
+ }
+ }
+
+ p := &Parser{
+ ParserOperator: parserOperator,
+ recombineParser: recombineParser,
+ json: jsoniter.ConfigFastest,
+ format: c.Format,
+ addMetadataFromFilepath: c.AddMetadataFromFilePath,
+ crioLogEmitter: cLogEmitter,
+ criConsumers: &wg,
+ }
+ return p, nil
+}
+
+// createRecombine creates an internal recombine operator which outputs to an async helper.LogEmitter
+// the equivalent recombine config:
+//
+// combine_field: body
+// combine_with: ""
+// is_last_entry: attributes.logtag == 'F'
+// max_log_size: 102400
+// source_identifier: attributes["log.file.path"]
+// type: recombine
+func createRecombine(set component.TelemetrySettings, cLogEmitter *helper.LogEmitter) (operator.Operator, error) {
+ recombineParserCfg := createRecombineConfig()
+ recombineParser, err := recombineParserCfg.Build(set)
+ if err != nil {
+ return nil, fmt.Errorf("failed to resolve internal recombine config: %w", err)
+ }
+
+ // set the LogEmmiter as the output of the recombine parser
+ recombineParser.SetOutputIDs([]string{cLogEmitter.OperatorID})
+ if err := recombineParser.SetOutputs([]operator.Operator{cLogEmitter}); err != nil {
+ return nil, fmt.Errorf("failed to set outputs of internal recombine")
+ }
+
+ return recombineParser, nil
+}
+
+func createRecombineConfig() *recombine.Config {
+ recombineParserCfg := recombine.NewConfigWithID(recombineInternalID)
+ recombineParserCfg.IsLastEntry = "attributes.logtag == 'F'"
+ recombineParserCfg.CombineField = entry.NewBodyField()
+ recombineParserCfg.CombineWith = ""
+ recombineParserCfg.SourceIdentifier = entry.NewAttributeField("log.file.path")
+ recombineParserCfg.MaxLogSize = 102400
+ return recombineParserCfg
+}
diff --git a/pkg/stanza/operator/parser/container/config_test.go b/pkg/stanza/operator/parser/container/config_test.go
new file mode 100644
index 000000000000..599c26c1b7fd
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/config_test.go
@@ -0,0 +1,107 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+package container
+
+import (
+ "path/filepath"
+ "testing"
+
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
+)
+
+func TestConfig(t *testing.T) {
+ operatortest.ConfigUnmarshalTests{
+ DefaultConfig: NewConfig(),
+ TestsFile: filepath.Join(".", "testdata", "config.yaml"),
+ Tests: []operatortest.ConfigUnmarshalTest{
+ {
+ Name: "default",
+ Expect: NewConfig(),
+ },
+ {
+ Name: "parse_from_simple",
+ Expect: func() *Config {
+ cfg := NewConfig()
+ cfg.ParseFrom = entry.NewBodyField("from")
+ return cfg
+ }(),
+ },
+ {
+ Name: "parse_to_simple",
+ Expect: func() *Config {
+ cfg := NewConfig()
+ cfg.ParseTo = entry.RootableField{Field: entry.NewBodyField("log")}
+ return cfg
+ }(),
+ },
+ {
+ Name: "on_error_drop",
+ Expect: func() *Config {
+ cfg := NewConfig()
+ cfg.OnError = "drop"
+ return cfg
+ }(),
+ },
+ {
+ Name: "severity",
+ Expect: func() *Config {
+ cfg := NewConfig()
+ parseField := entry.NewBodyField("severity_field")
+ severityField := helper.NewSeverityConfig()
+ severityField.ParseFrom = &parseField
+ mapping := map[string]any{
+ "critical": "5xx",
+ "error": "4xx",
+ "info": "3xx",
+ "debug": "2xx",
+ }
+ severityField.Mapping = mapping
+ cfg.SeverityConfig = &severityField
+ return cfg
+ }(),
+ },
+ {
+ Name: "format",
+ Expect: func() *Config {
+ cfg := NewConfig()
+ cfg.Format = "docker"
+ return cfg
+ }(),
+ },
+ {
+ Name: "add_metadata_from_file_path",
+ Expect: func() *Config {
+ cfg := NewConfig()
+ cfg.AddMetadataFromFilePath = true
+ return cfg
+ }(),
+ },
+ {
+ Name: "parse_to_attributes",
+ Expect: func() *Config {
+ p := NewConfig()
+ p.ParseTo = entry.RootableField{Field: entry.NewAttributeField()}
+ return p
+ }(),
+ },
+ {
+ Name: "parse_to_body",
+ Expect: func() *Config {
+ p := NewConfig()
+ p.ParseTo = entry.RootableField{Field: entry.NewBodyField()}
+ return p
+ }(),
+ },
+ {
+ Name: "parse_to_resource",
+ Expect: func() *Config {
+ p := NewConfig()
+ p.ParseTo = entry.RootableField{Field: entry.NewResourceField()}
+ return p
+ }(),
+ },
+ },
+ }.Run(t)
+}
diff --git a/pkg/stanza/operator/parser/container/package_test.go b/pkg/stanza/operator/parser/container/package_test.go
new file mode 100644
index 000000000000..245776eec13d
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/package_test.go
@@ -0,0 +1,14 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container
+
+import (
+ "testing"
+
+ "go.uber.org/goleak"
+)
+
+func TestMain(m *testing.M) {
+ goleak.VerifyTestMain(m)
+}
diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go
new file mode 100644
index 000000000000..d531925e9735
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/parser.go
@@ -0,0 +1,357 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container"
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+
+ jsoniter "github.com/json-iterator/go"
+
+ "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+)
+
+const dockerFormat = "docker"
+const crioFormat = "crio"
+const containerdFormat = "containerd"
+const recombineInternalID = "recombine_container_internal"
+const dockerPattern = "^\\{"
+const crioPattern = "^(?P[^ Z]+) (?Pstdout|stderr) (?P[^ ]*) ?(?P.*)$"
+const containerdPattern = "^(?P[^ ^Z]+Z) (?Pstdout|stderr) (?P[^ ]*) ?(?P.*)$"
+const logpathPattern = "^.*\\/(?P[^_]+)_(?P[^_]+)_(?P[a-f0-9\\-]+)\\/(?P[^\\._]+)\\/(?P\\d+)\\.log$"
+const logPathField = "log.file.path"
+const crioTimeLayout = "2006-01-02T15:04:05.999999999Z07:00"
+const goTimeLayout = "2006-01-02T15:04:05.999Z"
+
+var (
+ dockerMatcher = regexp.MustCompile(dockerPattern)
+ crioMatcher = regexp.MustCompile(crioPattern)
+ containerdMatcher = regexp.MustCompile(containerdPattern)
+ pathMatcher = regexp.MustCompile(logpathPattern)
+)
+
+var (
+ logFieldsMapping = map[string]string{
+ "stream": "log.iostream",
+ }
+ k8sMetadataMapping = map[string]string{
+ "container_name": "k8s.container.name",
+ "namespace": "k8s.namespace.name",
+ "pod_name": "k8s.pod.name",
+ "restart_count": "k8s.container.restart_count",
+ "uid": "k8s.pod.uid",
+ }
+)
+
+// Parser is an operator that parses Container logs.
+type Parser struct {
+ helper.ParserOperator
+ recombineParser operator.Operator
+ format string
+ json jsoniter.API
+ addMetadataFromFilepath bool
+ crioLogEmitter *helper.LogEmitter
+ asyncConsumerStarted bool
+ criConsumerStartOnce sync.Once
+ criConsumers *sync.WaitGroup
+}
+
+// Process will parse an entry of Container logs
+func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
+ var timeLayout string
+
+ format := p.format
+ if format == "" {
+ format, err = p.detectFormat(entry)
+ if err != nil {
+ return fmt.Errorf("failed to detect a valid container log format: %w", err)
+ }
+ }
+
+ switch format {
+ case dockerFormat:
+ err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleAttributeMappings)
+ if err != nil {
+ return fmt.Errorf("failed to process the docker log: %w", err)
+ }
+ timeLayout = goTimeLayout
+ err = parseTime(entry, timeLayout)
+ if err != nil {
+ return fmt.Errorf("failed to parse time: %w", err)
+ }
+ case containerdFormat, crioFormat:
+ p.criConsumerStartOnce.Do(func() {
+ err = p.crioLogEmitter.Start(nil)
+ if err != nil {
+ p.Logger().Warn("unable to start the internal LogEmitter: %w", err)
+ return
+ }
+ err = p.recombineParser.Start(nil)
+ if err != nil {
+ p.Logger().Warn("unable to start the internal recombine operator: %w", err)
+ return
+ }
+ go p.crioConsumer(ctx)
+ p.asyncConsumerStarted = true
+ })
+
+ // Short circuit if the "if" condition does not match
+ skip, err := p.Skip(ctx, entry)
+ if err != nil {
+ return p.HandleEntryError(ctx, entry, err)
+ }
+ if skip {
+ p.Write(ctx, entry)
+ return nil
+ }
+
+ if format == containerdFormat {
+ // parse the message
+ err = p.ParserOperator.ParseWith(ctx, entry, p.parseContainerd)
+ if err != nil {
+ return fmt.Errorf("failed to parse containerd log: %w", err)
+ }
+ timeLayout = goTimeLayout
+ } else {
+ // parse the message
+ err = p.ParserOperator.ParseWith(ctx, entry, p.parseCRIO)
+ if err != nil {
+ return fmt.Errorf("failed to parse crio log: %w", err)
+ }
+ timeLayout = crioTimeLayout
+ }
+
+ err = parseTime(entry, timeLayout)
+ if err != nil {
+ return fmt.Errorf("failed to parse time: %w", err)
+ }
+
+ err = p.handleAttributeMappings(entry)
+ if err != nil {
+ return fmt.Errorf("failed to handle attribute mappings: %w", err)
+ }
+
+ // send it to the recombine operator
+ err = p.recombineParser.Process(ctx, entry)
+ if err != nil {
+ return fmt.Errorf("failed to recombine the crio log: %w", err)
+ }
+ default:
+ return fmt.Errorf("failed to detect a valid container log format")
+ }
+
+ return nil
+}
+
+// crioConsumer receives log entries from the crioLogEmitter and
+// writes them to the output of the main parser
+func (p *Parser) crioConsumer(ctx context.Context) {
+ entriesChan := p.crioLogEmitter.OutChannel()
+ p.criConsumers.Add(1)
+ defer p.criConsumers.Done()
+ for entries := range entriesChan {
+ for _, e := range entries {
+ p.Write(ctx, e)
+ }
+ }
+}
+
+// Stop ensures that the internal recombineParser, the internal crioLogEmitter and
+// the crioConsumer are stopped in the proper order without being affected by
+// any possible race conditions
+func (p *Parser) Stop() error {
+ if !p.asyncConsumerStarted {
+ // nothing is started return
+ return nil
+ }
+
+ var stopErrs []error
+ err := p.recombineParser.Stop()
+ if err != nil {
+ stopErrs = append(stopErrs, fmt.Errorf("unable to stop the internal recombine operator: %w", err))
+ }
+ // the recombineParser will call the Process of the crioLogEmitter synchronously so the entries will be first
+ // written to the channel before the Stop of the recombineParser returns. Then since the crioLogEmitter handles
+ // the entries synchronously it is safe to call its Stop.
+ // After crioLogEmitter is stopped the crioConsumer will consume the remaining messages and return.
+ err = p.crioLogEmitter.Stop()
+ if err != nil {
+ stopErrs = append(stopErrs, fmt.Errorf("unable to stop the internal LogEmitter: %w", err))
+ }
+ p.criConsumers.Wait()
+ return errors.Join(stopErrs...)
+}
+
+// detectFormat will detect the container log format
+func (p *Parser) detectFormat(e *entry.Entry) (string, error) {
+ value, ok := e.Get(p.ParseFrom)
+ if !ok {
+ return "", fmt.Errorf("entry cannot be parsed as container logs")
+ }
+
+ raw, ok := value.(string)
+ if !ok {
+ return "", fmt.Errorf("type '%T' cannot be parsed as container logs", value)
+ }
+
+ switch {
+ case dockerMatcher.MatchString(raw):
+ return dockerFormat, nil
+ case crioMatcher.MatchString(raw):
+ return crioFormat, nil
+ case containerdMatcher.MatchString(raw):
+ return containerdFormat, nil
+ }
+ return "", fmt.Errorf("entry cannot be parsed as container logs: %v", value)
+}
+
+// parseCRIO will parse a crio log value based on a fixed regexp
+func (p *Parser) parseCRIO(value any) (any, error) {
+ raw, ok := value.(string)
+ if !ok {
+ return "", fmt.Errorf("type '%T' cannot be parsed as cri-o container logs", value)
+ }
+
+ return helper.MatchValues(raw, crioMatcher)
+}
+
+// parseContainerd will parse a containerd log value based on a fixed regexp
+func (p *Parser) parseContainerd(value any) (any, error) {
+ raw, ok := value.(string)
+ if !ok {
+ return nil, fmt.Errorf("type '%T' cannot be parsed as containerd logs", value)
+ }
+
+ return helper.MatchValues(raw, containerdMatcher)
+}
+
+// parseDocker will parse a docker log value as JSON
+func (p *Parser) parseDocker(value any) (any, error) {
+ raw, ok := value.(string)
+ if !ok {
+ return nil, fmt.Errorf("type '%T' cannot be parsed as docker container logs", value)
+ }
+
+ parsedValue := make(map[string]any)
+ err := p.json.UnmarshalFromString(raw, &parsedValue)
+ if err != nil {
+ return nil, err
+ }
+ return parsedValue, nil
+}
+
+// handleAttributeMappings handles fields' mappings and k8s meta extraction
+func (p *Parser) handleAttributeMappings(e *entry.Entry) error {
+ err := p.handleMoveAttributes(e)
+ if err != nil {
+ return err
+ }
+ err = p.extractk8sMetaFromFilePath(e)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// handleMoveAttributes moves fields to final attributes
+func (p *Parser) handleMoveAttributes(e *entry.Entry) error {
+ // move `log` to `body` explicitly first to avoid
+ // moving after more attributes have been added under the `log.*` key
+ err := moveFieldToBody(e, "log", "body")
+ if err != nil {
+ return err
+ }
+ // then move the rest of the fields
+ for originalKey, mappedKey := range logFieldsMapping {
+ err = moveField(e, originalKey, mappedKey)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// extractk8sMetaFromFilePath extracts metadata attributes from logfilePath
+func (p *Parser) extractk8sMetaFromFilePath(e *entry.Entry) error {
+ if !p.addMetadataFromFilepath {
+ return nil
+ }
+
+ logPath := e.Attributes[logPathField]
+ rawLogPath, ok := logPath.(string)
+ if !ok {
+ return fmt.Errorf("type '%T' cannot be parsed as log path field", logPath)
+ }
+
+ parsedValues, err := helper.MatchValues(rawLogPath, pathMatcher)
+ if err != nil {
+ return fmt.Errorf("failed to detect a valid log path")
+ }
+
+ for originalKey, attributeKey := range k8sMetadataMapping {
+ newField := entry.NewAttributeField(attributeKey)
+ if err := newField.Set(e, parsedValues[originalKey]); err != nil {
+ return fmt.Errorf("failed to set %v as metadata at %v", originalKey, attributeKey)
+ }
+
+ }
+ return nil
+}
+
+func moveField(e *entry.Entry, originalKey, mappedKey string) error {
+ val, exist := entry.NewAttributeField(originalKey).Delete(e)
+ if !exist {
+ return fmt.Errorf("move: field %v does not exist", originalKey)
+ }
+ atKey := entry.NewAttributeField(mappedKey)
+ if err := atKey.Set(e, val); err != nil {
+ return fmt.Errorf("failed to move %v to %v", originalKey, mappedKey)
+ }
+ return nil
+}
+
+func moveFieldToBody(e *entry.Entry, originalKey, mappedKey string) error {
+ val, exist := entry.NewAttributeField(originalKey).Delete(e)
+ if !exist {
+ return fmt.Errorf("move: field %v does not exist", originalKey)
+ }
+ body, _ := entry.NewField(mappedKey)
+ if err := body.Set(e, val); err != nil {
+ return fmt.Errorf("failed to move %v to %v", originalKey, mappedKey)
+ }
+ return nil
+}
+
+func parseTime(e *entry.Entry, layout string) error {
+ var location *time.Location
+ parseFrom := "time"
+ value, ok := e.Get(entry.NewAttributeField(parseFrom))
+ if !ok {
+ return fmt.Errorf("failed to get the time from %v", e)
+ }
+
+ if strings.HasSuffix(layout, "Z") {
+ // If a timestamp ends with 'Z', it should be interpreted at Zulu (UTC) time
+ location = time.UTC
+ } else {
+ location = time.Local
+ }
+
+ timeValue, err := timeutils.ParseGotime(layout, value, location)
+ if err != nil {
+ return err
+ }
+ // timeutils.ParseGotime calls timeutils.SetTimestampYear before returning the timeValue
+ e.Timestamp = timeValue
+ return nil
+}
diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go
new file mode 100644
index 000000000000..1d966705be8d
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/parser_test.go
@@ -0,0 +1,370 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package container
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "go.opentelemetry.io/collector/component/componenttest"
+
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine"
+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
+)
+
+func newTestParser(t *testing.T) *Parser {
+ config := NewConfigWithID("test")
+ set := componenttest.NewNopTelemetrySettings()
+ op, err := config.Build(set)
+ require.NoError(t, err)
+ return op.(*Parser)
+}
+
+func TestConfigBuild(t *testing.T) {
+ config := NewConfigWithID("test")
+ set := componenttest.NewNopTelemetrySettings()
+ op, err := config.Build(set)
+ require.NoError(t, err)
+ require.IsType(t, &Parser{}, op)
+}
+
+func TestConfigBuildFailure(t *testing.T) {
+ config := NewConfigWithID("test")
+ config.OnError = "invalid_on_error"
+ set := componenttest.NewNopTelemetrySettings()
+ _, err := config.Build(set)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "invalid `on_error` field")
+}
+
+func TestConfigBuildFormatError(t *testing.T) {
+ config := NewConfigWithID("test")
+ config.Format = "invalid_runtime"
+ set := componenttest.NewNopTelemetrySettings()
+ _, err := config.Build(set)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "invalid `format` field")
+}
+
+func TestDockerParserInvalidType(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parseDocker([]int{})
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "type '[]int' cannot be parsed as docker container logs")
+}
+
+func TestCrioParserInvalidType(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parseCRIO([]int{})
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "type '[]int' cannot be parsed as cri-o container logs")
+}
+
+func TestContainerdParserInvalidType(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parseContainerd([]int{})
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "type '[]int' cannot be parsed as containerd logs")
+}
+
+func TestFormatDetectionFailure(t *testing.T) {
+ parser := newTestParser(t)
+ e := &entry.Entry{
+ Body: `invalid container format`,
+ }
+ _, err := parser.detectFormat(e)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "entry cannot be parsed as container logs")
+}
+
+func TestInternalRecombineCfg(t *testing.T) {
+ cfg := createRecombineConfig()
+ expected := recombine.NewConfigWithID(recombineInternalID)
+ expected.IsLastEntry = "attributes.logtag == 'F'"
+ expected.CombineField = entry.NewBodyField()
+ expected.CombineWith = ""
+ expected.SourceIdentifier = entry.NewAttributeField("log.file.path")
+ expected.MaxLogSize = 102400
+ require.Equal(t, cfg, expected)
+}
+
+func TestProcess(t *testing.T) {
+ cases := []struct {
+ name string
+ op func() (operator.Operator, error)
+ input *entry.Entry
+ expect *entry.Entry
+ }{
+ {
+ "docker",
+ func() (operator.Operator, error) {
+ cfg := NewConfigWithID("test_id")
+ cfg.AddMetadataFromFilePath = false
+ cfg.Format = "docker"
+ set := componenttest.NewNopTelemetrySettings()
+ return cfg.Build(set)
+ },
+ &entry.Entry{
+ Body: `{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}`,
+ },
+ &entry.Entry{
+ Attributes: map[string]any{
+ "time": "2029-03-30T08:31:20.545192187Z",
+ "log.iostream": "stdout",
+ },
+ Body: "INFO: log line here",
+ Timestamp: time.Date(2029, time.March, 30, 8, 31, 20, 545192187, time.UTC),
+ },
+ },
+ {
+ "docker_with_auto_detection",
+ func() (operator.Operator, error) {
+ cfg := NewConfigWithID("test_id")
+ cfg.AddMetadataFromFilePath = false
+ set := componenttest.NewNopTelemetrySettings()
+ return cfg.Build(set)
+ },
+ &entry.Entry{
+ Body: `{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}`,
+ },
+ &entry.Entry{
+ Attributes: map[string]any{
+ "time": "2029-03-30T08:31:20.545192187Z",
+ "log.iostream": "stdout",
+ },
+ Body: "INFO: log line here",
+ Timestamp: time.Date(2029, time.March, 30, 8, 31, 20, 545192187, time.UTC),
+ },
+ },
+ {
+ "docker_with_auto_detection_and_metadata_from_file_path",
+ func() (operator.Operator, error) {
+ cfg := NewConfigWithID("test_id")
+ cfg.AddMetadataFromFilePath = true
+ set := componenttest.NewNopTelemetrySettings()
+ return cfg.Build(set)
+ },
+ &entry.Entry{
+ Body: `{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}`,
+ Attributes: map[string]any{
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ },
+ &entry.Entry{
+ Attributes: map[string]any{
+ "time": "2029-03-30T08:31:20.545192187Z",
+ "log.iostream": "stdout",
+ "k8s.pod.name": "kube-scheduler-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3",
+ "k8s.container.name": "kube-scheduler44",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ Body: "INFO: log line here",
+ Timestamp: time.Date(2029, time.March, 30, 8, 31, 20, 545192187, time.UTC),
+ },
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ op, err := tc.op()
+ require.NoError(t, err, "did not expect operator function to return an error, this is a bug with the test case")
+
+ err = op.Process(context.Background(), tc.input)
+ require.NoError(t, err)
+ require.Equal(t, tc.expect, tc.input)
+ // Stop the operator
+ require.NoError(t, op.Stop())
+ })
+ }
+}
+
+func TestRecombineProcess(t *testing.T) {
+ cases := []struct {
+ name string
+ op func() (operator.Operator, error)
+ input []*entry.Entry
+ expectedOutput []*entry.Entry
+ }{
+ {
+ "crio_standalone_with_auto_detection_and_metadata_from_file_path",
+ func() (operator.Operator, error) {
+ cfg := NewConfigWithID("test_id")
+ cfg.AddMetadataFromFilePath = true
+ set := componenttest.NewNopTelemetrySettings()
+ return cfg.Build(set)
+ },
+ []*entry.Entry{
+ {
+ Body: `2024-04-13T07:59:37.505201169-10:00 stdout F standalone crio line which is awesome!`,
+ Attributes: map[string]any{
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ },
+ },
+ []*entry.Entry{
+ {
+ Attributes: map[string]any{
+ "time": "2024-04-13T07:59:37.505201169-10:00",
+ "log.iostream": "stdout",
+ "logtag": "F",
+ "k8s.pod.name": "kube-scheduler-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3",
+ "k8s.container.name": "kube-scheduler44",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ Body: "standalone crio line which is awesome!",
+ Timestamp: time.Date(2024, time.April, 13, 7, 59, 37, 505201169, time.FixedZone("", -10*60*60)),
+ },
+ },
+ },
+ {
+ "containerd_standalone_with_auto_detection_and_metadata_from_file_path",
+ func() (operator.Operator, error) {
+ cfg := NewConfigWithID("test_id")
+ cfg.AddMetadataFromFilePath = true
+ set := componenttest.NewNopTelemetrySettings()
+ return cfg.Build(set)
+ },
+ []*entry.Entry{
+ {
+ Body: `2024-04-13T07:59:37.505201169Z stdout F standalone containerd line which is awesome!`,
+ Attributes: map[string]any{
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ },
+ },
+ []*entry.Entry{
+ {
+ Attributes: map[string]any{
+ "time": "2024-04-13T07:59:37.505201169Z",
+ "log.iostream": "stdout",
+ "logtag": "F",
+ "k8s.pod.name": "kube-scheduler-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3",
+ "k8s.container.name": "kube-scheduler44",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ Body: "standalone containerd line which is awesome!",
+ Timestamp: time.Date(2024, time.April, 13, 7, 59, 37, 505201169, time.UTC),
+ },
+ },
+ },
+ {
+ "crio_multiple_with_auto_detection_and_metadata_from_file_path",
+ func() (operator.Operator, error) {
+ cfg := NewConfigWithID("test_id")
+ cfg.AddMetadataFromFilePath = true
+ set := componenttest.NewNopTelemetrySettings()
+ return cfg.Build(set)
+ },
+ []*entry.Entry{
+ {
+ Body: `2024-04-13T07:59:37.505201169-10:00 stdout P standalone crio line which i`,
+ Attributes: map[string]any{
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ },
+ {
+ Body: `2024-04-13T07:59:37.505201169-10:00 stdout F s awesome!`,
+ Attributes: map[string]any{
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ },
+ },
+ []*entry.Entry{
+ {
+ Attributes: map[string]any{
+ "time": "2024-04-13T07:59:37.505201169-10:00",
+ "log.iostream": "stdout",
+ "logtag": "P",
+ "k8s.pod.name": "kube-scheduler-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3",
+ "k8s.container.name": "kube-scheduler44",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ Body: "standalone crio line which is awesome!",
+ Timestamp: time.Date(2024, time.April, 13, 7, 59, 37, 505201169, time.FixedZone("", -10*60*60)),
+ },
+ },
+ },
+ {
+ "containerd_standalone_with_auto_detection_and_metadata_from_file_path",
+ func() (operator.Operator, error) {
+ cfg := NewConfigWithID("test_id")
+ cfg.AddMetadataFromFilePath = true
+ set := componenttest.NewNopTelemetrySettings()
+ return cfg.Build(set)
+ },
+ []*entry.Entry{
+ {
+ Body: `2024-04-13T07:59:37.505201169Z stdout P standalone containerd line which i`,
+ Attributes: map[string]any{
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ },
+ {
+ Body: `2024-04-13T07:59:37.505201169Z stdout F s awesome!`,
+ Attributes: map[string]any{
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ },
+ },
+ []*entry.Entry{
+ {
+ Attributes: map[string]any{
+ "time": "2024-04-13T07:59:37.505201169Z",
+ "log.iostream": "stdout",
+ "logtag": "P",
+ "k8s.pod.name": "kube-scheduler-kind-control-plane",
+ "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3",
+ "k8s.container.name": "kube-scheduler44",
+ "k8s.container.restart_count": "1",
+ "k8s.namespace.name": "some",
+ "log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
+ },
+ Body: "standalone containerd line which is awesome!",
+ Timestamp: time.Date(2024, time.April, 13, 7, 59, 37, 505201169, time.UTC),
+ },
+ },
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ ctx := context.Background()
+ op, err := tc.op()
+ require.NoError(t, err)
+ defer func() { require.NoError(t, op.Stop()) }()
+ r := op.(*Parser)
+
+ fake := testutil.NewFakeOutput(t)
+ r.OutputOperators = ([]operator.Operator{fake})
+
+ for _, e := range tc.input {
+ require.NoError(t, r.Process(ctx, e))
+ }
+
+ fake.ExpectEntries(t, tc.expectedOutput)
+
+ select {
+ case e := <-fake.Received:
+ require.FailNow(t, "Received unexpected entry: ", e)
+ default:
+ }
+ })
+ }
+}
diff --git a/pkg/stanza/operator/parser/container/testdata/config.yaml b/pkg/stanza/operator/parser/container/testdata/config.yaml
new file mode 100644
index 000000000000..fe174f3bb071
--- /dev/null
+++ b/pkg/stanza/operator/parser/container/testdata/config.yaml
@@ -0,0 +1,41 @@
+default:
+ type: container
+format:
+ type: container
+ format: "docker"
+on_error_drop:
+ type: container
+ on_error: drop
+add_metadata_from_file_path:
+ type: container
+ add_metadata_from_file_path: true
+parse_from_simple:
+ type: container
+ parse_from: body.from
+parse_to_attributes:
+ type: container
+ parse_to: attributes
+parse_to_body:
+ type: container
+ parse_to: body
+parse_to_resource:
+ type: container
+ parse_to: resource
+parse_to_simple:
+ type: container
+ parse_to: body.log
+severity:
+ type: container
+ severity:
+ parse_from: body.severity_field
+ mapping:
+ critical: 5xx
+ error: 4xx
+ info: 3xx
+ debug: 2xx
+timestamp:
+ type: container
+ timestamp:
+ parse_from: body.timestamp_field
+ layout_type: strptime
+ layout: '%Y-%m-%d'
diff --git a/pkg/stanza/operator/parser/regex/parser.go b/pkg/stanza/operator/parser/regex/parser.go
index 2821ea551293..7cd2b2e909c2 100644
--- a/pkg/stanza/operator/parser/regex/parser.go
+++ b/pkg/stanza/operator/parser/regex/parser.go
@@ -50,20 +50,9 @@ func (p *Parser) match(value string) (any, error) {
}
}
- matches := p.regexp.FindStringSubmatch(value)
- if matches == nil {
- return nil, fmt.Errorf("regex pattern does not match")
- }
-
- parsedValues := map[string]any{}
- for i, subexp := range p.regexp.SubexpNames() {
- if i == 0 {
- // Skip whole match
- continue
- }
- if subexp != "" {
- parsedValues[subexp] = matches[i]
- }
+ parsedValues, err := helper.MatchValues(value, p.regexp)
+ if err != nil {
+ return nil, err
}
if p.cache != nil {