Skip to content

Commit

Permalink
add otel tracing support (#182)
Browse files Browse the repository at this point in the history
If the APM tracer is not set and an OTel tracer provider is configured,
use it to trace the Appender.flush execution.

---------

Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>
  • Loading branch information
endorama and carsonip authored Jun 17, 2024
1 parent 044911f commit 0c05046
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 3 deletions.
40 changes: 40 additions & 0 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
"go.elastic.co/apm/module/apmzap/v2"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -85,6 +87,10 @@ type Appender struct {
metrics metrics
mu sync.Mutex
closed chan struct{}

// tracer is an OTel tracer, and should not be confused with `a.config.Tracer`
// which is an Elastic APM Tracer.
tracer trace.Tracer
}

// New returns a new Appender that indexes documents into Elasticsearch.
Expand Down Expand Up @@ -183,6 +189,11 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) {
indexer.runActiveIndexer()
return nil
})

if cfg.Tracer == nil && cfg.TracerProvider != nil {
indexer.tracer = cfg.TracerProvider.Tracer("github.com/elastic/go-docappender.appender")
}

return indexer, nil
}

Expand Down Expand Up @@ -307,6 +318,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)

logger := a.config.Logger
var span trace.Span
if a.tracingEnabled() {
tx := a.config.Tracer.StartTransaction("docappender.flush", "output")
tx.Context.SetLabel("documents", n)
Expand All @@ -316,6 +328,18 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
// Add trace IDs to logger, to associate any per-item errors
// below with the trace.
logger = logger.With(apmzap.TraceContext(ctx)...)
} else if a.otelTracingEnabled() {
ctx, span = a.tracer.Start(ctx, "docappender.flush", trace.WithAttributes(
attribute.Int("documents", n),
))
defer span.End()

// Add trace IDs to logger, to associate any per-item errors
// below with the trace.
logger = logger.With(
zap.String("traceId", span.SpanContext().TraceID().String()),
zap.String("spanId", span.SpanContext().SpanID().String()),
)
}

var flushCtx context.Context
Expand Down Expand Up @@ -346,6 +370,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
logger.Error("bulk indexing request failed", zap.Error(err))
if a.tracingEnabled() {
apm.CaptureError(ctx, err).Send()
} else if a.otelTracingEnabled() && span.IsRecording() {
span.RecordError(err)
span.SetStatus(codes.Error, "bulk indexing request failed")
}

if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
Expand Down Expand Up @@ -406,6 +433,10 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
failedCount[info]++
if a.tracingEnabled() {
apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send()
} else if a.otelTracingEnabled() && span.IsRecording() {
e := errors.New(info.Error.Reason)
span.RecordError(e)
span.SetStatus(codes.Error, e.Error())
}
}
for key, count := range failedCount {
Expand Down Expand Up @@ -450,6 +481,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
zap.Int64("docs_failed", docsFailed),
zap.Int64("docs_rate_limited", tooManyRequests),
)
if a.otelTracingEnabled() && span.IsRecording() {
span.SetStatus(codes.Ok, "")
}
return nil
}

Expand Down Expand Up @@ -687,6 +721,12 @@ func (a *Appender) tracingEnabled() bool {
return a.config.Tracer != nil && a.config.Tracer.Recording()
}

// otelTracingEnabled reports whether the Appender holds an OTel tracer,
// in which case flush is traced with OpenTelemetry spans.
func (a *Appender) otelTracingEnabled() bool {
	hasTracer := a.tracer != nil
	return hasTracer
}

// activeLimit returns the value of GOMAXPROCS * cfg.ActiveRatio. Which limits
// the maximum number of active indexers to a % of GOMAXPROCS.
//
Expand Down
74 changes: 74 additions & 0 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ import (
"go.elastic.co/apm/v2/apmtest"
"go.elastic.co/apm/v2/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -1723,3 +1726,74 @@ func newJSONReader(v any) *bytes.Reader {
}
return bytes.NewReader(data)
}

