diff --git a/pkg/ottl/contexts/ottllog/log.go b/pkg/ottl/contexts/ottllog/log.go index 7ca056730cc7..efada0ac4b7d 100644 --- a/pkg/ottl/contexts/ottllog/log.go +++ b/pkg/ottl/contexts/ottllog/log.go @@ -96,6 +96,10 @@ func (tCtx TransformContext) getCache() pcommon.Map { return tCtx.cache } +func (tCtx TransformContext) GetScopeLogs() plog.ScopeLogs { + return tCtx.scopeLogs +} + func (tCtx TransformContext) GetScopeSchemaURLItem() internal.SchemaURLItem { return tCtx.scopeLogs } diff --git a/processor/transformprocessor/internal/logs/func_unroll.go b/processor/transformprocessor/internal/logs/func_unroll.go new file mode 100644 index 000000000000..1962e46635ae --- /dev/null +++ b/processor/transformprocessor/internal/logs/func_unroll.go @@ -0,0 +1,106 @@ +package logs + +import ( + "context" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +type UnrollArguments struct { + Field ottl.GetSetter[ottllog.TransformContext] +} + +// validate that they're not trying to unroll a scope attribute/resource attribute +// restrict to logRecord fields + +func newUnrollFactory() ottl.Factory[ottllog.TransformContext] { + return ottl.NewFactory("unroll", &UnrollArguments{}, createUnrollFunction) +} + +func createUnrollFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottllog.TransformContext], error) { + args, ok := oArgs.(*UnrollArguments) + if !ok { + return nil, fmt.Errorf("UnrollFactory args must be of type *UnrollArguments") + } + return unroll(args.Field) +} + +type valueQueue struct { + data []pcommon.Value +} + +func (v *valueQueue) push(val pcommon.Value) { + v.data = append(v.data, val) +} + +func (v *valueQueue) pop() (pcommon.Value, error) { + if len(v.data) == 0 { + return pcommon.NewValueInt(-1), fmt.Errorf("no values in queue") + } + val := v.data[0] + v.data = v.data[1:] + return val, nil +} + +func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottllog.TransformContext], error) { + valueQueue := valueQueue{ + data: []pcommon.Value{}, + } + + var currentExpansions []pcommon.Value + return func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { + value, err := field.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("failed to get value to unroll: %w", err) + } + + currentLogRecord := tCtx.GetLogRecord() + + // Note this is a hack to store metadata on the log record + if _, ok := currentLogRecord.Attributes().Get("is_expanded"); ok { + value, err := valueQueue.pop() + if err != nil { + return nil, fmt.Errorf("unable to get value from channel") + } + currentLogRecord.Body().SetStr(value.AsString()) + currentLogRecord.Attributes().Remove("is_expanded") + return nil, nil + } + + newValues := []pcommon.Value{} + switch value := value.(type) { + case pcommon.Slice: + for i := 0; i < value.Len(); i++ { + v := value.At(i) + newValues = append(newValues, v) + } + default: + return nil, fmt.Errorf("input field is not a slice, got %T", value) + } + + scopeLogs := tCtx.GetScopeLogs() + scopeLogs.LogRecords().RemoveIf(func(removeCandidate plog.LogRecord) bool { + return removeCandidate == currentLogRecord + }) + + for idx, expansion := range newValues { + newRecord := scopeLogs.LogRecords().AppendEmpty() + currentLogRecord.CopyTo(newRecord) + // handle current element as base + if idx == 0 { + newRecord.Body().SetStr(expansion.AsString()) + continue + } + + newRecord.Attributes().PutBool("is_expanded", true) + valueQueue.push(expansion) + } + currentExpansions = append(currentExpansions, newValues...) + + return nil, nil + }, nil +} diff --git a/processor/transformprocessor/internal/logs/func_unroll_test.go b/processor/transformprocessor/internal/logs/func_unroll_test.go new file mode 100644 index 000000000000..6149795c1c9a --- /dev/null +++ b/processor/transformprocessor/internal/logs/func_unroll_test.go @@ -0,0 +1,146 @@ +package logs + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" +) + +func TestUnroll(t *testing.T) { + tests := []struct { + name string + inputData map[string]interface{} + expected []plog.LogRecord + expectError bool + }{ + // { + // name: "simple_array", + // inputData: map[string]interface{}{ + // "body": map[string]interface{}{ + // "items": []interface{}{"one", "two", "three"}, + // }, + // }, + // expected: []plog.LogRecord{ + // func() plog.LogRecord { + // lr := plog.NewLogRecord() + // lr.Body().SetStr("one") + // return lr + // }(), + // func() plog.LogRecord { + // lr := plog.NewLogRecord() + // lr.Body().SetStr("two") + // return lr + // }(), + // func() plog.LogRecord { + // lr := plog.NewLogRecord() + // lr.Body().SetStr("three") + // return lr + // }(), + // }, + // expectError: false, + // }, + { + name: "numeric_array", + inputData: map[string]interface{}{ + "body": map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }, + }, + expected: []plog.LogRecord{ + func() plog.LogRecord { + lr := plog.NewLogRecord() + lr.Body().SetInt(1) + return lr + }(), + func() plog.LogRecord { + lr := plog.NewLogRecord() + lr.Body().SetInt(2) + return lr + }(), + func() plog.LogRecord { + lr := plog.NewLogRecord() + lr.Body().SetInt(3) + return lr + }(), + }, + }, + { + name: "empty_array", + inputData: map[string]interface{}{ + "body": map[string]interface{}{ + "items": []interface{}{}, + }, + }, + expected: []plog.LogRecord{}, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lr := plog.NewLogRecord() + m := pcommon.NewMap() + err := m.FromRaw(tt.inputData) + require.NoError(t, err) + lr.Attributes().FromRaw(tt.inputData) + + arrayMap := pcommon.NewMap() + arraySlice := arrayMap.PutEmptySlice("items") + items := tt.inputData["body"].(map[string]any)["items"].([]any) + for _, item := range items { + switch v := item.(type) { + case string: + arraySlice.AppendEmpty().SetStr(v) + case map[string]any: + newMap := arraySlice.AppendEmpty().SetEmptyMap() + err := newMap.FromRaw(v) + assert.NoError(t, err) + default: + arraySlice.AppendEmpty().SetStr(fmt.Sprintf("%v", v)) + } + } + + field := ottl.StandardGetSetter[ottllog.TransformContext]{ + Getter: func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { + return arraySlice, nil + }, + } + + exprFunc, err := unroll(field) + if tt.expectError { + require.Error(t, err) + return + } + require.NoError(t, err) + scopeLogs := plog.NewScopeLogs() + + _, err = exprFunc(context.Background(), ottllog.NewTransformContext(lr, plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), scopeLogs, plog.NewResourceLogs())) + if tt.expectError { + require.Error(t, err) + return + } + require.Equal(t, scopeLogs.LogRecords().Len(), len(tt.expected)) + + for i, expected := range tt.expected { + require.Equal(t, expected.Body().AsRaw(), scopeLogs.LogRecords().At(i).Body().AsRaw()) + } + + // Verify results + // scopeLogs + // require.True(t, ok) + // require.Equal(t, len(tt.expected), len(results.Slice().AsRaw())) + + // for i, expected := range tt.expected { + // require.Equal(t, expected.Body().AsRaw(), results.Slice().At(i).AsRaw()) + // } + }) + } +} diff --git a/processor/transformprocessor/internal/logs/functions.go b/processor/transformprocessor/internal/logs/functions.go index 47a5151f6241..49322b23e0a6 100644 --- a/processor/transformprocessor/internal/logs/functions.go +++ b/processor/transformprocessor/internal/logs/functions.go @@ -11,5 +11,14 @@ import ( func LogFunctions() map[string]ottl.Factory[ottllog.TransformContext] { // No logs-only functions yet. - return ottlfuncs.StandardFuncs[ottllog.TransformContext]() + functions := ottlfuncs.StandardFuncs[ottllog.TransformContext]() + + logFunctions := ottl.CreateFactoryMap[ottllog.TransformContext]( + newUnrollFactory(), + ) + + for k, v := range logFunctions { + functions[k] = v + } + return functions }