From 0c050463b02846ab06563b635ba09d394d5083b6 Mon Sep 17 00:00:00 2001 From: Edoardo Tenani <526307+endorama@users.noreply.github.com> Date: Mon, 17 Jun 2024 16:52:46 +0200 Subject: [PATCH] add otel tracing support (#182) If APM tracer is not set and an otel tracer is configured use it to trace the Appender.flush execution --------- Co-authored-by: Carson Ip --- appender.go | 40 ++++++++++++++++++++++++++ appender_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++++ config.go | 15 +++++++++- go.mod | 4 +-- 4 files changed, 130 insertions(+), 3 deletions(-) diff --git a/appender.go b/appender.go index 9deca3e..615127c 100644 --- a/appender.go +++ b/appender.go @@ -33,7 +33,9 @@ import ( "go.elastic.co/apm/module/apmzap/v2" "go.elastic.co/apm/v2" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -85,6 +87,10 @@ type Appender struct { metrics metrics mu sync.Mutex closed chan struct{} + + // tracer is an OTel tracer, and should not be confused with `a.config.Tracer` + // which is an Elastic APM Tracer. + tracer trace.Tracer } // New returns a new Appender that indexes documents into Elasticsearch. 
@@ -183,6 +189,11 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) { indexer.runActiveIndexer() return nil }) + + if cfg.Tracer == nil && cfg.TracerProvider != nil { + indexer.tracer = cfg.TracerProvider.Tracer("github.com/elastic/go-docappender.appender") + } + return indexer, nil } @@ -307,6 +318,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests) logger := a.config.Logger + var span trace.Span if a.tracingEnabled() { tx := a.config.Tracer.StartTransaction("docappender.flush", "output") tx.Context.SetLabel("documents", n) @@ -316,6 +328,18 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { // Add trace IDs to logger, to associate any per-item errors // below with the trace. logger = logger.With(apmzap.TraceContext(ctx)...) + } else if a.otelTracingEnabled() { + ctx, span = a.tracer.Start(ctx, "docappender.flush", trace.WithAttributes( + attribute.Int("documents", n), + )) + defer span.End() + + // Add trace IDs to logger, to associate any per-item errors + // below with the trace. 
+ logger = logger.With( + zap.String("traceId", span.SpanContext().TraceID().String()), + zap.String("spanId", span.SpanContext().SpanID().String()), + ) } var flushCtx context.Context @@ -346,6 +370,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { logger.Error("bulk indexing request failed", zap.Error(err)) if a.tracingEnabled() { apm.CaptureError(ctx, err).Send() + } else if a.otelTracingEnabled() && span.IsRecording() { + span.RecordError(err) + span.SetStatus(codes.Error, "bulk indexing request failed") } if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { @@ -406,6 +433,10 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { failedCount[info]++ if a.tracingEnabled() { apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send() + } else if a.otelTracingEnabled() && span.IsRecording() { + e := errors.New(info.Error.Reason) + span.RecordError(e) + span.SetStatus(codes.Error, e.Error()) } } for key, count := range failedCount { @@ -450,6 +481,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { zap.Int64("docs_failed", docsFailed), zap.Int64("docs_rate_limited", tooManyRequests), ) + if a.otelTracingEnabled() && span.IsRecording() { + span.SetStatus(codes.Ok, "") + } return nil } @@ -687,6 +721,12 @@ func (a *Appender) tracingEnabled() bool { return a.config.Tracer != nil && a.config.Tracer.Recording() } +// otelTracingEnabled checks whether we should be doing tracing +// using otel tracer. +func (a *Appender) otelTracingEnabled() bool { + return a.tracer != nil +} + // activeLimit returns the value of GOMAXPROCS * cfg.ActiveRatio. Which limits // the maximum number of active indexers to a % of GOMAXPROCS. 
// diff --git a/appender_test.go b/appender_test.go index 1075f96..c226b3a 100644 --- a/appender_test.go +++ b/appender_test.go @@ -39,9 +39,12 @@ import ( "go.elastic.co/apm/v2/apmtest" "go.elastic.co/apm/v2/model" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -1723,3 +1726,74 @@ func newJSONReader(v any) *bytes.Reader { } return bytes.NewReader(data) } + +func TestAppenderOtelTracing(t *testing.T) { + t.Run("success", func(t *testing.T) { + testTracedAppend(t, 200, sdktrace.Status{ + Code: codes.Ok, + Description: "", + }) + }) + t.Run("failure", func(t *testing.T) { + testTracedAppend(t, 400, sdktrace.Status{ + Code: codes.Error, + Description: "bulk indexing request failed", + }) + }) +} + +func testTracedAppend(t *testing.T, responseCode int, status sdktrace.Status) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(responseCode) + _, result := docappendertest.DecodeBulkRequest(r) + json.NewEncoder(w).Encode(result) + }) + + core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) + + exp := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSyncer(exp), + ) + defer tp.Shutdown(context.Background()) + + indexer, err := docappender.New(client, docappender.Config{ + FlushInterval: time.Minute, + Logger: zap.New(core), + // NOTE: Tracer must be nil to use otel tracing + Tracer: nil, + TracerProvider: tp, + }) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + const N = 100 + for i := 0; i < N; i++ { + addMinimalDoc(t, indexer, "logs-foo-testing") + } + + // 
Closing the indexer flushes enqueued documents. + _ = indexer.Close(context.Background()) + + spans := exp.GetSpans() + assert.NotEmpty(t, spans) + + gotSpan := spans[0] + assert.Equal(t, "docappender.flush", gotSpan.Name) + assert.Equal(t, status, gotSpan.Status) + + for _, a := range gotSpan.Attributes { + if a.Key == "documents" { + assert.Equal(t, int64(N), a.Value.AsInt64()) + } + } + + correlatedLogs := observed.FilterFieldKey("traceId").All() + assert.NotEmpty(t, correlatedLogs) + + log := correlatedLogs[0] + expectedTraceID := gotSpan.SpanContext.TraceID().String() + assert.Equal(t, expectedTraceID, log.ContextMap()["traceId"]) + expectedSpanID := gotSpan.SpanContext.SpanID().String() + assert.Equal(t, expectedSpanID, log.ContextMap()["spanId"]) +} diff --git a/config.go b/config.go index c7a268e..eeac3ce 100644 --- a/config.go +++ b/config.go @@ -23,6 +23,7 @@ import ( "go.elastic.co/apm/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -40,9 +41,21 @@ type Config struct { // Tracer holds an optional apm.Tracer to use for tracing bulk requests // to Elasticsearch. Each bulk request is traced as a transaction. // - // If Tracer is nil, requests will not be traced. + // If Tracer is nil, requests will not be traced. Note however that + // TracerProvider may be non-nil, in which case the request will + // be traced by the OpenTelemetry tracer instead. + // + // Deprecated: Tracer is replaced by TracerProvider in a shift towards + // OpenTelemetry. Please use TracerProvider. Tracer *apm.Tracer + // TracerProvider holds an optional otel TracerProvider for tracing + // flush requests. + // + // If TracerProvider is nil, requests will not be traced with OpenTelemetry. + // To use this provider, Tracer must be nil. + TracerProvider trace.TracerProvider + // CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression) // to 9 (gzip.BestCompression).
Higher values provide greater compression, at a // greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the diff --git a/go.mod b/go.mod index e01d4df..2a2a2fd 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,9 @@ require ( go.elastic.co/fastjson v1.3.0 go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/metric v1.27.0 + go.opentelemetry.io/otel/sdk v1.27.0 go.opentelemetry.io/otel/sdk/metric v1.27.0 + go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.7.0 ) @@ -34,8 +36,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.6.0 // indirect - go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/trace v1.27.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/sys v0.20.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect