Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove meters from storage list when removed and handle NoRecordedValue flag when removing meters #5997

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ internal sealed class ExperimentalOptions

public const string OtlpDiskRetryDirectoryPathEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_DISK_RETRY_DIRECTORY_PATH";

public const string EmitNoRecordedValueNeededDataPointsEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_METRICS_EMIT_NO_RECORDED_VALUE";

public ExperimentalOptions()
: this(new ConfigurationBuilder().AddEnvironmentVariables().Build())
{
Expand All @@ -29,6 +31,11 @@ public ExperimentalOptions(IConfiguration configuration)
this.EmitLogEventAttributes = emitLogEventAttributes;
}

if (configuration.TryGetBoolValue(OpenTelemetryProtocolExporterEventSource.Log, EmitNoRecordedValueNeededDataPointsEnvVar, out var emitNoRecordedValueNeededDataPoints))
{
this.EmitNoRecordedValueNeededDataPoints = emitNoRecordedValueNeededDataPoints;
}

if (configuration.TryGetStringValue(OtlpRetryEnvVar, out var retryPolicy) && retryPolicy != null)
{
if (retryPolicy.Equals("in_memory", StringComparison.OrdinalIgnoreCase))
Expand Down Expand Up @@ -78,4 +85,10 @@ public ExperimentalOptions(IConfiguration configuration)
/// Gets the path on disk where the telemetry will be stored for retries at a later point.
/// </summary>
public string? DiskRetryDirectoryPath { get; }

/// <summary>
/// Gets a value indicating whether a data point flagged NoRecordedValue should be sent when metrics are removed,
/// e.g. when disposing a Meter.
/// </summary>
/// <remarks>
/// Assigned in the constructor only when the configuration contains a boolean value for
/// <see cref="EmitNoRecordedValueNeededDataPointsEnvVar"/>; otherwise it keeps its default of <c>false</c>.
/// </remarks>
public bool EmitNoRecordedValueNeededDataPoints { get; }
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ internal static int WriteSInt32WithTag(byte[] buffer, int writePosition, int fie
return writePosition;
}

/// <summary>
/// Writes a field tag with the VARINT wire type for <paramref name="fieldNumber"/>,
/// immediately followed by <paramref name="value"/> written via <c>WriteVarInt32</c>.
/// </summary>
/// <param name="buffer">Buffer handed to <c>WriteTag</c> and <c>WriteVarInt32</c>.</param>
/// <param name="writePosition">Position at which the tag write starts.</param>
/// <param name="fieldNumber">Field number passed to <c>WriteTag</c>.</param>
/// <param name="value">Unsigned 32-bit value written after the tag.</param>
/// <returns>The position returned by <c>WriteVarInt32</c> once both writes are done.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static int WriteVarInt32WithTag(byte[] buffer, int writePosition, int fieldNumber, uint value)
{
    // The tag goes first; its returned position is where the value starts.
    int positionAfterTag = WriteTag(buffer, writePosition, fieldNumber, ProtobufWireType.VARINT);

    return WriteVarInt32(buffer, positionAfterTag, value);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static int WriteVarInt32(byte[] buffer, int writePosition, uint value)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class OtlpMetricExporter : BaseExporter<Metric>
private readonly OtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;

private readonly bool emitNoRecordedValueNeededDataPoints;
private Resource? resource;

// Initial buffer size set to ~732KB.
Expand Down Expand Up @@ -53,6 +54,7 @@ internal OtlpMetricExporter(

this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
this.emitNoRecordedValueNeededDataPoints = experimentalOptions!.EmitNoRecordedValueNeededDataPoints;
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -65,7 +67,7 @@ public override ExportResult Export(in Batch<Metric> metrics)

try
{
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(ref this.buffer, this.startWritePosition, this.Resource, metrics);
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(ref this.buffer, this.startWritePosition, this.Resource, metrics, this.emitNoRecordedValueNeededDataPoints);

if (this.startWritePosition == GrpcStartWritePosition)
{
Expand Down
19 changes: 19 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@ Notes](../../RELEASENOTES.md).

## Unreleased

* Fixed storage exhaustion when disposing and recreating Meters.
**Previous Behavior:** Disposing a meter set the associated metric to null
in an array of default size 1000 allocated at creation time. Disposing and
recreating meters could exhaust the storage available in that array, after
which data points from newly created meters could no longer be collected.

**New Behavior:** Disposing a meter now marks the associated metrics for
deletion and they are cleaned up after the next collection cycle.

**Limitation:** Because cleanup only happens after a collection cycle,
repeatedly disposing and recreating meters within the same collection cycle
can still exhaust the storage limit.

* Added an experimental flag
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should move to OTLP Exporter changelog.

`OTEL_DOTNET_EXPERIMENTAL_OTLP_METRICS_EMIT_NO_RECORDED_VALUE`. When set to
`true`, after disposing a meter, a DataPoint with the flag NoRecordedValue
will be sent on the next collection cycle for all associated metrics as per
the
[OTLP data model](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#no-recorded-value).

## 1.11.0-rc.1

Released 2024-Dec-11
Expand Down
2 changes: 2 additions & 0 deletions src/OpenTelemetry/Metrics/Metric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ internal Metric(

internal bool Active { get; set; } = true;

internal bool NoRecordedValueNeeded { get; set; }

/// <summary>
/// Get the metric points for the metric stream.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/OpenTelemetry/Metrics/MetricPoint/MetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public readonly HistogramBuckets GetHistogramBuckets()
/// </remarks>
/// <returns><see cref="ExponentialHistogramData"/>.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ExponentialHistogramData GetExponentialHistogramData()
public readonly ExponentialHistogramData GetExponentialHistogramData()
{
if (this.aggType != AggregationType.Base2ExponentialHistogram &&
this.aggType != AggregationType.Base2ExponentialHistogramWithMinMax)
Expand Down
57 changes: 29 additions & 28 deletions src/OpenTelemetry/Metrics/Reader/MetricReaderExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ private void CreateOrUpdateMetricStreamRegistration(in MetricStreamIdentity metr
{
if (!this.metricStreamNames.Add(metricStreamIdentity.MetricStreamName))
{
// TODO: If a metric is deactivated and then reactivated we log the
// same warning as if it was a duplicate.
OpenTelemetrySdkEventSource.Log.DuplicateMetricInstrument(
metricStreamIdentity.InstrumentName,
metricStreamIdentity.MeterName,
Expand Down Expand Up @@ -231,7 +229,35 @@ private Batch<Metric> GetMetricsBatch()

if (!metric.Active)
{
this.RemoveMetric(ref metric);
// Inactive metrics are sent one last time so the remaining data points and
// NoRecordedValue data points can be sent. The Active property might be
// set to false between collection cycles, so a separate property must be
// used to avoid duplicate staleness markers.
metric.NoRecordedValueNeeded = true;

lock (this.instrumentCreationLock)
{
OpenTelemetrySdkEventSource.Log.MetricInstrumentRemoved(metric.Name, metric.MeterName);

// Note: This is using TryUpdate and NOT TryRemove because there is a
// race condition. If a metric is deactivated and then reactivated in
// the same collection cycle
// instrumentIdentityToMetric[metric.InstrumentIdentity] may already
// point to the new activated metric and not the old deactivated one.
this.instrumentIdentityToMetric.TryUpdate(metric.InstrumentIdentity, null, metric);

this.metricStreamNames.Remove(metric.InstrumentIdentity.MetricStreamName);

// Defragment metrics list so storage can be reused on future metrics.
for (int j = i + 1; j < target; j++)
{
this.metrics[j - 1] = this.metrics[j];
}

this.metrics[target - 1] = null;
this.metricIndex--;
i--;
}
}
}
}
Expand All @@ -244,29 +270,4 @@ private Batch<Metric> GetMetricsBatch()
return default;
}
}

/// <summary>
/// Removes a deactivated metric: logs the removal, clears its entry in
/// <c>instrumentIdentityToMetric</c> (only if that entry still points to this
/// metric) and sets the caller's storage slot to <c>null</c>.
/// </summary>
/// <param name="metric">
/// Reference to the slot holding the metric to remove. Must not be
/// <c>null</c> on entry; it is <c>null</c> when the method returns.
/// </param>
private void RemoveMetric(ref Metric? metric)
{
// Debug-only guard; the code below dereferences metric unconditionally.
Debug.Assert(metric != null, "metric was null");

// TODO: This logic removes the metric. If the same
// metric is published again we will create a new metric
// for it. If this happens often we will run out of
// storage. Instead, should we keep the metric around
// and set a new start time + reset its data if it comes
// back?

OpenTelemetrySdkEventSource.Log.MetricInstrumentRemoved(metric!.Name, metric.MeterName);

// Note: This is using TryUpdate and NOT TryRemove because there is a
// race condition. If a metric is deactivated and then reactivated in
// the same collection cycle
// instrumentIdentityToMetric[metric.InstrumentIdentity] may already
// point to the new activated metric and not the old deactivated one.
// TryUpdate only replaces the value with null when the current value is
// still this metric, so a newer registration is left untouched.
this.instrumentIdentityToMetric.TryUpdate(metric.InstrumentIdentity, null, metric);

// Note: metric is a reference to the array storage so
// this clears the metric out of the array.
metric = null;
}
}
Loading
Loading