From cbe919a1bcf4590e486318ba1a3281f036025a8a Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 21 Nov 2024 11:37:31 -0500 Subject: [PATCH 1/7] add unroll ottl function --- pkg/ottl/ottlfuncs/func_unroll.go | 162 +++++++++++++++++++++++++ pkg/ottl/ottlfuncs/func_unroll_test.go | 133 ++++++++++++++++++++ pkg/ottl/ottlfuncs/functions.go | 1 + 3 files changed, 296 insertions(+) create mode 100644 pkg/ottl/ottlfuncs/func_unroll.go create mode 100644 pkg/ottl/ottlfuncs/func_unroll_test.go diff --git a/pkg/ottl/ottlfuncs/func_unroll.go b/pkg/ottl/ottlfuncs/func_unroll.go new file mode 100644 index 000000000000..4413ca0c5d6a --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_unroll.go @@ -0,0 +1,162 @@ +package ottlfuncs + +import ( + "context" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type UnrollArguments[K any] struct { + From ottl.GetSetter[K] + To ottl.GetSetter[K] +} + +func NewUnrollFactory[K any]() ottl.Factory[K] { + return ottl.NewFactory("unroll", &UnrollArguments[K]{}, createUnrollFunction[K]) +} + +func createUnrollFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[K], error) { + args, ok := oArgs.(*UnrollArguments[K]) + + if !ok { + return nil, fmt.Errorf("UnrollFactory args must be of type *UnrollArguments[K]") + } + + return unroll(args.From, args.To) +} + +func cleanupMap(m pcommon.Map, key string) { + if bodyVal, exists := m.Get("body"); exists && bodyVal.Type() == pcommon.ValueTypeMap { + bodyMap := bodyVal.Map() + bodyMap.Remove(key) + // If body is empty after removal, remove it too + if bodyMap.Len() == 0 { + m.Remove("body") + } + } +} + +func unroll[K any](from ottl.GetSetter[K], to ottl.GetSetter[K]) (ottl.ExprFunc[K], error) { + return func(ctx context.Context, tCtx K) (interface{}, error) { + fmt.Printf("Received tCtx type: %T\n", tCtx) + // Get the original value + value, err := from.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("failed to get value to unroll: %w", err) + } + + var sliceVal []interface{} + + // Handle different types of input values + switch v := value.(type) { + case pcommon.Map: + raw := v.AsRaw() + // Look for any array in the map + for key, val := range raw { + if arr, ok := val.([]interface{}); ok { + sliceVal = arr + // Remove the field from the original map + v.Remove(key) + + // If this is a log record, clean up both body and attributes + if lr, ok := any(tCtx).(plog.LogRecord); ok { + // Clean up from attributes if present + if attrs := lr.Attributes(); attrs.Len() > 0 { + cleanupMap(attrs, key) + } + + // Clean up from body if present + if lr.Body().Type() == pcommon.ValueTypeMap { + cleanupMap(lr.Body().Map(), key) + } + } + break + } + } + if sliceVal == nil { + return nil, fmt.Errorf("map does not contain an array value") + } + case []interface{}: + sliceVal = v + // Handle direct array case + if lr, ok := any(tCtx).(plog.LogRecord); ok { + // If the array was in the body, clear it + if lr.Body().Type() == pcommon.ValueTypeMap { + lr.Body().SetEmptyMap() + } + } + default: + return nil, fmt.Errorf("value is not an array or pcommon.Map, got %T", value) + } + + results := make([]K, 0, len(sliceVal)) + + // Create new records for each value + if len(sliceVal) > 0 { + for _, item := range sliceVal { + var newTCtx K + z := any(tCtx) + switch v := z.(type) { + case ottllog.TransformContext: + newLog := plog.NewLogRecord() + v.GetLogRecord().CopyTo(newLog) + newScope := plog.NewScopeLogs() + if scope := v.GetScopeSchemaURLItem(); scope != nil { + newScope.SetSchemaUrl(scope.SchemaUrl()) + } + newResource := plog.NewResourceLogs() + if resource := v.GetResourceSchemaURLItem(); resource != nil { + newResource.SetSchemaUrl(resource.SchemaUrl()) + } + + newTCtx = any(ottllog.NewTransformContext( + newLog, + v.GetInstrumentationScope(), + v.GetResource(), + newScope, + newResource, + )).(K) + case plog.LogRecord: + newLog := plog.NewLogRecord() + v.CopyTo(newLog) + newTCtx = any(newLog).(K) + case ptrace.Span: + newSpan := ptrace.NewSpan() + v.CopyTo(newSpan) + newTCtx = any(newSpan).(K) + case pmetric.Metric: + newMetric := pmetric.NewMetric() + v.CopyTo(newMetric) + newTCtx = any(newMetric).(K) + default: + return nil, fmt.Errorf("unsupported context type: %T", tCtx) + } + + var pValue interface{} = item + if rawValue, ok := item.(map[string]interface{}); ok { + newMap := pcommon.NewMap() + err := newMap.FromRaw(rawValue) + if err != nil { + return nil, fmt.Errorf("failed to convert map value: %w", err) + } + pValue = newMap + } + + err = to.Set(ctx, newTCtx, pValue) + if err != nil { + return nil, fmt.Errorf("failed to set value: %w", err) + } + + results = append(results, newTCtx) + } + } + + return results, nil + }, nil +} diff --git a/pkg/ottl/ottlfuncs/func_unroll_test.go b/pkg/ottl/ottlfuncs/func_unroll_test.go new file mode 100644 index 000000000000..a7d7c3d05264 --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_unroll_test.go @@ -0,0 +1,133 @@ +package ottlfuncs + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +func Test_unroll(t *testing.T) { + tests := []struct { + name string + inputData map[string]interface{} + expected []plog.LogRecord + expectError bool + }{ + { + name: "simple_array", + inputData: map[string]interface{}{ + "body": map[string]interface{}{ + "items": []interface{}{"one", "two", "three"}, + }, + }, + expected: []plog.LogRecord{ + func() plog.LogRecord { + lr := plog.NewLogRecord() + bodyMap := lr.Body().SetEmptyMap() + bodyMap.PutStr("item", "one") + return lr + }(), + func() plog.LogRecord { + lr := plog.NewLogRecord() + bodyMap := lr.Body().SetEmptyMap() + bodyMap.PutStr("item", "two") + return lr + }(), + func() plog.LogRecord { + lr := plog.NewLogRecord() + bodyMap := lr.Body().SetEmptyMap() + bodyMap.PutStr("item", "three") + return lr + }(), + }, + expectError: false, + }, + { + name: "empty_array", + inputData: map[string]interface{}{ + "body": map[string]interface{}{ + "items": []interface{}{}, + }, + }, + expected: []plog.LogRecord{}, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lr := plog.NewLogRecord() + m := pcommon.NewMap() + err := m.FromRaw(tt.inputData) + assert.NoError(t, err) + lr.Attributes().FromRaw(tt.inputData) + + arrayMap := pcommon.NewMap() + arraySlice := arrayMap.PutEmptySlice("items") + items := tt.inputData["body"].(map[string]interface{})["items"].([]interface{}) + for _, item := range items { + switch v := item.(type) { + case string: + arraySlice.AppendEmpty().SetStr(v) + case map[string]interface{}: + newMap := arraySlice.AppendEmpty().SetEmptyMap() + err := newMap.FromRaw(v) + assert.NoError(t, err) + default: + arraySlice.AppendEmpty().SetStr(fmt.Sprintf("%v", v)) + } + } + + from := ottl.StandardGetSetter[plog.LogRecord]{ + Getter: func(ctx context.Context, tCtx plog.LogRecord) (interface{}, error) { + return arrayMap, nil + }, + } + + to := ottl.StandardGetSetter[plog.LogRecord]{ + Setter: func(ctx context.Context, tCtx plog.LogRecord, val interface{}) error { + bodyMap := tCtx.Body().SetEmptyMap() + switch v := val.(type) { + case string: + bodyMap.PutStr("item", v) + case map[string]interface{}: + itemsMap := bodyMap.PutEmptyMap("item") + return itemsMap.FromRaw(v) + default: + bodyMap.PutStr("item", fmt.Sprintf("%v", v)) + } + return nil + }, + } + + exprFunc, err := unroll[plog.LogRecord](from, to) + if tt.expectError { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + result, err := exprFunc(context.Background(), lr) + if tt.expectError { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + // Verify results + results, ok := result.([]plog.LogRecord) + assert.True(t, ok) + assert.Equal(t, len(tt.expected), len(results)) + + for i, expected := range tt.expected { + assert.Equal(t, expected.Body().AsRaw(), results[i].Body().AsRaw()) + } + }) + } +} diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index 6fae06eb6b01..1926d6a1c015 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -14,6 +14,7 @@ func StandardFuncs[K any]() map[string]ottl.Factory[K] { NewDeleteMatchingKeysFactory[K](), NewKeepMatchingKeysFactory[K](), NewFlattenFactory[K](), + NewUnrollFactory[K](), NewKeepKeysFactory[K](), NewLimitFactory[K](), NewMergeMapsFactory[K](), From ef952691bd8e3741e7c5e7a68378c2927641ea31 Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 21 Nov 2024 11:55:14 -0500 Subject: [PATCH 2/7] add split to functions --- pkg/ottl/ottlfuncs/functions.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index 1926d6a1c015..e9f266521390 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -23,6 +23,7 @@ func StandardFuncs[K any]() map[string]ottl.Factory[K] { NewReplaceMatchFactory[K](), NewReplacePatternFactory[K](), NewSetFactory[K](), + NewSplitFactory[K](), NewTruncateAllFactory[K](), } f = append(f, converters[K]()...) From ae3fd3388b78659f7dd9dd3bbef5b3b374ec13dd Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 21 Nov 2024 12:02:36 -0500 Subject: [PATCH 3/7] remove split from editors --- pkg/ottl/ottlfuncs/functions.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index e9f266521390..1926d6a1c015 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -23,7 +23,6 @@ func StandardFuncs[K any]() map[string]ottl.Factory[K] { NewReplaceMatchFactory[K](), NewReplacePatternFactory[K](), NewSetFactory[K](), - NewSplitFactory[K](), NewTruncateAllFactory[K](), } f = append(f, converters[K]()...) From 53e1994b429686d5dc68cc021ff691ea12ec0084 Mon Sep 17 00:00:00 2001 From: schmikei Date: Fri, 22 Nov 2024 14:30:36 -0500 Subject: [PATCH 4/7] spike not full solution --- pkg/ottl/contexts/ottllog/log.go | 4 + pkg/ottl/ottlfuncs/func_unroll.go | 162 ------------------ pkg/ottl/ottlfuncs/func_unroll_test.go | 133 -------------- pkg/ottl/ottlfuncs/functions.go | 1 - .../internal/logs/func_unroll.go | 86 ++++++++++ .../internal/logs/func_unroll_test.go | 133 ++++++++++++++ .../internal/logs/functions.go | 11 +- 7 files changed, 233 insertions(+), 297 deletions(-) delete mode 100644 pkg/ottl/ottlfuncs/func_unroll.go delete mode 100644 pkg/ottl/ottlfuncs/func_unroll_test.go create mode 100644 processor/transformprocessor/internal/logs/func_unroll.go create mode 100644 processor/transformprocessor/internal/logs/func_unroll_test.go diff --git a/pkg/ottl/contexts/ottllog/log.go b/pkg/ottl/contexts/ottllog/log.go index 7ca056730cc7..efada0ac4b7d 100644 --- a/pkg/ottl/contexts/ottllog/log.go +++ b/pkg/ottl/contexts/ottllog/log.go @@ -96,6 +96,10 @@ func (tCtx TransformContext) getCache() pcommon.Map { return tCtx.cache } +func (tCtx TransformContext) GetScopeLogs() plog.ScopeLogs { + return tCtx.scopeLogs +} + func (tCtx TransformContext) GetScopeSchemaURLItem() internal.SchemaURLItem { return tCtx.scopeLogs } diff --git a/pkg/ottl/ottlfuncs/func_unroll.go b/pkg/ottl/ottlfuncs/func_unroll.go deleted file mode 100644 index 4413ca0c5d6a..000000000000 --- a/pkg/ottl/ottlfuncs/func_unroll.go +++ /dev/null @@ -1,162 +0,0 @@ -package ottlfuncs - -import ( - "context" - "fmt" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -type UnrollArguments[K any] struct { - From ottl.GetSetter[K] - To ottl.GetSetter[K] -} - -func NewUnrollFactory[K any]() ottl.Factory[K] { - return ottl.NewFactory("unroll", &UnrollArguments[K]{}, createUnrollFunction[K]) -} - -func createUnrollFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[K], error) { - args, ok := oArgs.(*UnrollArguments[K]) - - if !ok { - return nil, fmt.Errorf("UnrollFactory args must be of type *UnrollArguments[K]") - } - - return unroll(args.From, args.To) -} - -func cleanupMap(m pcommon.Map, key string) { - if bodyVal, exists := m.Get("body"); exists && bodyVal.Type() == pcommon.ValueTypeMap { - bodyMap := bodyVal.Map() - bodyMap.Remove(key) - // If body is empty after removal, remove it too - if bodyMap.Len() == 0 { - m.Remove("body") - } - } -} - -func unroll[K any](from ottl.GetSetter[K], to ottl.GetSetter[K]) (ottl.ExprFunc[K], error) { - return func(ctx context.Context, tCtx K) (interface{}, error) { - fmt.Printf("Received tCtx type: %T\n", tCtx) - // Get the original value - value, err := from.Get(ctx, tCtx) - if err != nil { - return nil, fmt.Errorf("failed to get value to unroll: %w", err) - } - - var sliceVal []interface{} - - // Handle different types of input values - switch v := value.(type) { - case pcommon.Map: - raw := v.AsRaw() - // Look for any array in the map - for key, val := range raw { - if arr, ok := val.([]interface{}); ok { - sliceVal = arr - // Remove the field from the original map - v.Remove(key) - - // If this is a log record, clean up both body and attributes - if lr, ok := any(tCtx).(plog.LogRecord); ok { - // Clean up from attributes if present - if attrs := lr.Attributes(); attrs.Len() > 0 { - cleanupMap(attrs, key) - } - - // Clean up from body if present - if lr.Body().Type() == pcommon.ValueTypeMap { - cleanupMap(lr.Body().Map(), key) - } - } - break - } - } - if sliceVal == nil { - return nil, fmt.Errorf("map does not contain an array value") - } - case []interface{}: - sliceVal = v - // Handle direct array case - if lr, ok := any(tCtx).(plog.LogRecord); ok { - // If the array was in the body, clear it - if lr.Body().Type() == pcommon.ValueTypeMap { - lr.Body().SetEmptyMap() - } - } - default: - return nil, fmt.Errorf("value is not an array or pcommon.Map, got %T", value) - } - - results := make([]K, 0, len(sliceVal)) - - // Create new records for each value - if len(sliceVal) > 0 { - for _, item := range sliceVal { - var newTCtx K - z := any(tCtx) - switch v := z.(type) { - case ottllog.TransformContext: - newLog := plog.NewLogRecord() - v.GetLogRecord().CopyTo(newLog) - newScope := plog.NewScopeLogs() - if scope := v.GetScopeSchemaURLItem(); scope != nil { - newScope.SetSchemaUrl(scope.SchemaUrl()) - } - newResource := plog.NewResourceLogs() - if resource := v.GetResourceSchemaURLItem(); resource != nil { - newResource.SetSchemaUrl(resource.SchemaUrl()) - } - - newTCtx = any(ottllog.NewTransformContext( - newLog, - v.GetInstrumentationScope(), - v.GetResource(), - newScope, - newResource, - )).(K) - case plog.LogRecord: - newLog := plog.NewLogRecord() - v.CopyTo(newLog) - newTCtx = any(newLog).(K) - case ptrace.Span: - newSpan := ptrace.NewSpan() - v.CopyTo(newSpan) - newTCtx = any(newSpan).(K) - case pmetric.Metric: - newMetric := pmetric.NewMetric() - v.CopyTo(newMetric) - newTCtx = any(newMetric).(K) - default: - return nil, fmt.Errorf("unsupported context type: %T", tCtx) - } - - var pValue interface{} = item - if rawValue, ok := item.(map[string]interface{}); ok { - newMap := pcommon.NewMap() - err := newMap.FromRaw(rawValue) - if err != nil { - return nil, fmt.Errorf("failed to convert map value: %w", err) - } - pValue = newMap - } - - err = to.Set(ctx, newTCtx, pValue) - if err != nil { - return nil, fmt.Errorf("failed to set value: %w", err) - } - - results = append(results, newTCtx) - } - } - - return results, nil - }, nil -} diff --git a/pkg/ottl/ottlfuncs/func_unroll_test.go b/pkg/ottl/ottlfuncs/func_unroll_test.go deleted file mode 100644 index a7d7c3d05264..000000000000 --- a/pkg/ottl/ottlfuncs/func_unroll_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package ottlfuncs - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" -) - -func Test_unroll(t *testing.T) { - tests := []struct { - name string - inputData map[string]interface{} - expected []plog.LogRecord - expectError bool - }{ - { - name: "simple_array", - inputData: map[string]interface{}{ - "body": map[string]interface{}{ - "items": []interface{}{"one", "two", "three"}, - }, - }, - expected: []plog.LogRecord{ - func() plog.LogRecord { - lr := plog.NewLogRecord() - bodyMap := lr.Body().SetEmptyMap() - bodyMap.PutStr("item", "one") - return lr - }(), - func() plog.LogRecord { - lr := plog.NewLogRecord() - bodyMap := lr.Body().SetEmptyMap() - bodyMap.PutStr("item", "two") - return lr - }(), - func() plog.LogRecord { - lr := plog.NewLogRecord() - bodyMap := lr.Body().SetEmptyMap() - bodyMap.PutStr("item", "three") - return lr - }(), - }, - expectError: false, - }, - { - name: "empty_array", - inputData: map[string]interface{}{ - "body": map[string]interface{}{ - "items": []interface{}{}, - }, - }, - expected: []plog.LogRecord{}, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - lr := plog.NewLogRecord() - m := pcommon.NewMap() - err := m.FromRaw(tt.inputData) - assert.NoError(t, err) - lr.Attributes().FromRaw(tt.inputData) - - arrayMap := pcommon.NewMap() - arraySlice := arrayMap.PutEmptySlice("items") - items := tt.inputData["body"].(map[string]interface{})["items"].([]interface{}) - for _, item := range items { - switch v := item.(type) { - case string: - arraySlice.AppendEmpty().SetStr(v) - case map[string]interface{}: - newMap := arraySlice.AppendEmpty().SetEmptyMap() - err := newMap.FromRaw(v) - assert.NoError(t, err) - default: - arraySlice.AppendEmpty().SetStr(fmt.Sprintf("%v", v)) - } - } - - from := ottl.StandardGetSetter[plog.LogRecord]{ - Getter: func(ctx context.Context, tCtx plog.LogRecord) (interface{}, error) { - return arrayMap, nil - }, - } - - to := ottl.StandardGetSetter[plog.LogRecord]{ - Setter: func(ctx context.Context, tCtx plog.LogRecord, val interface{}) error { - bodyMap := tCtx.Body().SetEmptyMap() - switch v := val.(type) { - case string: - bodyMap.PutStr("item", v) - case map[string]interface{}: - itemsMap := bodyMap.PutEmptyMap("item") - return itemsMap.FromRaw(v) - default: - bodyMap.PutStr("item", fmt.Sprintf("%v", v)) - } - return nil - }, - } - - exprFunc, err := unroll[plog.LogRecord](from, to) - if tt.expectError { - assert.Error(t, err) - return - } - assert.NoError(t, err) - - result, err := exprFunc(context.Background(), lr) - if tt.expectError { - assert.Error(t, err) - return - } - assert.NoError(t, err) - - // Verify results - results, ok := result.([]plog.LogRecord) - assert.True(t, ok) - assert.Equal(t, len(tt.expected), len(results)) - - for i, expected := range tt.expected { - assert.Equal(t, expected.Body().AsRaw(), results[i].Body().AsRaw()) - } - }) - } -} diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index 1926d6a1c015..6fae06eb6b01 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -14,7 +14,6 @@ func StandardFuncs[K any]() map[string]ottl.Factory[K] { NewDeleteMatchingKeysFactory[K](), NewKeepMatchingKeysFactory[K](), NewFlattenFactory[K](), - NewUnrollFactory[K](), NewKeepKeysFactory[K](), NewLimitFactory[K](), NewMergeMapsFactory[K](), diff --git a/processor/transformprocessor/internal/logs/func_unroll.go b/processor/transformprocessor/internal/logs/func_unroll.go new file mode 100644 index 000000000000..f835f62115c5 --- /dev/null +++ b/processor/transformprocessor/internal/logs/func_unroll.go @@ -0,0 +1,86 @@ +package logs + +import ( + "context" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +type UnrollArguments struct { + Field ottl.GetSetter[ottllog.TransformContext] +} + +// validate that they're not trying to unroll a scope attribute/resource attribute +// restrict to logRecord fields + +func newUnrollFactory() ottl.Factory[ottllog.TransformContext] { + return ottl.NewFactory("unroll", &UnrollArguments{}, createUnrollFunction) +} + +func createUnrollFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottllog.TransformContext], error) { + args, ok := oArgs.(*UnrollArguments) + if !ok { + return nil, fmt.Errorf("UnrollFactory args must be of type *UnrollArguments") + } + return unroll(args.Field) +} + +func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottllog.TransformContext], error) { + changeCounter := 0 + return func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { + if changeCounter > 0 { + changeCounter-- + return nil, nil + } + + value, err := field.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("failed to get value to unroll: %w", err) + } + + expansions := []string{} + switch value := value.(type) { + case pcommon.Slice: + for _, v := range value.AsRaw() { + if s, ok := v.(string); ok { + expansions = append(expansions, s) + } else { + return nil, fmt.Errorf("value is not a string slice, got %T", v) + } + value.RemoveIf(func(removeCandidate pcommon.Value) bool { + return removeCandidate.AsRaw() == v + }) + } + + default: + return nil, fmt.Errorf("input field is not a slice, got %T", value) + } + + scopeLogs := tCtx.GetScopeLogs() + + currentRecord := tCtx.GetLogRecord() + scopeLogs.LogRecords().RemoveIf(func(removeCandidate plog.LogRecord) bool { + return removeCandidate == currentRecord + }) + + newLogs := plog.NewScopeLogs() + scopeLogs.CopyTo(newLogs) + records := newLogs.LogRecords() + + for _, expansion := range expansions { + newRecord := records.AppendEmpty() + currentRecord.CopyTo(newRecord) + // figure out the field being set; not always going to be the body + newRecord.Body().SetStr(expansion) + // TODO: This is not a safe assumption that the records processed by the transform processor are going to be in order + changeCounter++ + } + newLogs.MoveTo(scopeLogs) + + return nil, nil + }, nil +} diff --git a/processor/transformprocessor/internal/logs/func_unroll_test.go b/processor/transformprocessor/internal/logs/func_unroll_test.go new file mode 100644 index 000000000000..c7b19ce515ee --- /dev/null +++ b/processor/transformprocessor/internal/logs/func_unroll_test.go @@ -0,0 +1,133 @@ +package logs + +// import ( +// "context" +// "fmt" +// "testing" + +// "github.com/stretchr/testify/assert" +// "go.opentelemetry.io/collector/pdata/pcommon" +// "go.opentelemetry.io/collector/pdata/plog" + +// "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +// ) + +// func Test_unroll(t *testing.T) { +// tests := []struct { +// name string +// inputData map[string]interface{} +// expected []plog.LogRecord +// expectError bool +// }{ +// { +// name: "simple_array", +// inputData: map[string]interface{}{ +// "body": map[string]interface{}{ +// "items": []interface{}{"one", "two", "three"}, +// }, +// }, +// expected: []plog.LogRecord{ +// func() plog.LogRecord { +// lr := plog.NewLogRecord() +// bodyMap := lr.Body().SetEmptyMap() +// bodyMap.PutStr("item", "one") +// return lr +// }(), +// func() plog.LogRecord { +// lr := plog.NewLogRecord() +// bodyMap := lr.Body().SetEmptyMap() +// bodyMap.PutStr("item", "two") +// return lr +// }(), +// func() plog.LogRecord { +// lr := plog.NewLogRecord() +// bodyMap := lr.Body().SetEmptyMap() +// bodyMap.PutStr("item", "three") +// return lr +// }(), +// }, +// expectError: false, +// }, +// { +// name: "empty_array", +// inputData: map[string]interface{}{ +// "body": map[string]interface{}{ +// "items": []interface{}{}, +// }, +// }, +// expected: []plog.LogRecord{}, +// expectError: false, +// }, +// } + +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// lr := plog.NewLogRecord() +// m := pcommon.NewMap() +// err := m.FromRaw(tt.inputData) +// assert.NoError(t, err) +// lr.Attributes().FromRaw(tt.inputData) + +// arrayMap := pcommon.NewMap() +// arraySlice := arrayMap.PutEmptySlice("items") +// items := tt.inputData["body"].(map[string]interface{})["items"].([]interface{}) +// for _, item := range items { +// switch v := item.(type) { +// case string: +// arraySlice.AppendEmpty().SetStr(v) +// case map[string]interface{}: +// newMap := arraySlice.AppendEmpty().SetEmptyMap() +// err := newMap.FromRaw(v) +// assert.NoError(t, err) +// default: +// arraySlice.AppendEmpty().SetStr(fmt.Sprintf("%v", v)) +// } +// } + +// from := ottl.StandardGetSetter[plog.LogRecord]{ +// Getter: func(ctx context.Context, tCtx plog.LogRecord) (interface{}, error) { +// return arrayMap, nil +// }, +// } + +// to := ottl.StandardGetSetter[plog.LogRecord]{ +// Setter: func(ctx context.Context, tCtx plog.LogRecord, val interface{}) error { +// bodyMap := tCtx.Body().SetEmptyMap() +// switch v := val.(type) { +// case string: +// bodyMap.PutStr("item", v) +// case map[string]interface{}: +// itemsMap := bodyMap.PutEmptyMap("item") +// return itemsMap.FromRaw(v) +// default: +// bodyMap.PutStr("item", fmt.Sprintf("%v", v)) +// } +// return nil +// }, +// } + +// exprFunc, err := unroll[plog.LogRecord](from, to) +// if tt.expectError { +// assert.Error(t, err) +// return +// } +// assert.NoError(t, err) + +// result, err := exprFunc(context.Background(), lr) +// if tt.expectError { +// assert.Error(t, err) +// return +// } +// assert.NoError(t, err) + +// // Verify results +// results, ok := result.([]plog.LogRecord) +// assert.True(t, ok) +// assert.Equal(t, len(tt.expected), len(results)) + +// for i, expected := range tt.expected { +// assert.Equal(t, expected.Body().AsRaw(), results[i].Body().AsRaw()) +// } +// }) +// } +// } diff --git a/processor/transformprocessor/internal/logs/functions.go b/processor/transformprocessor/internal/logs/functions.go index 47a5151f6241..49322b23e0a6 100644 --- a/processor/transformprocessor/internal/logs/functions.go +++ b/processor/transformprocessor/internal/logs/functions.go @@ -11,5 +11,14 @@ import ( func LogFunctions() map[string]ottl.Factory[ottllog.TransformContext] { // No logs-only functions yet. - return ottlfuncs.StandardFuncs[ottllog.TransformContext]() + functions := ottlfuncs.StandardFuncs[ottllog.TransformContext]() + + logFunctions := ottl.CreateFactoryMap[ottllog.TransformContext]( + newUnrollFactory(), + ) + + for k, v := range logFunctions { + functions[k] = v + } + return functions } From 4d36574b2881b1f20c4282c7706bae9e0e779668 Mon Sep 17 00:00:00 2001 From: schmikei Date: Mon, 25 Nov 2024 14:14:32 -0500 Subject: [PATCH 5/7] try to start addressing the typing nightmare of slices --- .../internal/logs/func_unroll.go | 56 ++-- .../internal/logs/func_unroll_test.go | 253 +++++++++--------- 2 files changed, 173 insertions(+), 136 deletions(-) diff --git a/processor/transformprocessor/internal/logs/func_unroll.go b/processor/transformprocessor/internal/logs/func_unroll.go index f835f62115c5..e5cfb39b4311 100644 --- a/processor/transformprocessor/internal/logs/func_unroll.go +++ b/processor/transformprocessor/internal/logs/func_unroll.go @@ -31,31 +31,44 @@ func createUnrollFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.Ex func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottllog.TransformContext], error) { changeCounter := 0 + var unrollType pcommon.ValueType return func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { - if changeCounter > 0 { - changeCounter-- - return nil, nil - } - value, err := field.Get(ctx, tCtx) if err != nil { return nil, fmt.Errorf("failed to get value to unroll: %w", err) } - expansions := []string{} + if changeCounter > 0 { + changeCounter-- + currentLogRecord := tCtx.GetLogRecord() + switch unrollType { + case pcommon.ValueTypeStr: + currentLogRecord.Body().SetStr(currentLogRecord.Body().AsString()) + case pcommon.ValueTypeInt: + currentLogRecord.Body().SetInt(currentLogRecord.Body().Int()) + case pcommon.ValueTypeDouble: + currentLogRecord.Body().SetDouble(currentLogRecord.Body().Double()) + case pcommon.ValueTypeBool: + currentLogRecord.Body().SetBool(currentLogRecord.Body().Bool()) + case pcommon.ValueTypeMap, pcommon.ValueTypeSlice: + // do nothing + default: + return nil, fmt.Errorf("unable to continue unrolling%v", unrollType) + } + return nil, nil + } + + expansions := []pcommon.Value{} switch value := value.(type) { case pcommon.Slice: - for _, v := range value.AsRaw() { - if s, ok := v.(string); ok { - expansions = append(expansions, s) - } else { - return nil, fmt.Errorf("value is not a string slice, got %T", v) - } + for i := 0; i < value.Len(); i++ { + v := value.At(i) + unrollType = v.Type() + expansions = append(expansions, v) value.RemoveIf(func(removeCandidate pcommon.Value) bool { - return removeCandidate.AsRaw() == v + return removeCandidate == v }) } - default: return nil, fmt.Errorf("input field is not a slice, got %T", value) } @@ -74,9 +87,20 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll for _, expansion := range expansions { newRecord := records.AppendEmpty() currentRecord.CopyTo(newRecord) + switch unrollType { + case pcommon.ValueTypeStr: + newRecord.Body().SetStr(expansion.AsString()) + case pcommon.ValueTypeInt: + newRecord.Body().SetInt(expansion.Int()) + case pcommon.ValueTypeDouble: + newRecord.Body().SetDouble(expansion.Double()) + case pcommon.ValueTypeBool: + newRecord.Body().SetBool(expansion.Bool()) + default: + return nil, fmt.Errorf("unable to unroll %v", unrollType) + } // figure out the field being set; not always going to be the body - newRecord.Body().SetStr(expansion) - // TODO: This is not a safe assumption that the records processed by the transform processor are going to be in order + // newRecord.Body().SetStr(expansion) changeCounter++ } newLogs.MoveTo(scopeLogs) diff --git a/processor/transformprocessor/internal/logs/func_unroll_test.go b/processor/transformprocessor/internal/logs/func_unroll_test.go index c7b19ce515ee..6149795c1c9a 100644 --- a/processor/transformprocessor/internal/logs/func_unroll_test.go +++ b/processor/transformprocessor/internal/logs/func_unroll_test.go @@ -1,133 +1,146 @@ package logs -// import ( -// "context" -// "fmt" -// "testing" +import ( + "context" + "fmt" + "testing" -// "github.com/stretchr/testify/assert" -// "go.opentelemetry.io/collector/pdata/pcommon" -// "go.opentelemetry.io/collector/pdata/plog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" -// "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" -// ) + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" +) -// func Test_unroll(t *testing.T) { -// tests := []struct { -// name string -// inputData map[string]interface{} -// expected []plog.LogRecord -// expectError bool -// }{ -// { -// name: "simple_array", -// inputData: map[string]interface{}{ -// "body": map[string]interface{}{ -// "items": []interface{}{"one", "two", "three"}, -// }, -// }, -// expected: []plog.LogRecord{ -// func() plog.LogRecord { -// lr := plog.NewLogRecord() -// bodyMap := lr.Body().SetEmptyMap() -// bodyMap.PutStr("item", "one") -// return lr -// }(), -// func() plog.LogRecord { -// lr := plog.NewLogRecord() -// bodyMap := lr.Body().SetEmptyMap() -// bodyMap.PutStr("item", "two") -// return lr -// }(), -// func() plog.LogRecord { -// lr := plog.NewLogRecord() -// bodyMap := lr.Body().SetEmptyMap() -// bodyMap.PutStr("item", "three") -// return lr -// }(), -// }, -// expectError: false, -// }, -// { -// name: "empty_array", -// inputData: map[string]interface{}{ -// "body": map[string]interface{}{ -// "items": []interface{}{}, -// }, -// }, -// expected: []plog.LogRecord{}, -// expectError: false, -// }, -// } +func TestUnroll(t *testing.T) { + tests := []struct { + name string + inputData map[string]interface{} + expected []plog.LogRecord + expectError bool + }{ + // { + // name: "simple_array", + // inputData: map[string]interface{}{ + // "body": map[string]interface{}{ + // "items": []interface{}{"one", "two", "three"}, + // }, + // }, + // expected: []plog.LogRecord{ + // func() plog.LogRecord { + // lr := plog.NewLogRecord() + // lr.Body().SetStr("one") + // return lr + // }(), + // func() plog.LogRecord { + // lr := plog.NewLogRecord() + // lr.Body().SetStr("two") + // return lr + // }(), + // func() plog.LogRecord { + // lr := plog.NewLogRecord() + // lr.Body().SetStr("three") + // return lr + // }(), + // }, + // expectError: false, + // }, + { + name: "numeric_array", + inputData: map[string]interface{}{ + "body": map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }, + }, + expected: []plog.LogRecord{ + func() plog.LogRecord { + lr := plog.NewLogRecord() + lr.Body().SetInt(1) + return lr + }(), + func() plog.LogRecord { + lr := plog.NewLogRecord() + lr.Body().SetInt(2) + return lr + }(), + func() plog.LogRecord { + lr := plog.NewLogRecord() + lr.Body().SetInt(3) + return lr + }(), + }, + }, + { + name: "empty_array", + inputData: map[string]interface{}{ + "body": map[string]interface{}{ + "items": []interface{}{}, + }, + }, + expected: []plog.LogRecord{}, + expectError: false, + }, + } -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// lr := plog.NewLogRecord() -// m := pcommon.NewMap() -// err := m.FromRaw(tt.inputData) -// assert.NoError(t, err) -// lr.Attributes().FromRaw(tt.inputData) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lr := plog.NewLogRecord() + m := pcommon.NewMap() + err := m.FromRaw(tt.inputData) + require.NoError(t, err) + lr.Attributes().FromRaw(tt.inputData) -// arrayMap := pcommon.NewMap() -// arraySlice := arrayMap.PutEmptySlice("items") -// items := tt.inputData["body"].(map[string]interface{})["items"].([]interface{}) -// for _, item := range items { -// switch v := item.(type) { -// case string: -// arraySlice.AppendEmpty().SetStr(v) -// case map[string]interface{}: -// newMap := arraySlice.AppendEmpty().SetEmptyMap() -// err := newMap.FromRaw(v) -// assert.NoError(t, err) -// default: -// arraySlice.AppendEmpty().SetStr(fmt.Sprintf("%v", v)) -// } -// } + arrayMap := pcommon.NewMap() + arraySlice := arrayMap.PutEmptySlice("items") + items := tt.inputData["body"].(map[string]any)["items"].([]any) + for _, item := range items { + switch v := item.(type) { + case string: + arraySlice.AppendEmpty().SetStr(v) + case map[string]any: + newMap := arraySlice.AppendEmpty().SetEmptyMap() + err := newMap.FromRaw(v) + assert.NoError(t, err) + default: + arraySlice.AppendEmpty().SetStr(fmt.Sprintf("%v", v)) + } + } -// from := ottl.StandardGetSetter[plog.LogRecord]{ -// Getter: func(ctx context.Context, tCtx plog.LogRecord) (interface{}, error) { -// return arrayMap, nil -// }, -// } + field := ottl.StandardGetSetter[ottllog.TransformContext]{ + Getter: func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { + return arraySlice, nil + }, + } -// to := ottl.StandardGetSetter[plog.LogRecord]{ -// Setter: func(ctx context.Context, tCtx plog.LogRecord, val interface{}) error { -// bodyMap := tCtx.Body().SetEmptyMap() -// switch v := val.(type) { -// case string: -// bodyMap.PutStr("item", v) -// case map[string]interface{}: -// itemsMap := bodyMap.PutEmptyMap("item") -// return itemsMap.FromRaw(v) -// default: -// bodyMap.PutStr("item", fmt.Sprintf("%v", v)) -// } -// return nil -// }, -// } + exprFunc, err := unroll(field) + if tt.expectError { + require.Error(t, err) + return + } + require.NoError(t, err) + scopeLogs := plog.NewScopeLogs() -// exprFunc, err := unroll[plog.LogRecord](from, to) -// if tt.expectError { -// assert.Error(t, err) -// return -// } -// assert.NoError(t, err) + _, err = exprFunc(context.Background(), ottllog.NewTransformContext(lr, plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), scopeLogs, plog.NewResourceLogs())) + if tt.expectError { + require.Error(t, err) + return + } + require.Equal(t, scopeLogs.LogRecords().Len(), len(tt.expected)) -// result, err := exprFunc(context.Background(), lr) -// if tt.expectError { -// assert.Error(t, err) -// return -// } -// assert.NoError(t, err) + for i, expected := range tt.expected { + require.Equal(t, expected.Body().AsRaw(), scopeLogs.LogRecords().At(i).Body().AsRaw()) + } -// // Verify results -// results, ok := result.([]plog.LogRecord) -// assert.True(t, ok) -// assert.Equal(t, len(tt.expected), len(results)) + // Verify results + // scopeLogs + // require.True(t, ok) + // require.Equal(t, len(tt.expected), len(results.Slice().AsRaw())) -// for i, expected := range tt.expected { -// assert.Equal(t, expected.Body().AsRaw(), results[i].Body().AsRaw()) -// } -// }) -// } -// } + // for i, expected := range tt.expected { + // require.Equal(t, expected.Body().AsRaw(), results.Slice().At(i).AsRaw()) + // } + }) + } +} From e3db2490b1182e8626ac58c19a84bde4a030ec52 Mon Sep 17 00:00:00 2001 From: schmikei Date: Tue, 26 Nov 2024 15:21:03 -0500 Subject: [PATCH 6/7] try to keep a sequential array approach --- .../internal/logs/func_unroll.go | 61 +++++++++++-------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/processor/transformprocessor/internal/logs/func_unroll.go b/processor/transformprocessor/internal/logs/func_unroll.go index e5cfb39b4311..a3a586c339d6 100644 --- a/processor/transformprocessor/internal/logs/func_unroll.go +++ b/processor/transformprocessor/internal/logs/func_unroll.go @@ -30,7 +30,8 @@ func createUnrollFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.Ex } func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottllog.TransformContext], error) { - changeCounter := 0 + + var currentExpansions []pcommon.Value var unrollType pcommon.ValueType return func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { value, err := field.Get(ctx, tCtx) @@ -38,22 +39,26 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll return nil, fmt.Errorf("failed to get value to unroll: %w", err) } - if changeCounter > 0 { - changeCounter-- - currentLogRecord := tCtx.GetLogRecord() + currentLogRecord := tCtx.GetLogRecord() + if unrollIdx, ok := currentLogRecord.Attributes().Get("unrolled_idx"); ok { + // we're in the middle of unrolling + currentLogRecord.Attributes().Remove("unrolled_idx") switch unrollType { case pcommon.ValueTypeStr: - currentLogRecord.Body().SetStr(currentLogRecord.Body().AsString()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetStr(value.AsString()) case pcommon.ValueTypeInt: - currentLogRecord.Body().SetInt(currentLogRecord.Body().Int()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetInt(value.Int()) case pcommon.ValueTypeDouble: - currentLogRecord.Body().SetDouble(currentLogRecord.Body().Double()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetDouble(value.Double()) case pcommon.ValueTypeBool: - currentLogRecord.Body().SetBool(currentLogRecord.Body().Bool()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetBool(value.Bool()) case pcommon.ValueTypeMap, pcommon.ValueTypeSlice: - // do nothing default: - return nil, fmt.Errorf("unable to continue unrolling%v", unrollType) + return nil, fmt.Errorf("unable to continue unrolling %v", unrollType) } return nil, nil } @@ -65,28 +70,39 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll v := value.At(i) unrollType = v.Type() expansions = append(expansions, v) - value.RemoveIf(func(removeCandidate pcommon.Value) bool { - return removeCandidate == v - }) } default: return nil, fmt.Errorf("input field is not a slice, got %T", value) } scopeLogs := tCtx.GetScopeLogs() - currentRecord := tCtx.GetLogRecord() scopeLogs.LogRecords().RemoveIf(func(removeCandidate plog.LogRecord) bool { return removeCandidate == currentRecord }) - newLogs := plog.NewScopeLogs() - scopeLogs.CopyTo(newLogs) - records := newLogs.LogRecords() - - for _, expansion := range expansions { - newRecord := records.AppendEmpty() + for idx, expansion := range expansions { + newRecord := scopeLogs.LogRecords().AppendEmpty() currentRecord.CopyTo(newRecord) + + // handle current element as base + if idx == 0 { + switch unrollType { + case pcommon.ValueTypeStr: + newRecord.Body().SetStr(expansion.AsString()) + case pcommon.ValueTypeInt: + newRecord.Body().SetInt(expansion.Int()) + case pcommon.ValueTypeDouble: + newRecord.Body().SetDouble(expansion.Double()) + case pcommon.ValueTypeBool: + newRecord.Body().SetBool(expansion.Bool()) + default: + return nil, fmt.Errorf("unable to unroll %v", unrollType) + } + continue + } + // currentLength := scopeLogs.LogRecords().Len() + newRecord.Attributes().PutInt("unrolled_idx", int64(len(currentExpansions)+idx)) switch unrollType { case pcommon.ValueTypeStr: newRecord.Body().SetStr(expansion.AsString()) @@ -99,11 +115,8 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll default: return nil, fmt.Errorf("unable to unroll %v", unrollType) } - // figure out the field being set; not always going to be the body - // newRecord.Body().SetStr(expansion) - changeCounter++ } - newLogs.MoveTo(scopeLogs) + currentExpansions = append(currentExpansions, expansions...) return nil, nil }, nil From 1fe42df1f20765f0ba7c571ec805b6e337b4414a Mon Sep 17 00:00:00 2001 From: schmikei Date: Mon, 2 Dec 2024 09:13:27 -0500 Subject: [PATCH 7/7] try a new approach with temporary attributes --- .../internal/logs/func_unroll.go | 93 ++++++++----------- 1 file changed, 38 insertions(+), 55 deletions(-) diff --git a/processor/transformprocessor/internal/logs/func_unroll.go b/processor/transformprocessor/internal/logs/func_unroll.go index a3a586c339d6..1962e46635ae 100644 --- a/processor/transformprocessor/internal/logs/func_unroll.go +++ b/processor/transformprocessor/internal/logs/func_unroll.go @@ -29,10 +29,29 @@ func createUnrollFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.Ex return unroll(args.Field) } +type valueQueue struct { + data []pcommon.Value +} + +func (v *valueQueue) push(val pcommon.Value) { + v.data = append(v.data, val) +} + +func (v *valueQueue) pop() (pcommon.Value, error) { + if len(v.data) == 0 { + return pcommon.NewValueInt(-1), fmt.Errorf("no values in queue") + } + val := v.data[0] + v.data = v.data[1:] + return val, nil +} + func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottllog.TransformContext], error) { + valueQueue := valueQueue{ + data: []pcommon.Value{}, + } var currentExpansions []pcommon.Value - var unrollType pcommon.ValueType return func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { value, err := field.Get(ctx, tCtx) if err != nil { @@ -40,83 +59,47 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll } currentLogRecord := tCtx.GetLogRecord() - if unrollIdx, ok := currentLogRecord.Attributes().Get("unrolled_idx"); ok { - // we're in the middle of unrolling - currentLogRecord.Attributes().Remove("unrolled_idx") - switch unrollType { - case pcommon.ValueTypeStr: - value := currentExpansions[unrollIdx.Int()] - currentLogRecord.Body().SetStr(value.AsString()) - case pcommon.ValueTypeInt: - value := currentExpansions[unrollIdx.Int()] - currentLogRecord.Body().SetInt(value.Int()) - case pcommon.ValueTypeDouble: - value := currentExpansions[unrollIdx.Int()] - currentLogRecord.Body().SetDouble(value.Double()) - case pcommon.ValueTypeBool: - value := currentExpansions[unrollIdx.Int()] - currentLogRecord.Body().SetBool(value.Bool()) - case pcommon.ValueTypeMap, pcommon.ValueTypeSlice: - default: - return nil, fmt.Errorf("unable to continue unrolling %v", unrollType) + + // Note this is a hack to store metadata on the log record + if _, ok := currentLogRecord.Attributes().Get("is_expanded"); ok { + value, err := valueQueue.pop() + if err != nil { + return nil, fmt.Errorf("unable to get value from channel") } + currentLogRecord.Body().SetStr(value.AsString()) + currentLogRecord.Attributes().Remove("is_expanded") return nil, nil } - expansions := []pcommon.Value{} + newValues := []pcommon.Value{} switch value := value.(type) { case pcommon.Slice: for i := 0; i < value.Len(); i++ { v := value.At(i) - unrollType = v.Type() - expansions = append(expansions, v) + newValues = append(newValues, v) } default: return nil, fmt.Errorf("input field is not a slice, got %T", value) } scopeLogs := tCtx.GetScopeLogs() - currentRecord := tCtx.GetLogRecord() scopeLogs.LogRecords().RemoveIf(func(removeCandidate plog.LogRecord) bool { - return removeCandidate == currentRecord + return removeCandidate == currentLogRecord }) - for idx, expansion := range expansions { + for idx, expansion := range newValues { newRecord := scopeLogs.LogRecords().AppendEmpty() - currentRecord.CopyTo(newRecord) - + currentLogRecord.CopyTo(newRecord) // handle current element as base if idx == 0 { - switch unrollType { - case pcommon.ValueTypeStr: - newRecord.Body().SetStr(expansion.AsString()) - case pcommon.ValueTypeInt: - newRecord.Body().SetInt(expansion.Int()) - case pcommon.ValueTypeDouble: - newRecord.Body().SetDouble(expansion.Double()) - case pcommon.ValueTypeBool: - newRecord.Body().SetBool(expansion.Bool()) - default: - return nil, fmt.Errorf("unable to unroll %v", unrollType) - } - continue - } - // currentLength := scopeLogs.LogRecords().Len() - newRecord.Attributes().PutInt("unrolled_idx", int64(len(currentExpansions)+idx)) - switch unrollType { - case pcommon.ValueTypeStr: newRecord.Body().SetStr(expansion.AsString()) - case pcommon.ValueTypeInt: - newRecord.Body().SetInt(expansion.Int()) - case pcommon.ValueTypeDouble: - newRecord.Body().SetDouble(expansion.Double()) - case pcommon.ValueTypeBool: - newRecord.Body().SetBool(expansion.Bool()) - default: - return nil, fmt.Errorf("unable to unroll %v", unrollType) + continue } + + newRecord.Attributes().PutBool("is_expanded", true) + valueQueue.push(expansion) } - currentExpansions = append(currentExpansions, expansions...) + currentExpansions = append(currentExpansions, newValues...) return nil, nil }, nil