Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions tracer/src/Datadog.Trace/Agent/AgentWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd stat

public Task<bool> Ping() => _api.SendTracesAsync(EmptyPayload, 0, false, 0, 0);

public void WriteTrace(ArraySegment<Span> trace)
public void WriteTrace(in SpanCollection trace)
{
if (trace.Count == 0)
{
Expand All @@ -130,7 +130,7 @@ public void WriteTrace(ArraySegment<Span> trace)
}
else
{
_pendingTraces.Enqueue(new WorkItem(trace));
_pendingTraces.Enqueue(new WorkItem(in trace));

if (!_serializationMutex.IsSet)
{
Expand Down Expand Up @@ -374,7 +374,7 @@ async Task InternalBufferFlush()
}
}

private void SerializeTrace(ArraySegment<Span> spans)
private void SerializeTrace(in SpanCollection spans)
{
// Declaring as inline method because only safe to invoke in the context of SerializeTrace
SpanBuffer SwapBuffers()
Expand Down Expand Up @@ -406,26 +406,26 @@ SpanBuffer SwapBuffers()
}

int? chunkSamplingPriority = null;
var chunk = spans;
if (CanComputeStats)
{
spans = _statsAggregator?.ProcessTrace(spans) ?? spans;
bool shouldSendTrace = _statsAggregator?.ShouldKeepTrace(spans) ?? true;
_statsAggregator?.AddRange(spans);
chunk = _statsAggregator?.ProcessTrace(in chunk) ?? chunk;
bool shouldSendTrace = _statsAggregator?.ShouldKeepTrace(in chunk) ?? true;
_statsAggregator?.AddRange(in chunk);
var singleSpanSamplingSpans = new List<Span>(); // TODO maybe we can store this from above?

for (var i = 0; i < spans.Count; i++)
foreach (var span in chunk)
{
var index = i + spans.Offset;
if (spans.Array?[index].GetMetric(Metrics.SingleSpanSampling.SamplingMechanism) is not null)
if (span.GetMetric(Metrics.SingleSpanSampling.SamplingMechanism) is not null)
{
singleSpanSamplingSpans.Add(spans.Array[index]);
singleSpanSamplingSpans.Add(span);
}
}

if (shouldSendTrace)
{
TelemetryFactory.Metrics.RecordCountTraceChunkEnqueued(MetricTags.TraceChunkEnqueueReason.P0Keep);
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.P0Keep, spans.Count);
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.P0Keep, chunk.Count);
}
else
{
Expand All @@ -435,8 +435,8 @@ SpanBuffer SwapBuffers()
if (singleSpanSamplingSpans.Count == 0)
{
Interlocked.Increment(ref _droppedP0Traces);
Interlocked.Add(ref _droppedP0Spans, spans.Count);
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.P0Drop, spans.Count);
Interlocked.Add(ref _droppedP0Spans, chunk.Count);
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.P0Drop, chunk.Count);
return;
}
else
Expand All @@ -445,11 +445,11 @@ SpanBuffer SwapBuffers()
// this will override the TraceContext sampling priority when we do a SpanBuffer.TryWrite
chunkSamplingPriority = SamplingPriorityValues.UserKeep;
Interlocked.Increment(ref _droppedP0Traces); // increment since we are sampling out the entire trace
var spansDropped = spans.Count - singleSpanSamplingSpans.Count;
var spansDropped = chunk.Count - singleSpanSamplingSpans.Count;
Interlocked.Add(ref _droppedP0Spans, spansDropped);
spans = new ArraySegment<Span>(singleSpanSamplingSpans.ToArray());
chunk = new SpanCollection(singleSpanSamplingSpans.ToArray(), singleSpanSamplingSpans.Count);
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.P0Drop, spansDropped);
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.SingleSpanSampling, spans.Count);
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.SingleSpanSampling, chunk.Count);
TelemetryFactory.Metrics.RecordCountTracePartialFlush(MetricTags.PartialFlushReason.SingleSpanIngestion);
}
}
Expand All @@ -458,11 +458,11 @@ SpanBuffer SwapBuffers()
{
// not using stats, so trace always kept
TelemetryFactory.Metrics.RecordCountTraceChunkEnqueued(MetricTags.TraceChunkEnqueueReason.Default);
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.Default, spans.Count);
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.Default, chunk.Count);
}

// Add the current keep rate to trace
if (spans.Array?[spans.Offset].Context.TraceContext is { } trace)
if (chunk.RootSpan?.Context.TraceContext is { } trace)
{
trace.TracesKeepRate = _traceKeepRateCalculator.GetKeepRate();
}
Expand All @@ -471,7 +471,7 @@ SpanBuffer SwapBuffers()
// This allows the serialization thread to keep doing its job while a buffer is being flushed
var buffer = _activeBuffer;

var writeStatus = buffer.TryWrite(spans, ref _temporaryBuffer, chunkSamplingPriority);
var writeStatus = buffer.TryWrite(in chunk, ref _temporaryBuffer, chunkSamplingPriority);

