Skip to content

[exporter/azuremonitor] support sending to multiple azuremonitor exporter #36700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/34188.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: azuremonitorexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: support sending to multiple azure monitor exporters

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34188]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/azuremonitorexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The following settings can be optionally configured:
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 1000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`
- `storage` (default = `none`): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
- `shutdown_timeout` (default = 1s): Timeout to wait for graceful shutdown. Once exceeded, the component will shut down forcibly, dropping any element in queue.

Example:

Expand Down
141 changes: 141 additions & 0 deletions exporter/azuremonitorexporter/azuremonitor_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter"

import (
"context"

"github.com/microsoft/ApplicationInsights-Go/appinsights"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

type azureMonitorExporter struct {
config *Config
transportChannel appinsights.TelemetryChannel
logger *zap.Logger
packer *metricPacker
}

func (exporter *azureMonitorExporter) Start(_ context.Context, _ component.Host) (err error) {
connectionVars, err := parseConnectionString(exporter.config)
if err != nil {
return
}

exporter.config.InstrumentationKey = configopaque.String(connectionVars.InstrumentationKey)
exporter.config.Endpoint = connectionVars.IngestionURL
telemetryConfiguration := appinsights.NewTelemetryConfiguration(connectionVars.InstrumentationKey)
telemetryConfiguration.EndpointUrl = connectionVars.IngestionURL
telemetryConfiguration.MaxBatchSize = exporter.config.MaxBatchSize
telemetryConfiguration.MaxBatchInterval = exporter.config.MaxBatchInterval

telemetryClient := appinsights.NewTelemetryClientFromConfig(telemetryConfiguration)
exporter.transportChannel = telemetryClient.Channel()

return nil
}

func (exporter *azureMonitorExporter) Shutdown(_ context.Context) (err error) {
if exporter.transportChannel != nil {
exporter.transportChannel.Close(exporter.config.ShutdownTimeout)
}

return nil
}

func (exporter *azureMonitorExporter) consumeLogs(_ context.Context, logData plog.Logs) error {
resourceLogs := logData.ResourceLogs()
logPacker := newLogPacker(exporter.logger)

for i := 0; i < resourceLogs.Len(); i++ {
scopeLogs := resourceLogs.At(i).ScopeLogs()
resource := resourceLogs.At(i).Resource()
for j := 0; j < scopeLogs.Len(); j++ {
logs := scopeLogs.At(j).LogRecords()
scope := scopeLogs.At(j).Scope()
for k := 0; k < logs.Len(); k++ {
envelope := logPacker.LogRecordToEnvelope(logs.At(k), resource, scope)
envelope.IKey = string(exporter.config.InstrumentationKey)
exporter.transportChannel.Send(envelope)
}
}
}
// Flush the transport channel to force the telemetry to be sent
exporter.transportChannel.Flush()
return nil
}

func (exporter *azureMonitorExporter) consumeMetrics(_ context.Context, metricData pmetric.Metrics) error {
resourceMetrics := metricData.ResourceMetrics()

for i := 0; i < resourceMetrics.Len(); i++ {
scopeMetrics := resourceMetrics.At(i).ScopeMetrics()
resource := resourceMetrics.At(i).Resource()
for j := 0; j < scopeMetrics.Len(); j++ {
metrics := scopeMetrics.At(j).Metrics()
scope := scopeMetrics.At(j).Scope()
for k := 0; k < metrics.Len(); k++ {
for _, envelope := range exporter.packer.MetricToEnvelopes(metrics.At(k), resource, scope) {
envelope.IKey = string(exporter.config.InstrumentationKey)
exporter.transportChannel.Send(envelope)
}
}
}
}

// Flush the transport channel to force the telemetry to be sent
exporter.transportChannel.Flush()
return nil
}

type traceVisitor struct {
processed int
err error
exporter *azureMonitorExporter
}

// Called for each tuple of Resource, InstrumentationScope, and Span
func (v *traceVisitor) visit(
resource pcommon.Resource,
scope pcommon.InstrumentationScope,
span ptrace.Span,
) (ok bool) {
envelopes, err := spanToEnvelopes(resource, scope, span, v.exporter.config.SpanEventsEnabled, v.exporter.logger)
if err != nil {
// record the error and short-circuit
v.err = consumererror.NewPermanent(err)
return false
}

for _, envelope := range envelopes {
envelope.IKey = string(v.exporter.config.InstrumentationKey)

// This is a fire and forget operation
v.exporter.transportChannel.Send(envelope)
}

// Flush the transport channel to force the telemetry to be sent
v.exporter.transportChannel.Flush()
v.processed++

return true
}

func (exporter *azureMonitorExporter) consumeTraces(_ context.Context, traceData ptrace.Traces) error {
spanCount := traceData.SpanCount()
if spanCount == 0 {
return nil
}

visitor := &traceVisitor{exporter: exporter}
accept(traceData, visitor)
return visitor.err
}
11 changes: 0 additions & 11 deletions exporter/azuremonitorexporter/channels.go

This file was deleted.

1 change: 1 addition & 0 deletions exporter/azuremonitorexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ type Config struct {
MaxBatchSize int `mapstructure:"maxbatchsize"`
MaxBatchInterval time.Duration `mapstructure:"maxbatchinterval"`
SpanEventsEnabled bool `mapstructure:"spaneventsenabled"`
ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"`
}
1 change: 1 addition & 0 deletions exporter/azuremonitorexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 10,
StorageID: &disk,
},
ShutdownTimeout: 2 * time.Second,
},
},
}
Expand Down
122 changes: 66 additions & 56 deletions exporter/azuremonitorexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,29 @@ package azuremonitorexporter // import "github.com/open-telemetry/opentelemetry-
import (
"context"
"errors"
"sync"
"time"

"github.com/microsoft/ApplicationInsights-Go/appinsights"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
)

