Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]: [processor/transform] unroll func (PR only for discussion do not merge) #36506

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/ottl/contexts/ottllog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (tCtx TransformContext) getCache() pcommon.Map {
return tCtx.cache
}

func (tCtx TransformContext) GetScopeLogs() plog.ScopeLogs {
return tCtx.scopeLogs
}

func (tCtx TransformContext) GetScopeSchemaURLItem() internal.SchemaURLItem {
return tCtx.scopeLogs
}
Expand Down
106 changes: 106 additions & 0 deletions processor/transformprocessor/internal/logs/func_unroll.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package logs

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

type UnrollArguments struct {
Field ottl.GetSetter[ottllog.TransformContext]
}

// validate that they're not trying to unroll a scope attribute/resource attribute
// restrict to logRecord fields

func newUnrollFactory() ottl.Factory[ottllog.TransformContext] {
return ottl.NewFactory("unroll", &UnrollArguments{}, createUnrollFunction)
}

func createUnrollFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottllog.TransformContext], error) {
args, ok := oArgs.(*UnrollArguments)
if !ok {
return nil, fmt.Errorf("UnrollFactory args must be of type *UnrollArguments")
}
return unroll(args.Field)
}

type valueQueue struct {
data []pcommon.Value
}

func (v *valueQueue) push(val pcommon.Value) {
v.data = append(v.data, val)
}

func (v *valueQueue) pop() (pcommon.Value, error) {
if len(v.data) == 0 {
return pcommon.NewValueInt(-1), fmt.Errorf("no values in queue")
}
val := v.data[0]
v.data = v.data[1:]
return val, nil
}

func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottllog.TransformContext], error) {
valueQueue := valueQueue{
data: []pcommon.Value{},
}

var currentExpansions []pcommon.Value
return func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) {
value, err := field.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get value to unroll: %w", err)
}

currentLogRecord := tCtx.GetLogRecord()

// Note this is a hack to store metadata on the log record
if _, ok := currentLogRecord.Attributes().Get("is_expanded"); ok {
value, err := valueQueue.pop()
if err != nil {
return nil, fmt.Errorf("unable to get value from channel")
}
currentLogRecord.Body().SetStr(value.AsString())
currentLogRecord.Attributes().Remove("is_expanded")
return nil, nil
}

newValues := []pcommon.Value{}
switch value := value.(type) {
case pcommon.Slice:
for i := 0; i < value.Len(); i++ {
v := value.At(i)
newValues = append(newValues, v)
}
default:
return nil, fmt.Errorf("input field is not a slice, got %T", value)
}

scopeLogs := tCtx.GetScopeLogs()
scopeLogs.LogRecords().RemoveIf(func(removeCandidate plog.LogRecord) bool {
return removeCandidate == currentLogRecord
})

for idx, expansion := range newValues {
newRecord := scopeLogs.LogRecords().AppendEmpty()
currentLogRecord.CopyTo(newRecord)
// handle current element as base
if idx == 0 {
newRecord.Body().SetStr(expansion.AsString())
continue
}

newRecord.Attributes().PutBool("is_expanded", true)
valueQueue.push(expansion)
}
currentExpansions = append(currentExpansions, newValues...)

return nil, nil
}, nil
}
146 changes: 146 additions & 0 deletions processor/transformprocessor/internal/logs/func_unroll_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package logs

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
)

func TestUnroll(t *testing.T) {
tests := []struct {
name string
inputData map[string]interface{}
expected []plog.LogRecord
expectError bool
}{
// {
// name: "simple_array",
// inputData: map[string]interface{}{
// "body": map[string]interface{}{
// "items": []interface{}{"one", "two", "three"},
// },
// },
// expected: []plog.LogRecord{
// func() plog.LogRecord {
// lr := plog.NewLogRecord()
// lr.Body().SetStr("one")
// return lr
// }(),
// func() plog.LogRecord {
// lr := plog.NewLogRecord()
// lr.Body().SetStr("two")
// return lr
// }(),
// func() plog.LogRecord {
// lr := plog.NewLogRecord()
// lr.Body().SetStr("three")
// return lr
// }(),
// },
// expectError: false,
// },
{
name: "numeric_array",
inputData: map[string]interface{}{
"body": map[string]interface{}{
"items": []interface{}{1, 2, 3},
},
},
expected: []plog.LogRecord{
func() plog.LogRecord {
lr := plog.NewLogRecord()
lr.Body().SetInt(1)
return lr
}(),
func() plog.LogRecord {
lr := plog.NewLogRecord()
lr.Body().SetInt(2)
return lr
}(),
func() plog.LogRecord {
lr := plog.NewLogRecord()
lr.Body().SetInt(3)
return lr
}(),
},
},
{
name: "empty_array",
inputData: map[string]interface{}{
"body": map[string]interface{}{
"items": []interface{}{},
},
},
expected: []plog.LogRecord{},
expectError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lr := plog.NewLogRecord()
m := pcommon.NewMap()
err := m.FromRaw(tt.inputData)
require.NoError(t, err)
lr.Attributes().FromRaw(tt.inputData)

arrayMap := pcommon.NewMap()
arraySlice := arrayMap.PutEmptySlice("items")
items := tt.inputData["body"].(map[string]any)["items"].([]any)
for _, item := range items {
switch v := item.(type) {
case string:
arraySlice.AppendEmpty().SetStr(v)
case map[string]any:
newMap := arraySlice.AppendEmpty().SetEmptyMap()
err := newMap.FromRaw(v)
assert.NoError(t, err)
default:
arraySlice.AppendEmpty().SetStr(fmt.Sprintf("%v", v))
}
}

field := ottl.StandardGetSetter[ottllog.TransformContext]{
Getter: func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) {
return arraySlice, nil
},
}

exprFunc, err := unroll(field)
if tt.expectError {
require.Error(t, err)
return
}
require.NoError(t, err)
scopeLogs := plog.NewScopeLogs()

_, err = exprFunc(context.Background(), ottllog.NewTransformContext(lr, plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), scopeLogs, plog.NewResourceLogs()))
if tt.expectError {
require.Error(t, err)
return
}
require.Equal(t, scopeLogs.LogRecords().Len(), len(tt.expected))

for i, expected := range tt.expected {
require.Equal(t, expected.Body().AsRaw(), scopeLogs.LogRecords().At(i).Body().AsRaw())
}

// Verify results
// scopeLogs
// require.True(t, ok)
// require.Equal(t, len(tt.expected), len(results.Slice().AsRaw()))

// for i, expected := range tt.expected {
// require.Equal(t, expected.Body().AsRaw(), results.Slice().At(i).AsRaw())
// }
})
}
}
11 changes: 10 additions & 1 deletion processor/transformprocessor/internal/logs/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,14 @@ import (

func LogFunctions() map[string]ottl.Factory[ottllog.TransformContext] {
// No logs-only functions yet.
return ottlfuncs.StandardFuncs[ottllog.TransformContext]()
functions := ottlfuncs.StandardFuncs[ottllog.TransformContext]()

logFunctions := ottl.CreateFactoryMap[ottllog.TransformContext](
newUnrollFactory(),
)

for k, v := range logFunctions {
functions[k] = v
}
return functions
}
Loading