if (writeStatus == SpanBuffer.WriteStatus.Success)
{
Expand All @@ -482,7 +482,7 @@ SpanBuffer SwapBuffers()
if (writeStatus == SpanBuffer.WriteStatus.Overflow)
{
// The trace is too big for the buffer, no point in trying again
DropTrace(spans);
DropTrace(chunk.Count);
return;
}

Expand All @@ -494,28 +494,28 @@ SpanBuffer SwapBuffers()
// One buffer is full, request an eager flush
RequestFlush();

if (buffer.TryWrite(spans, ref _temporaryBuffer, chunkSamplingPriority) == SpanBuffer.WriteStatus.Success)
if (buffer.TryWrite(in chunk, ref _temporaryBuffer, chunkSamplingPriority) == SpanBuffer.WriteStatus.Success)
{
// Serialization to the secondary buffer succeeded
return;
}
}

// All the buffers are full :( drop the trace
DropTrace(spans);
DropTrace(chunk.Count);
}

private void DropTrace(ArraySegment<Span> spans)
private void DropTrace(int count)
{
Interlocked.Increment(ref _droppedTraces);
_traceKeepRateCalculator.IncrementDrops(1);
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.OverfullBuffer, spans.Count);
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.OverfullBuffer, count);
TelemetryFactory.Metrics.RecordCountTraceChunkDropped(MetricTags.DropReason.OverfullBuffer);

if (_statsd != null)
{
_statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
_statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
_statsd.Increment(TracerMetricNames.Queue.DroppedSpans, count);
}
}

Expand Down Expand Up @@ -577,10 +577,10 @@ private void SerializeTracesLoop()