var errUnexpectedConfigurationType = errors.New("failed to cast configuration to Azure Monitor Config")
var (
errUnexpectedConfigurationType = errors.New("failed to cast configuration to Azure Monitor Config")
exporters = sharedcomponent.NewSharedComponents()
)

// NewFactory returns a factory for Azure Monitor exporter.
func NewFactory() exporter.Factory {
f := &factory{}
f := &factory{
loggerInitOnce: sync.Once{},
}
return exporter.NewFactory(
metadata.Type,
createDefaultConfig,
Expand All @@ -35,7 +41,7 @@ func NewFactory() exporter.Factory {

// Implements the interface from go.opentelemetry.io/collector/exporter/factory.go
type factory struct {
tChannel transportChannel
loggerInitOnce sync.Once
}

func createDefaultConfig() component.Config {
Expand All @@ -44,95 +50,99 @@ func createDefaultConfig() component.Config {
MaxBatchInterval: 10 * time.Second,
SpanEventsEnabled: false,
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
ShutdownTimeout: 1 * time.Second,
}
}

func (f *factory) createTracesExporter(
_ context.Context,
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Traces, error) {
exporterConfig, ok := cfg.(*Config)

f.initLogger(set.Logger)
config, ok := cfg.(*Config)
if !ok {
return nil, errUnexpectedConfigurationType
}

tc, errInstrumentationKeyOrConnectionString := f.getTransportChannel(exporterConfig, set.Logger)
if errInstrumentationKeyOrConnectionString != nil {
return nil, errInstrumentationKeyOrConnectionString
}

return newTracesExporter(exporterConfig, tc, set)
ame := getOrCreateAzureMonitorExporter(cfg, set)
origComp := ame.Unwrap().(*azureMonitorExporter)

return exporterhelper.NewTraces(
ctx,
set,
cfg,
origComp.consumeTraces,
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithStart(ame.Start),
exporterhelper.WithShutdown(ame.Shutdown))
}

func (f *factory) createLogsExporter(
_ context.Context,
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Logs, error) {
exporterConfig, ok := cfg.(*Config)

f.initLogger(set.Logger)
config, ok := cfg.(*Config)
if !ok {
return nil, errUnexpectedConfigurationType
}

tc, errInstrumentationKeyOrConnectionString := f.getTransportChannel(exporterConfig, set.Logger)
if errInstrumentationKeyOrConnectionString != nil {
return nil, errInstrumentationKeyOrConnectionString
}

return newLogsExporter(exporterConfig, tc, set)
ame := getOrCreateAzureMonitorExporter(cfg, set)
origComp := ame.Unwrap().(*azureMonitorExporter)

return exporterhelper.NewLogs(
ctx,
set,
cfg,
origComp.consumeLogs,
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithStart(ame.Start),
exporterhelper.WithShutdown(ame.Shutdown))
}

func (f *factory) createMetricsExporter(
_ context.Context,
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Metrics, error) {
exporterConfig, ok := cfg.(*Config)

f.initLogger(set.Logger)
config, ok := cfg.(*Config)
if !ok {
return nil, errUnexpectedConfigurationType
}

tc, errInstrumentationKeyOrConnectionString := f.getTransportChannel(exporterConfig, set.Logger)
if errInstrumentationKeyOrConnectionString != nil {
return nil, errInstrumentationKeyOrConnectionString
}

return newMetricsExporter(exporterConfig, tc, set)
ame := getOrCreateAzureMonitorExporter(cfg, set)
origComp := ame.Unwrap().(*azureMonitorExporter)

return exporterhelper.NewMetrics(
ctx,
set,
cfg,
origComp.consumeMetrics,
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithStart(ame.Start),
exporterhelper.WithShutdown(ame.Shutdown))
}

// Configures the transport channel.
// This method is not thread-safe
func (f *factory) getTransportChannel(exporterConfig *Config, logger *zap.Logger) (transportChannel, error) {
// The default transport channel uses the default send mechanism from the AppInsights telemetry client.
// This default channel handles batching, appropriate retries, and is backed by memory.
if f.tChannel == nil {
connectionVars, err := parseConnectionString(exporterConfig)
if err != nil {
return nil, err
func getOrCreateAzureMonitorExporter(cfg component.Config, set exporter.Settings) *sharedcomponent.SharedComponent {
conf := cfg.(*Config)
ame := exporters.GetOrAdd(set.ID, func() component.Component {
return &azureMonitorExporter{
config: conf,
logger: set.Logger,
packer: newMetricPacker(set.Logger),
}
})

exporterConfig.InstrumentationKey = configopaque.String(connectionVars.InstrumentationKey)
exporterConfig.Endpoint = connectionVars.IngestionURL
telemetryConfiguration := appinsights.NewTelemetryConfiguration(string(exporterConfig.InstrumentationKey))
telemetryConfiguration.EndpointUrl = exporterConfig.Endpoint
telemetryConfiguration.MaxBatchSize = exporterConfig.MaxBatchSize
telemetryConfiguration.MaxBatchInterval = exporterConfig.MaxBatchInterval
telemetryClient := appinsights.NewTelemetryClientFromConfig(telemetryConfiguration)

f.tChannel = telemetryClient.Channel()
return ame
}

// Don't even bother enabling the AppInsights diagnostics listener unless debug logging is enabled
func (f *factory) initLogger(logger *zap.Logger) {
f.loggerInitOnce.Do(func() {
if checkedEntry := logger.Check(zap.DebugLevel, ""); checkedEntry != nil {
appinsights.NewDiagnosticsMessageListener(func(msg string) error {
logger.Debug(msg)
return nil
})
}
}

return f.tChannel, nil
})
}
Loading