func TestAppenderOtelTracing(t *testing.T) {
t.Run("success", func(t *testing.T) {
testTracedAppend(t, 200, sdktrace.Status{
Code: codes.Ok,
Description: "",
})
})
t.Run("failure", func(t *testing.T) {
testTracedAppend(t, 400, sdktrace.Status{
Code: codes.Error,
Description: "bulk indexing request failed",
})
})
}

// testTracedAppend indexes N documents through an Appender configured with an
// OTel TracerProvider (and a nil APM Tracer) against a mock Elasticsearch
// client that answers every bulk request with responseCode.
//
// It then verifies that:
//   - a "docappender.flush" span was exported with the expected status,
//   - the span carries a "documents" attribute equal to N,
//   - at least one log record carries the span's traceId and spanId.
func testTracedAppend(t *testing.T, responseCode int, status sdktrace.Status) {
	client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(responseCode)
		_, result := docappendertest.DecodeBulkRequest(r)
		json.NewEncoder(w).Encode(result)
	})

	core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))

	// A synchronous exporter makes spans available as soon as they end,
	// so they can be inspected right after the indexer is closed.
	exp := tracetest.NewInMemoryExporter()
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSyncer(exp),
	)
	defer tp.Shutdown(context.Background())

	indexer, err := docappender.New(client, docappender.Config{
		FlushInterval: time.Minute,
		Logger:        zap.New(core),
		// NOTE: Tracer must be nil to use otel tracing
		Tracer:         nil,
		TracerProvider: tp,
	})
	require.NoError(t, err)
	defer indexer.Close(context.Background())

	const N = 100
	for i := 0; i < N; i++ {
		addMinimalDoc(t, indexer, "logs-foo-testing")
	}

	// Closing the indexer flushes enqueued documents.
	_ = indexer.Close(context.Background())

	spans := exp.GetSpans()
	// require rather than assert: indexing spans[0] on an empty slice would
	// panic instead of reporting a clean test failure.
	require.NotEmpty(t, spans)

	gotSpan := spans[0]
	assert.Equal(t, "docappender.flush", gotSpan.Name)
	assert.Equal(t, status, gotSpan.Status)

	// Track whether the attribute was seen at all, so that a missing
	// "documents" attribute fails the test instead of being skipped silently.
	foundDocuments := false
	for _, a := range gotSpan.Attributes {
		if a.Key == "documents" {
			foundDocuments = true
			assert.Equal(t, int64(N), a.Value.AsInt64())
		}
	}
	assert.True(t, foundDocuments, "span is missing the documents attribute")

	correlatedLogs := observed.FilterFieldKey("traceId").All()
	// require rather than assert: correlatedLogs[0] is indexed below.
	require.NotEmpty(t, correlatedLogs)

	log := correlatedLogs[0]
	expectedTraceID := gotSpan.SpanContext.TraceID().String()
	assert.Equal(t, expectedTraceID, log.ContextMap()["traceId"])
	expectedSpanID := gotSpan.SpanContext.SpanID().String()
	assert.Equal(t, expectedSpanID, log.ContextMap()["spanId"])
}
15 changes: 14 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand All @@ -40,9 +41,21 @@ type Config struct {
// Tracer holds an optional apm.Tracer to use for tracing bulk requests
// to Elasticsearch. Each bulk request is traced as a transaction.
//
// If Tracer is nil, requests will not be traced.
// If Tracer is nil, requests will not be traced by Elastic APM. Note,
// however, that TracerProvider may be non-nil, in which case requests
// will be traced by the OpenTelemetry tracer instead.
//
// Deprecated: Tracer is replaced by TracerProvider in a shift towards
// OpenTelemetry. Please use TracerProvider.
Tracer *apm.Tracer

// TracerProvider holds an optional otel TracerProvider for tracing
// flush requests.
//
// If TracerProvider is nil, requests will not be traced.
// To use this provider Tracer must be nil.
TracerProvider trace.TracerProvider

// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ require (
go.elastic.co/fastjson v1.3.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
)
Expand All @@ -34,8 +36,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
go.elastic.co/apm/module/apmhttp/v2 v2.6.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sys v0.20.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down

0 comments on commit 0c05046

Please sign in to comment.