Skip to content

Commit 18d5c02

Browse files
authored
Move processor builders into internal service (#10782)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This moves the processor builder out of the `processor` package, and into `service/internal/builders`. There's no real reason for this struct to be public (folks shouldn't call it), and making it private will allow us to add profiling support to it. <!-- Issue number if applicable --> #### Link to tracking issue #10375 (review)
1 parent 7cd1579 commit 18d5c02

File tree

16 files changed

+394
-26
lines changed

16 files changed

+394
-26
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: processor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecate processor.Builder, and move it into an internal package of the service module
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10782]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

cmd/otelcorecol/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ require (
102102
go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect
103103
go.opentelemetry.io/collector/pdata v1.13.0 // indirect
104104
go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect
105+
go.opentelemetry.io/collector/pdata/testdata v0.107.0 // indirect
105106
go.opentelemetry.io/collector/semconv v0.107.0 // indirect
106107
go.opentelemetry.io/collector/service v0.107.0 // indirect
107108
go.opentelemetry.io/contrib/config v0.8.0 // indirect

internal/e2e/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ require (
2323
go.opentelemetry.io/collector/extension v0.107.0
2424
go.opentelemetry.io/collector/pdata v1.13.0
2525
go.opentelemetry.io/collector/pdata/testdata v0.107.0
26-
go.opentelemetry.io/collector/processor v0.107.0
2726
go.opentelemetry.io/collector/receiver v0.107.0
2827
go.opentelemetry.io/collector/receiver/otlpreceiver v0.107.0
2928
go.opentelemetry.io/collector/service v0.107.0
@@ -82,6 +81,7 @@ require (
8281
go.opentelemetry.io/collector/featuregate v1.13.0 // indirect
8382
go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect
8483
go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect
84+
go.opentelemetry.io/collector/processor v0.107.0 // indirect
8585
go.opentelemetry.io/collector/semconv v0.107.0 // indirect
8686
go.opentelemetry.io/contrib/config v0.8.0 // indirect
8787
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect

internal/e2e/status_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"go.opentelemetry.io/collector/exporter/exportertest"
2525
"go.opentelemetry.io/collector/extension"
2626
"go.opentelemetry.io/collector/internal/sharedcomponent"
27-
"go.opentelemetry.io/collector/processor/processortest"
2827
"go.opentelemetry.io/collector/receiver"
2928
"go.opentelemetry.io/collector/service"
3029
"go.opentelemetry.io/collector/service/extensions"
@@ -52,7 +51,6 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) {
5251
ReceiversFactories: map[component.Type]receiver.Factory{
5352
component.MustNewType("test"): newReceiverFactory(),
5453
},
55-
Processors: processortest.NewNopBuilder(),
5654
ExportersConfigs: map[component.ID]component.Config{
5755
component.NewID(nopType): exporterFactory.CreateDefaultConfig(),
5856
},

otelcol/collector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"go.opentelemetry.io/collector/confmap"
2424
"go.opentelemetry.io/collector/extension"
2525
"go.opentelemetry.io/collector/otelcol/internal/grpclog"
26-
"go.opentelemetry.io/collector/processor"
2726
"go.opentelemetry.io/collector/service"
2827
)
2928

@@ -187,7 +186,8 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
187186

188187
ReceiversConfigs: cfg.Receivers,
189188
ReceiversFactories: factories.Receivers,
190-
Processors: processor.NewBuilder(cfg.Processors, factories.Processors),
189+
ProcessorsConfigs: cfg.Processors,
190+
ProcessorsFactories: factories.Processors,
191191
ExportersConfigs: cfg.Exporters,
192192
ExportersFactories: factories.Exporters,
193193
ConnectorsConfigs: cfg.Connectors,

processor/builder.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package processor // import "go.opentelemetry.io/collector/processor"
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910

1011
"go.uber.org/zap"
@@ -13,13 +14,21 @@ import (
1314
"go.opentelemetry.io/collector/consumer"
1415
)
1516

17+
var errNilNextConsumer = errors.New("nil next Consumer")
18+
1619
// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors.
20+
//
21+
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
22+
// and will be removed soon.
1723
type Builder struct {
1824
cfgs map[component.ID]component.Config
1925
factories map[component.Type]Factory
2026
}
2127

2228
// NewBuilder creates a new processor.Builder to help with creating components form a set of configs and factories.
29+
//
30+
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
31+
// and will be removed soon.
2332
func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
2433
return &Builder{cfgs: cfgs, factories: factories}
2534
}

processor/processor.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,12 @@
44
package processor // import "go.opentelemetry.io/collector/processor"
55

66
import (
7-
"errors"
87
"fmt"
98

109
"go.opentelemetry.io/collector/component"
1110
"go.opentelemetry.io/collector/processor/internal"
1211
)
1312

14-
var (
15-
errNilNextConsumer = errors.New("nil next Consumer")
16-
)
17-
1813
// Traces is a processor that can consume traces.
1914
type Traces = internal.Traces
2015

processor/processortest/nop_processor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ type nopProcessor struct {
6363
}
6464

6565
// NewNopBuilder returns a processor.Builder that constructs nop processors.
66+
//
67+
// Deprecated: [v0.108.0] this builder is being internalized within the service module,
68+
// and will be removed soon.
6669
func NewNopBuilder() *processor.Builder {
6770
nopFactory := NewNopFactory()
6871
return processor.NewBuilder(
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package builders // import "go.opentelemetry.io/collector/service/internal/builders"
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/processor"
13+
"go.opentelemetry.io/collector/processor/processortest"
14+
)
15+
16+
// Processor is an interface that allows using implementations of the builder
17+
// from different packages.
18+
type Processor interface {
19+
CreateTraces(context.Context, processor.Settings, consumer.Traces) (processor.Traces, error)
20+
CreateMetrics(context.Context, processor.Settings, consumer.Metrics) (processor.Metrics, error)
21+
CreateLogs(context.Context, processor.Settings, consumer.Logs) (processor.Logs, error)
22+
Factory(component.Type) component.Factory
23+
}
24+
25+
// ProcessorBuilder processor is a helper struct that given a set of Configs
26+
// and Factories helps with creating processors.
27+
type ProcessorBuilder struct {
28+
cfgs map[component.ID]component.Config
29+
factories map[component.Type]processor.Factory
30+
}
31+
32+
// NewProcessor creates a new ProcessorBuilder to help with creating components form a set of configs and factories.
33+
func NewProcessor(cfgs map[component.ID]component.Config, factories map[component.Type]processor.Factory) *ProcessorBuilder {
34+
return &ProcessorBuilder{cfgs: cfgs, factories: factories}
35+
}
36+
37+
// CreateTraces creates a Traces processor based on the settings and config.
38+
func (b *ProcessorBuilder) CreateTraces(ctx context.Context, set processor.Settings, next consumer.Traces) (processor.Traces, error) {
39+
if next == nil {
40+
return nil, errNilNextConsumer
41+
}
42+
cfg, existsCfg := b.cfgs[set.ID]
43+
if !existsCfg {
44+
return nil, fmt.Errorf("processor %q is not configured", set.ID)
45+
}
46+
47+
f, existsFactory := b.factories[set.ID.Type()]
48+
if !existsFactory {
49+
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
50+
}
51+
52+
logStabilityLevel(set.Logger, f.TracesProcessorStability())
53+
return f.CreateTracesProcessor(ctx, set, cfg, next)
54+
}
55+
56+
// CreateMetrics creates a Metrics processor based on the settings and config.
57+
func (b *ProcessorBuilder) CreateMetrics(ctx context.Context, set processor.Settings, next consumer.Metrics) (processor.Metrics, error) {
58+
if next == nil {
59+
return nil, errNilNextConsumer
60+
}
61+
cfg, existsCfg := b.cfgs[set.ID]
62+
if !existsCfg {
63+
return nil, fmt.Errorf("processor %q is not configured", set.ID)
64+
}
65+
66+
f, existsFactory := b.factories[set.ID.Type()]
67+
if !existsFactory {
68+
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
69+
}
70+
71+
logStabilityLevel(set.Logger, f.MetricsProcessorStability())
72+
return f.CreateMetricsProcessor(ctx, set, cfg, next)
73+
}
74+
75+
// CreateLogs creates a Logs processor based on the settings and config.
76+
func (b *ProcessorBuilder) CreateLogs(ctx context.Context, set processor.Settings, next consumer.Logs) (processor.Logs, error) {
77+
if next == nil {
78+
return nil, errNilNextConsumer
79+
}
80+
cfg, existsCfg := b.cfgs[set.ID]
81+
if !existsCfg {
82+
return nil, fmt.Errorf("processor %q is not configured", set.ID)
83+
}
84+
85+
f, existsFactory := b.factories[set.ID.Type()]
86+
if !existsFactory {
87+
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
88+
}
89+
90+
logStabilityLevel(set.Logger, f.LogsProcessorStability())
91+
return f.CreateLogsProcessor(ctx, set, cfg, next)
92+
}
93+
94+
func (b *ProcessorBuilder) Factory(componentType component.Type) component.Factory {
95+
return b.factories[componentType]
96+
}
97+
98+
// NewNopProcessorConfigsAndFactories returns a configuration and factories that allows building a new nop processor.
99+
func NewNopProcessorConfigsAndFactories() (map[component.ID]component.Config, map[component.Type]processor.Factory) {
100+
nopFactory := processortest.NewNopFactory()
101+
configs := map[component.ID]component.Config{
102+
component.NewID(nopType): nopFactory.CreateDefaultConfig(),
103+
}
104+
factories := map[component.Type]processor.Factory{
105+
nopType: nopFactory,
106+
}
107+
108+
return configs, factories
109+
}

0 commit comments

Comments
 (0)