diff --git a/.chloggen/feat_loadbalancing-exporter-queue.yaml b/.chloggen/feat_loadbalancing-exporter-queue.yaml new file mode 100644 index 000000000000..d65a0e1d8d32 --- /dev/null +++ b/.chloggen/feat_loadbalancing-exporter-queue.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: loadbalancingexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adding sending_queue, retry_on_failure and timeout settings to loadbalancing exporter configuration + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35378,16826] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + When switching to top-level sending_queue configuration - users should carefully review queue size + In some rare cases setting top-level queue size to n*queueSize might be not enough to prevent data loss + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/loadbalancingexporter/README.md b/exporter/loadbalancingexporter/README.md index 2b6745341129..eecaa3e389fd 100644 --- a/exporter/loadbalancingexporter/README.md +++ b/exporter/loadbalancingexporter/README.md @@ -48,14 +48,39 @@ This also supports service name based exporting for traces. If you have two or m ## Resilience and scaling considerations -The `loadbalancingexporter` will, irrespective of the chosen resolver (`static`, `dns`, `k8s`), create one exporter per endpoint. The exporter conforms to its published configuration regarding sending queue and retry mechanisms. Importantly, the `loadbalancingexporter` will not attempt to re-route data to a healthy endpoint on delivery failure, and data loss is therefore possible if the exporter's target remains unavailable once redelivery is exhausted. Due consideration needs to be given to the exporter queue and retry configuration when running in a highly elastic environment. +The `loadbalancingexporter` will, irrespective of the chosen resolver (`static`, `dns`, `k8s`), create one `otlp` exporter per endpoint. Each level of exporters, `loadbalancingexporter` itself and all sub-exporters (one per each endpoint), have it's own queue, timeout and retry mechanisms. Importantly, the `loadbalancingexporter`, by default, will NOT attempt to re-route data to a healthy endpoint on delivery failure, because in-memory queue, retry and timeout setting are disabled by default ([more details on queuing, retry and timeout default settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)). -* When using the `static` resolver and a target is unavailable, all the target's load-balanced telemetry will fail to be delivered until either the target is restored or removed from the static list. The same principle applies to the `dns` resolver. +``` + +------------------+ +---------------+ + resiliency options 1 | | | | + -- otlp exporter 1 ------------ backend 1 | + | ---/ | | | | + | ---/ +----|-------------+ +---------------+ + | ---/ | + +-----------------+ ---/ | + | --/ | + | loadbalancing | resiliency options 2 + | exporter | | + | --\ | + +-----------------+ ----\ | + ----\ +----|-------------+ +---------------+ + ----\ | | | | + --- otlp exporter N ------------ backend N | + | | | | + +------------------+ +---------------+ +``` + +* For all types of resolvers (`static`, `dns`, `k8s`) - if one of endpoints is unavailable - first works queue, retry and timeout settings defined for sub-exporters (under `otlp` property). Once redelivery is exhausted on sub-exporter level, and resilience options 1 are enabled - telemetry data returns to `loadbalancingexporter` itself and data redelivery happens according to exporter level queue, retry and timeout settings. +* When using the `static` resolver and all targets are unavailable, all load-balanced telemetry will fail to be delivered until either one or all targets are restored or valid target is added the static list. The same principle applies to the `dns` and `k8s` resolvers, except for endpoints list update which happens automatically. * When using `k8s`, `dns`, and likely future resolvers, topology changes are eventually reflected in the `loadbalancingexporter`. The `k8s` resolver will update more quickly than `dns`, but a window of time in which the true topology doesn't match the view of the `loadbalancingexporter` remains. +* Resiliency options 1 (`timeout`, `retry_on_failure` and `sending_queue` settings in `loadbalancing` section) - are useful for highly elastic environment (like k8s), where list of resolved endpoints frequently changed due to deployments, scale-up or scale-down events. In case of permanent change of list of resolved exporters this options provide capability to re-route data into new set of healthy backends. Disabled by default. +* Resiliency options 1 (`timeout`, `retry_on_failure` and `sending_queue` settings in `otlp` section) - are useful for temporary problems with specific backend, like network flukes. Persistent Queue is NOT supported here as all sub-exporter shares the same `sending_queue` configuration, including `storage`. Enabled by default. + +Unfortunately, data loss is still possible if all of the exporter's targets remains unavailable once redelivery is exhausted. Due consideration needs to be given to the exporter queue and retry configuration when running in a highly elastic environment. ## Configuration -Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the processor. +Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the exporter. * The `otlp` property configures the template used for building the OTLP exporter. Refer to the OTLP Exporter documentation for information on which options are available. Note that the `endpoint` property should not be set and will be overridden by this exporter with the backend endpoint. * The `resolver` accepts a `static` node, a `dns`, a `k8s` service or `aws_cloud_map`. If all four are specified, an `errMultipleResolversProvided` error will be thrown. @@ -90,6 +115,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th * `traceID`: Routes spans based on their `traceID`. Invalid for metrics. * `metric`: Routes metrics based on their metric name. Invalid for spans. * `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all it's attributes, plus the attributes and identifying information of its resource, scope, and metric data +* loadbalancing exporter supports set of standard [queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), but they are disable by default to maintain compatibility Simple example @@ -117,11 +143,76 @@ exporters: - backend-2:4317 - backend-3:4317 - backend-4:4317 - # Notice to config a headless service DNS in Kubernetes + # Notice to config a headless service DNS in Kubernetes + # dns: + # hostname: otelcol-headless.observability.svc.cluster.local + +service: + pipelines: + traces: + receivers: + - otlp + processors: [] + exporters: + - loadbalancing + logs: + receivers: + - otlp + processors: [] + exporters: + - loadbalancing +``` + +Persistent queue, retry and timeout usage example: + +```yaml +receivers: + otlp: + protocols: + grpc: + endpoint: localhost:4317 + +processors: + +exporters: + loadbalancing: + timeout: 10s + retry_on_failure: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 300s + sending_queue: + enabled: true + num_consumers: 2 + queue_size: 1000 + storage: file_storage/otc + routing_key: "service" + protocol: + otlp: + # all options from the OTLP exporter are supported + # except the endpoint + timeout: 1s + sending_queue: + enabled: true + resolver: + static: + hostnames: + - backend-1:4317 + - backend-2:4317 + - backend-3:4317 + - backend-4:4317 + # Notice to config a headless service DNS in Kubernetes # dns: - # hostname: otelcol-headless.observability.svc.cluster.local + # hostname: otelcol-headless.observability.svc.cluster.local + +extensions: + file_storage/otc: + directory: /var/lib/storage/otc + timeout: 10s service: + extensions: [file_storage] pipelines: traces: receivers: @@ -334,7 +425,7 @@ service: ## Metrics -The following metrics are recorded by this processor: +The following metrics are recorded by this exporter: * `otelcol_loadbalancer_num_resolutions` represents the total number of resolutions performed by the resolver specified in the tag `resolver`, split by their outcome (`success=true|false`). For the static resolver, this should always be `1` with the tag `success=true`. * `otelcol_loadbalancer_num_backends` informs how many backends are currently in use. It should always match the number of items specified in the configuration file in case the `static` resolver is used, and should eventually (seconds) catch up with the DNS changes. Note that DNS caches that might exist between the load balancer and the record authority will influence how long it takes for the load balancer to see the change. diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index 8496268de7ed..b9682df16892 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -7,6 +7,8 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/service/servicediscovery/types" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" ) @@ -30,6 +32,10 @@ const ( // Config defines configuration for the exporter. type Config struct { + TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` + configretry.BackOffConfig `mapstructure:"retry_on_failure"` + QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"` + Protocol Protocol `mapstructure:"protocol"` Resolver ResolverSettings `mapstructure:"resolver"` RoutingKey string `mapstructure:"routing_key"` diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index f1c37e151757..1e10395162c4 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -7,14 +7,21 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry import ( "context" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) +const ( + zapEndpointKey = "endpoint" +) + // NewFactory creates a factory for the exporter. func NewFactory() exporter.Factory { return exporter.NewFactory( @@ -32,20 +39,110 @@ func createDefaultConfig() component.Config { otlpDefaultCfg.Endpoint = "placeholder:4317" return &Config{ + // By default we disable resilience options on loadbalancing exporter level + // to maintain compatibility with workflow in previous versions Protocol: Protocol{ OTLP: *otlpDefaultCfg, }, } } -func createTracesExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) { - return newTracesExporter(params, cfg) +func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { + oCfg := cfg.Protocol.OTLP + oCfg.Endpoint = endpoint + + return oCfg +} + +func buildExporterSettings(params exporter.Settings, endpoint string) exporter.Settings { + // Override child exporter ID to segregate metrics from loadbalancing top level + childName := endpoint + if params.ID.Name() != "" { + childName = fmt.Sprintf("%s_%s", params.ID.Name(), childName) + } + params.ID = component.NewIDWithName(params.ID.Type(), childName) + // Add "endpoint" attribute to child exporter logger to segregate logs from loadbalancing top level + params.Logger = params.Logger.With(zap.String(zapEndpointKey, endpoint)) + + return params +} + +func buildExporterResilienceOptions(options []exporterhelper.Option, cfg *Config) []exporterhelper.Option { + if cfg.TimeoutSettings.Timeout > 0 { + options = append(options, exporterhelper.WithTimeout(cfg.TimeoutSettings)) + } + if cfg.QueueSettings.Enabled { + options = append(options, exporterhelper.WithQueue(cfg.QueueSettings)) + } + if cfg.BackOffConfig.Enabled { + options = append(options, exporterhelper.WithRetry(cfg.BackOffConfig)) + } + + return options +} + +func createTracesExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) { + c := cfg.(*Config) + exporter, err := newTracesExporter(params, cfg) + if err != nil { + return nil, fmt.Errorf("cannot configure loadbalancing traces exporter: %w", err) + } + + options := []exporterhelper.Option{ + exporterhelper.WithStart(exporter.Start), + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithCapabilities(exporter.Capabilities()), + } + + return exporterhelper.NewTraces( + ctx, + params, + cfg, + exporter.ConsumeTraces, + buildExporterResilienceOptions(options, c)..., + ) } -func createLogsExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) { - return newLogsExporter(params, cfg) +func createLogsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Logs, error) { + c := cfg.(*Config) + exporter, err := newLogsExporter(params, cfg) + if err != nil { + return nil, fmt.Errorf("cannot configure loadbalancing logs exporter: %w", err) + } + + options := []exporterhelper.Option{ + exporterhelper.WithStart(exporter.Start), + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithCapabilities(exporter.Capabilities()), + } + + return exporterhelper.NewLogs( + ctx, + params, + cfg, + exporter.ConsumeLogs, + buildExporterResilienceOptions(options, c)..., + ) } -func createMetricsExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Metrics, error) { - return newMetricsExporter(params, cfg) +func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg component.Config) (exporter.Metrics, error) { + c := cfg.(*Config) + exporter, err := newMetricsExporter(params, cfg) + if err != nil { + return nil, fmt.Errorf("cannot configure loadbalancing metrics exporter: %w", err) + } + + options := []exporterhelper.Option{ + exporterhelper.WithStart(exporter.Start), + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithCapabilities(exporter.Capabilities()), + } + + return exporterhelper.NewMetrics( + ctx, + params, + cfg, + exporter.ConsumeMetrics, + buildExporterResilienceOptions(options, c)..., + ) } diff --git a/exporter/loadbalancingexporter/factory_test.go b/exporter/loadbalancingexporter/factory_test.go index 974b13d04bdb..b4d3ff103e5a 100644 --- a/exporter/loadbalancingexporter/factory_test.go +++ b/exporter/loadbalancingexporter/factory_test.go @@ -5,10 +5,22 @@ package loadbalancingexporter import ( "context" + "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/otelcol/otelcoltest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) func TestTracesExporterGetsCreatedWithValidConfiguration(t *testing.T) { @@ -58,3 +70,90 @@ func TestOTLPConfigIsValid(t *testing.T) { // verify assert.NoError(t, otlpCfg.Validate()) } + +func TestBuildExporterConfig(t *testing.T) { + // prepare + factories, err := otelcoltest.NopFactories() + require.NoError(t, err) + + factories.Exporters[metadata.Type] = NewFactory() + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 + // nolint:staticcheck + cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, cfg) + + c := cfg.Exporters[component.NewID(metadata.Type)] + require.NotNil(t, c) + + // test + defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) + exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") + + // verify + grpcSettings := defaultCfg.ClientConfig + grpcSettings.Endpoint = "the-endpoint" + assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) + + assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) + assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) + assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) +} + +func TestBuildExporterSettings(t *testing.T) { + // prepare + creationParams := exportertest.NewNopSettings() + testEndpoint := "the-endpoint" + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + creationParams.Logger = zap.New(observedZapCore) + + // test + exporterParams := buildExporterSettings(creationParams, testEndpoint) + exporterParams.Logger.Info("test") + + // verify + expectedID := component.NewIDWithName( + creationParams.ID.Type(), + fmt.Sprintf("%s_%s", creationParams.ID.Name(), testEndpoint), + ) + assert.Equal(t, expectedID, exporterParams.ID) + + allLogs := observedLogs.All() + require.Equal(t, 1, observedLogs.Len()) + assert.Contains(t, + allLogs[0].Context, + zap.String(zapEndpointKey, testEndpoint), + ) +} + +func TestBuildExporterResilienceOptions(t *testing.T) { + t.Run("Shouldn't have resilience options by default", func(t *testing.T) { + o := []exporterhelper.Option{} + cfg := createDefaultConfig().(*Config) + assert.Empty(t, buildExporterResilienceOptions(o, cfg)) + }) + t.Run("Should have timeout option if defined", func(t *testing.T) { + o := []exporterhelper.Option{} + cfg := createDefaultConfig().(*Config) + cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() + + assert.Len(t, buildExporterResilienceOptions(o, cfg), 1) + }) + t.Run("Should have timeout and queue options if defined", func(t *testing.T) { + o := []exporterhelper.Option{} + cfg := createDefaultConfig().(*Config) + cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() + cfg.QueueSettings = exporterhelper.NewDefaultQueueConfig() + + assert.Len(t, buildExporterResilienceOptions(o, cfg), 2) + }) + t.Run("Should have all resilience options if defined", func(t *testing.T) { + o := []exporterhelper.Option{} + cfg := createDefaultConfig().(*Config) + cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() + cfg.QueueSettings = exporterhelper.NewDefaultQueueConfig() + cfg.BackOffConfig = configretry.NewDefaultBackOffConfig() + + assert.Len(t, buildExporterResilienceOptions(o, cfg), 3) + }) +} diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index ae60ad3b3895..b0c0269f9d96 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -14,6 +14,7 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.114.0 go.opentelemetry.io/collector/component/componenttest v0.114.0 + go.opentelemetry.io/collector/config/configretry v1.20.0 go.opentelemetry.io/collector/config/configtelemetry v0.114.0 go.opentelemetry.io/collector/confmap v1.20.0 go.opentelemetry.io/collector/consumer v0.114.0 @@ -113,7 +114,6 @@ require ( go.opentelemetry.io/collector/config/configgrpc v0.114.0 // indirect go.opentelemetry.io/collector/config/confignet v1.20.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.20.0 // indirect - go.opentelemetry.io/collector/config/configretry v1.20.0 // indirect go.opentelemetry.io/collector/config/configtls v1.20.0 // indirect go.opentelemetry.io/collector/config/internal v0.114.0 // indirect go.opentelemetry.io/collector/confmap/provider/envprovider v1.20.0 // indirect diff --git a/exporter/loadbalancingexporter/log_exporter.go b/exporter/loadbalancingexporter/log_exporter.go index 8d4e3cf56b37..2d1385b76e34 100644 --- a/exporter/loadbalancingexporter/log_exporter.go +++ b/exporter/loadbalancingexporter/log_exporter.go @@ -41,7 +41,9 @@ func newLogsExporter(params exporter.Settings, cfg component.Config) (*logExport exporterFactory := otlpexporter.NewFactory() cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) - return exporterFactory.CreateLogs(ctx, params, &oCfg) + oParams := buildExporterSettings(params, endpoint) + + return exporterFactory.CreateLogs(ctx, oParams, &oCfg) } lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index f88b8c7557df..45bef77149e3 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -43,7 +43,9 @@ func newMetricsExporter(params exporter.Settings, cfg component.Config) (*metric exporterFactory := otlpexporter.NewFactory() cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) - return exporterFactory.CreateMetrics(ctx, params, &oCfg) + oParams := buildExporterSettings(params, endpoint) + + return exporterFactory.CreateMetrics(ctx, oParams, &oCfg) } lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index 5faaf284ae7e..1013dcda2a5e 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -24,14 +24,11 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/otlpexporter" - "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" "gopkg.in/yaml.v2" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) @@ -673,35 +670,6 @@ func TestConsumeMetricsUnexpectedExporterType(t *testing.T) { assert.EqualError(t, res, fmt.Sprintf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", newNopMockExporter())) } -func TestBuildExporterConfigUnknown(t *testing.T) { - // prepare - factories, err := otelcoltest.NopFactories() - require.NoError(t, err) - - factories.Exporters[metadata.Type] = NewFactory() - // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 - // nolint:staticcheck - cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) - require.NoError(t, err) - require.NotNil(t, cfg) - - c := cfg.Exporters[component.NewID(metadata.Type)] - require.NotNil(t, c) - - // test - defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) - exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") - - // verify - grpcSettings := defaultCfg.ClientConfig - grpcSettings.Endpoint = "the-endpoint" - assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) - - assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) - assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) - assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) -} - func TestBatchWithTwoMetrics(t *testing.T) { ts, tb := getTelemetryAssets(t) sink := new(consumertest.MetricsSink) diff --git a/exporter/loadbalancingexporter/testdata/config.yaml b/exporter/loadbalancingexporter/testdata/config.yaml index da1d51818e59..64a0271338b3 100644 --- a/exporter/loadbalancingexporter/testdata/config.yaml +++ b/exporter/loadbalancingexporter/testdata/config.yaml @@ -1,6 +1,6 @@ loadbalancing: protocol: - # the OTLP exporter configuration. "endpoint" values will be ignored + # the OTLP exporter configuration "endpoint" values will be ignored otlp: timeout: 1s @@ -38,3 +38,12 @@ loadbalancing/4: namespace: cloudmap-1 service_name: service-1 port: 4319 + +loadbalancing/5: + # the OTLP exporter configuration "sending_queue" values will be ignored + sending_queue: + enabled: true + protocol: + otlp: + sending_queue: + enabled: false diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 3e335e8a9e14..e6fb9647d977 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -45,7 +45,9 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx exporterFactory := otlpexporter.NewFactory() cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) - return exporterFactory.CreateTraces(ctx, params, &oCfg) + oParams := buildExporterSettings(params, endpoint) + + return exporterFactory.CreateTraces(ctx, oParams, &oCfg) } lb, err := newLoadBalancer(params.Logger, cfg, cfFunc, telemetry) @@ -69,12 +71,6 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx return &traceExporter, nil } -func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { - oCfg := cfg.Protocol.OTLP - oCfg.Endpoint = endpoint - return oCfg -} - func (e *traceExporterImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index d48aeb462bcf..8751c83e8986 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -9,7 +9,6 @@ import ( "fmt" "math/rand" "net" - "path/filepath" "sync" "sync/atomic" "testing" @@ -23,13 +22,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/otlpexporter" - "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" - - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" ) func TestNewTracesExporter(t *testing.T) { @@ -349,35 +344,6 @@ func TestConsumeTracesUnexpectedExporterType(t *testing.T) { assert.EqualError(t, res, fmt.Sprintf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", newNopMockExporter())) } -func TestBuildExporterConfig(t *testing.T) { - // prepare - factories, err := otelcoltest.NopFactories() - require.NoError(t, err) - - factories.Exporters[metadata.Type] = NewFactory() - // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594 - // nolint:staticcheck - cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "test-build-exporter-config.yaml"), factories) - require.NoError(t, err) - require.NotNil(t, cfg) - - c := cfg.Exporters[component.NewID(metadata.Type)] - require.NotNil(t, c) - - // test - defaultCfg := otlpexporter.NewFactory().CreateDefaultConfig().(*otlpexporter.Config) - exporterCfg := buildExporterConfig(c.(*Config), "the-endpoint") - - // verify - grpcSettings := defaultCfg.ClientConfig - grpcSettings.Endpoint = "the-endpoint" - assert.Equal(t, grpcSettings, exporterCfg.ClientConfig) - - assert.Equal(t, defaultCfg.TimeoutConfig, exporterCfg.TimeoutConfig) - assert.Equal(t, defaultCfg.QueueConfig, exporterCfg.QueueConfig) - assert.Equal(t, defaultCfg.RetryConfig, exporterCfg.RetryConfig) -} - func TestBatchWithTwoTraces(t *testing.T) { ts, tb := getTelemetryAssets(t) sink := new(consumertest.TracesSink)