From fd7ae93832b044e2a8721e332cb34f770efc1102 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Tue, 9 Jul 2024 14:09:16 +0200 Subject: [PATCH] flush metrics in the telemetry worker --- lib/datadog/core/telemetry/component.rb | 1 + lib/datadog/core/telemetry/worker.rb | 19 +++++++++--- sig/datadog/core/telemetry/worker.rbs | 9 ++++-- spec/datadog/core/telemetry/component_spec.rb | 4 ++- spec/datadog/core/telemetry/worker_spec.rb | 31 ++++++++++++++++++- 5 files changed, 54 insertions(+), 10 deletions(-) diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index 5b6a7335568..c4dac39a2bb 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -40,6 +40,7 @@ def initialize( heartbeat_interval_seconds: heartbeat_interval_seconds, metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: Emitter.new, + metrics_manager: @metrics_manager, dependency_collection: dependency_collection ) @worker.start diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 4add8057cfb..5166f63a7d9 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -23,12 +23,14 @@ def initialize( heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, + metrics_manager:, dependency_collection:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE ) @emitter = emitter + @metrics_manager = metrics_manager @dependency_collection = dependency_collection @ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i @@ -79,8 +81,10 @@ def perform(*events) return if !enabled? || forked? started! unless sent_started_event? - # flush metrics here - flush_events(events) + + metric_events = @metrics_manager.flush! + events = [] if events.nil? + flush_events(events + metric_events) @current_ticks += 1 return if @current_ticks < @ticks_per_heartbeat @@ -90,7 +94,7 @@ def perform(*events) end def flush_events(events) - return if events.nil? || events.empty? + return if events.empty? return if !enabled? || !sent_started_event? Datadog.logger.debug { "Sending #{events&.count} telemetry events" } @@ -108,7 +112,7 @@ def started! if failed_to_start? Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker') - self.enabled = false + disable! return end @@ -152,11 +156,16 @@ def buffer_klass end end + def disable! + self.enabled = false + @metrics_manager.disable! + end + def disable_on_not_found!(response) return unless response.not_found? Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.') - self.enabled = false + disable! end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 4e0f87a3249..8d98714f912 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -13,6 +13,7 @@ module Datadog DEFAULT_BUFFER_MAX_SIZE: 1000 @emitter: Emitter + @metrics_manager: MetricsManager @sent_started_event: bool @shutdown_timeout: Integer @buffer_size: Integer @@ -20,7 +21,7 @@ module Datadog @ticks_per_heartbeat: Integer @current_ticks: Integer - def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void + def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void def start: () -> void @@ -40,9 +41,11 @@ module Datadog def flush_events: (Array[Event::Base] events) -> void - def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response + def send_event: (Event::Base event) -> Http::Adapters::Net::Response - def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void + def disable!: () -> void + + def disable_on_not_found!: (Http::Adapters::Net::Response response) -> void def buffer_klass: () -> untyped end diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index eee7639ba1e..8cc9d0c0b02 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -27,7 +27,8 @@ metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection, enabled: enabled, - emitter: an_instance_of(Datadog::Core::Telemetry::Emitter) + emitter: an_instance_of(Datadog::Core::Telemetry::Emitter), + metrics_manager: anything ).and_return(worker) allow(worker).to receive(:start) @@ -218,6 +219,7 @@ let(:value) { double('value') } let(:tags) { double('tags') } let(:common) { double('common') } + before do expect(Datadog::Core::Telemetry::MetricsManager).to receive(:new).with( aggregation_interval: metrics_aggregation_interval_seconds, diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 3f141a738ed..64725de8821 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -9,6 +9,7 @@ heartbeat_interval_seconds: heartbeat_interval_seconds, metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: emitter, + metrics_manager: metrics_manager, dependency_collection: dependency_collection ) end @@ -16,7 +17,8 @@ let(:enabled) { true } let(:heartbeat_interval_seconds) { 0.5 } let(:metrics_aggregation_interval_seconds) { 0.25 } - let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } + let(:metrics_manager) { instance_double(Datadog::Core::Telemetry::MetricsManager, flush!: [], disable!: nil) } + let(:emitter) { instance_double(Datadog::Core::Telemetry::Emitter) } let(:dependency_collection) { false } let(:backend_supports_telemetry?) { true } @@ -205,6 +207,32 @@ try_wait_until { sent_dependencies } end end + + context 'when metrics are flushed' do + before do + allow(metrics_manager).to receive(:flush!).and_return( + [Datadog::Core::Telemetry::Event::GenerateMetrics.new('namespace', [])] + ) + end + + it 'sends metrics event' do + received_metrics = false + + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::MessageBatch) + ) do |event| + event.events.each do |subevent| + received_metrics = true if subevent.is_a?(Datadog::Core::Telemetry::Event::GenerateMetrics) + end + + response + end + + worker.start + + try_wait_until { received_metrics } + end + end end context 'when internal error returned by emitter' do @@ -244,6 +272,7 @@ heartbeat_interval_seconds: heartbeat_interval_seconds, metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: emitter, + metrics_manager: metrics_manager, dependency_collection: dependency_collection ) end