private readonly struct WorkItem
{
public readonly ArraySegment<Span> Trace;
public readonly SpanCollection Trace;
public readonly Action Callback;

public WorkItem(ArraySegment<Span> trace)
public WorkItem(in SpanCollection trace)
{
Trace = trace;
Callback = null;
Expand Down
2 changes: 1 addition & 1 deletion tracer/src/Datadog.Trace/Agent/IAgentWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Datadog.Trace.Agent
{
internal interface IAgentWriter
{
void WriteTrace(ArraySegment<Span> trace);
void WriteTrace(in SpanCollection trace);

Task<bool> Ping();

Expand Down
8 changes: 5 additions & 3 deletions tracer/src/Datadog.Trace/Agent/IStatsAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.Threading.Tasks;
using Datadog.Trace.SourceGenerators;

namespace Datadog.Trace.Agent
{
Expand All @@ -30,22 +31,23 @@ internal interface IStatsAggregator
/// Receives an array of spans and computes stats points for them.
/// </summary>
/// <param name="spans">The array of spans to process.</param>
[TestingOnly]
void Add(params Span[] spans);

/// <summary>
/// Receives an array of spans and computes stats points for them.
/// </summary>
/// <param name="spans">The ArraySegment of spans to process.</param>
void AddRange(ArraySegment<Span> spans);
void AddRange(in SpanCollection spans);

/// <summary>
/// Runs a series of samplers over the entire trace chunk
/// </summary>
/// <param name="spans">The trace chunk to sample</param>
/// <returns>True if the trace chunk should be sampled, false otherwise.</returns>
bool ShouldKeepTrace(ArraySegment<Span> spans);
bool ShouldKeepTrace(in SpanCollection spans);

ArraySegment<Span> ProcessTrace(ArraySegment<Span> trace);
SpanCollection ProcessTrace(in SpanCollection trace);

Task DisposeAsync();
}
Expand Down
42 changes: 28 additions & 14 deletions tracer/src/Datadog.Trace/Agent/MessagePack/TraceChunkModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Threading;
using Datadog.Trace.Configuration;
using Datadog.Trace.SourceGenerators;
using Datadog.Trace.Tagging;

namespace Datadog.Trace.Agent.MessagePack;
Expand All @@ -20,7 +21,7 @@ namespace Datadog.Trace.Agent.MessagePack;
internal readonly struct TraceChunkModel
{
// for small trace chunks, use the ArraySegment<Span> copy directly, no heap allocations
private readonly ArraySegment<Span> _spans;
private readonly SpanCollection _spans;

// for large trace chunks, use a HashSet<ulong> instead of iterating the array.
// there are 3 possible states:
Expand Down Expand Up @@ -75,8 +76,8 @@ internal readonly struct TraceChunkModel
/// <param name="spans">The spans that will be within this <see cref="TraceChunkModel"/>.</param>
/// <param name="samplingPriority">Optional sampling priority to override the <see cref="TraceContext"/> sampling priority.</param>
/// <param name="isFirstChunkInPayload">Indicates if this is the first trace chunk being written to the output buffer.</param>
public TraceChunkModel(in ArraySegment<Span> spans, int? samplingPriority = null, bool isFirstChunkInPayload = false)
: this(spans, TraceContext.GetTraceContext(spans), samplingPriority, isFirstChunkInPayload)
public TraceChunkModel(in SpanCollection spans, int? samplingPriority = null, bool isFirstChunkInPayload = false)
: this(in spans, TraceContext.GetTraceContext(in spans), samplingPriority, isFirstChunkInPayload)
{
// since all we have is an array of spans, use the trace context from the first span
// to get the other values we need (sampling priority, origin, trace tags, etc) for now.
Expand All @@ -85,8 +86,8 @@ public TraceChunkModel(in ArraySegment<Span> spans, int? samplingPriority = null
}

// used only to chain constructors
private TraceChunkModel(in ArraySegment<Span> spans, TraceContext? traceContext, int? samplingPriority, bool isFirstChunkInPayload)
: this(spans, traceContext?.RootSpan)
private TraceChunkModel(in SpanCollection spans, TraceContext? traceContext, int? samplingPriority, bool isFirstChunkInPayload)
: this(in spans, traceContext?.RootSpan)
{
// sampling decision override takes precedence over TraceContext.SamplingPriority
SamplingPriority = samplingPriority;
Expand Down Expand Up @@ -128,8 +129,8 @@ private TraceChunkModel(in ArraySegment<Span> spans, TraceContext? traceContext,
}
}

// used in tests
internal TraceChunkModel(in ArraySegment<Span> spans, Span? localRootSpan)
[TestingAndPrivateOnly]
internal TraceChunkModel(in SpanCollection spans, Span? localRootSpan)
{
_spans = spans;

Expand Down Expand Up @@ -161,7 +162,7 @@ public SpanModel GetSpanModel(int spanIndex)
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(spanIndex));
}

var span = _spans.Array![_spans.Offset + spanIndex];
var span = _spans[spanIndex];
var parentId = span.Context.ParentId ?? 0;
bool isLocalRoot = parentId is 0 || span.SpanId == LocalRootSpanId;
bool isFirstSpan = spanIndex == 0;
Expand Down Expand Up @@ -225,9 +226,9 @@ private bool Contains(ulong spanId, int startIndex)

if (hashSet.Count == 0)
{
for (var i = 0; i < _spans.Count; i++)
foreach (var span in _spans)
{
hashSet.Add(_spans.Array![_spans.Offset + i].SpanId);
hashSet.Add(span.SpanId);
}
}

Expand All @@ -243,15 +244,28 @@ private bool Contains(ulong spanId, int startIndex)
private int IndexOf(ulong spanId, int startIndex)
{
// wrap around the end of the array
if (startIndex >= _spans.Count)
var count = _spans.Count;
if (count == 0)
{
return -1;
}

if (startIndex >= count)
{
startIndex = 0;
}

if (count == 1)
{
return _spans[0].SpanId == spanId ? 0 : -1;
}

var array = _spans.ToArray();

// iterate over the span array starting at the specified index + 1
for (var i = startIndex; i < _spans.Count; i++)
for (var i = startIndex; i < count; i++)
{
if (spanId == _spans.Array![_spans.Offset + i].SpanId)
if (spanId == array.Array![array.Offset + i].SpanId)
{
return i;
}
Expand All @@ -260,7 +274,7 @@ private int IndexOf(ulong spanId, int startIndex)
// if not found above, wrap around to the beginning to search the rest of the array
for (var i = 0; i < startIndex; i++)
{
if (spanId == _spans.Array![_spans.Offset + i].SpanId)
if (spanId == array.Array![array.Offset + i].SpanId)
{
return i;
}
Expand Down
6 changes: 3 additions & 3 deletions tracer/src/Datadog.Trace/Agent/NullStatsAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ public void Add(params Span[] spans)
{
}

public void AddRange(ArraySegment<Span> spans)
public void AddRange(in SpanCollection spans)
{
}

public bool ShouldKeepTrace(ArraySegment<Span> spans) => true;
public bool ShouldKeepTrace(in SpanCollection spans) => true;

public ArraySegment<Span> ProcessTrace(ArraySegment<Span> trace) => trace;
public SpanCollection ProcessTrace(in SpanCollection trace) => trace;

public Task DisposeAsync()
{
Expand Down
4 changes: 2 additions & 2 deletions tracer/src/Datadog.Trace/Agent/SpanBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ArraySegment<byte> Data
// For tests only
internal bool IsEmpty => !_locked && !IsFull && TraceCount == 0 && SpanCount == 0 && _offset == HeaderSize;

public WriteStatus TryWrite(ArraySegment<Span> spans, ref byte[] temporaryBuffer, int? samplingPriority = null)
public WriteStatus TryWrite(in SpanCollection spans, ref byte[] temporaryBuffer, int? samplingPriority = null)
{
bool lockTaken = false;

Expand All @@ -91,7 +91,7 @@ public WriteStatus TryWrite(ArraySegment<Span> spans, ref byte[] temporaryBuffer
// to get the other values we need (sampling priority, origin, trace tags, etc) for now.
// the idea is that as we refactor further, we can pass more than just the spans,
// and these values can come directly from the trace context.
var traceChunk = new TraceChunkModel(spans, samplingPriority, isFirstChunkInPayload: TraceCount == 0);
var traceChunk = new TraceChunkModel(in spans, samplingPriority, isFirstChunkInPayload: TraceCount == 0);

// We don't know what the serialized size of the payload will be,
// so we need to write to a temporary buffer first
Expand Down
Loading
Loading