Skip to content

Commit

Permalink
refractored the metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jayamala17 committed May 6, 2024
1 parent 727b46f commit c818bd8
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 120 deletions.
151 changes: 74 additions & 77 deletions lib/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,97 +7,97 @@
require_relative "bag_status"
module Jobs
class DarkBlueMetrics

# for being unable to contact the push gateway
class PushGatewayClientError < StandardError; end

def initialize(start_time:, end_time:)
@start_time = start_time
@end_time = end_time
end

def get_overall_metrics_current_run(start_time)
successful_transfer_status = [
def get_success_count
successful_status = [
BagStatus::BAGGING, BagStatus::COPYING, BagStatus::COPIED, BagStatus::VALIDATING,BagStatus::VALIDATED,
BagStatus::BAGGED, BagStatus::PACKING,BagStatus::PACKED, BagStatus::DEPOSITING, BagStatus::DEPOSITED
]

# failed_transfer_status = [
# BagStatus::FAILED, BagStatus::DEPOSIT_SKIPPED, BagStatus::VALIDATION_SKIPPED, BagStatus::VERIFY_FAILED
# ]
result = DatabaseSchema::StatusEvent
BagStatus::BAGGED, BagStatus::PACKING,BagStatus::PACKED, BagStatus::DEPOSITING, BagStatus::DEPOSITED]
success_count = DatabaseSchema::StatusEvent
.join(:status, Sequel.qualify(:status, :id) => Sequel.qualify(:status_event, :status_id))
.join(:bag, Sequel.qualify(:bag, :id) => Sequel.qualify(:status_event, :bag_id))
.where { Sequel.qualify(:status_event,:timestamp) >= start_time }
.select(:timestamp, Sequel.qualify(:status, :name).as(:status_name), Sequel.qualify(:bag, :identifier).as(:bag_id),
Sequel.as(Sequel.function(:COUNT, Sequel.case({{Sequel.qualify(:status, :id) => successful_transfer_status} => 1}, 0)),
:success_count))

successful_bag_transfer = []
failed_bag_transfer = []

# successful transfer
result.each do |row|
if row[:success_count]==successful_transfer_status.length
successful_bag_transfer << row[:bag_id]
else
failed_bag_transfer << row[:bag_id]
end
end
bag_transfer_metrics = registry.gauge(
:bag_transfer_metrics,
docstring: "Bag transfer metrics",
labels: [:date_of_processing , :success, :failure]
)
labels = {
date_of_processing: Time.at(start_time).strftime("%m/%d/%Y"),
success: successful_bag_transfer.count,
failure: failed_bag_transfer.count
}

bag_transfer_metrics.set(start_time, labels: labels)
gateway.add(registry)
.where { Sequel.qualify(:status_event,:timestamp) >= @start_time }
.select( :bag_id, Sequel.qualify(:status, :name))
.group(:bag_id)
.having{count(Sequel.qualify(:status, :name)) == successful_status.length}
.count
success_count
end

successful_bag_id = registry.gauge(
:successful_bag_id,
docstring: "Successful Bag transfer metrics",
labels: [:bag_id]
)
def get_failure_count
failed_status = [BagStatus::DEPOSIT_SKIPPED, BagStatus::FAILED, BagStatus::VALIDATION_SKIPPED, BagStatus::VERIFY_FAILED]
failure_count = DatabaseSchema::StatusEvent
.join(:status, Sequel.qualify(:status, :id) => Sequel.qualify(:status_event, :status_id))
.where { Sequel.qualify(:status_event,:timestamp) >= @start_time }
.select( :bag_id, Sequel.qualify(:status, :name))
.group(:bag_id)
.having(~Sequel.|(*failed_status.map {|status| Sequel.qualify(:status, :name) => status}))
.count
failure_count
end

labels = {
bag_id: successful_bag_transfer
}
def get_failed_bag_ids
failed_status = [BagStatus::DEPOSIT_SKIPPED, BagStatus::FAILED, BagStatus::VALIDATION_SKIPPED, BagStatus::VERIFY_FAILED]
failure_bag_ids = DatabaseSchema::Bag
.join(:status_event, Sequel.qualify(:status_event, :bag_id) => Sequel.qualify(:bag, :id))
.join(:status, Sequel.qualify(:status, :id) => Sequel.qualify(:status_event, :status_id))
.where(Sequel.qualify(:status_event, :timestamp) >= @start_time)
.where(Sequel.qualify(:status, :name).like("%#{failed_status.join('%')}%"))
.group(Sequel[:bag][:identifier])
.select_map(Sequel[:bag][:identifier])

successful_bag_id.set(start_time, labels: labels)
gateway.add(registry)
failure_bag_ids
end

failed_bag_id = registry.gauge(
:failed_bag_id,
docstring: "Failed Bag transfer metrics",
labels: [:bag_id]
)
def set_success_count
dark_blue_success_count = registry.gauge(
:dark_blue_success_count,
docstring: "Successful number of bag transfer")
dark_blue_success_count.set(get_success_count)
end

labels = {
bag_id: failed_bag_transfer
}
def set_failed_count
dark_blue_failed_count = registry.gauge(
:dark_blue_failed_count,
docstring: "Failed number of bag transfer")
dark_blue_failed_count.set(get_failure_count)
end

failed_bag_id.set(start_time, labels: labels)
gateway.add(registry)
def set_failed_bag_id
dark_blue_failed_bag_ids = registry.gauge(
:dark_blue_failed_bag_ids,
docstring: "Failed bag transfer")
if !get_failed_bag_ids.empty?
dark_blue_failed_bag_ids.set(get_failed_bag_ids)
end
end

def push_last_successful_run
def set_last_successful_run
dark_blue_last_successful_run = registry.gauge(
:dark_blue_last_successful_run,
{docstring: "Timestamp of the last successful run of the cron job"}
)
time_in_milli_sec = (Time.now.to_i) * 1000
docstring: "Timestamp of the last successful run of the cron job")
time_in_milli_sec = @start_time * 1000
dark_blue_last_successful_run.set(time_in_milli_sec)
gateway.add(registry)
end

def push_processing_duration(start_time, end_time)
def set_processing_duration
dark_blue_processing_duration = registry.gauge(
:dark_blue_processing_duration,
{docstring: "Duration of processing in seconds for the cron job"}
)
dark_blue_processing_duration.set(end_time - start_time)
gateway.add(registry)
docstring: "Duration of processing in seconds for the cron job")
dark_blue_processing_duration.set(@end_time - @start_time)
end

def set_all_metrics
set_last_successful_run
set_processing_duration
set_success_count
set_failed_count
set_failed_bag_id
push_metrics
end

private
Expand All @@ -112,11 +112,8 @@ def gateway
)
end

def push_metrics
gateway.add(registry)
end
end

# class DarkBlueMetricsFactory
# def self.for(use_db:)
# DarkBlueMetrics.new if use_db?
# end
# end
end
6 changes: 2 additions & 4 deletions run_dark_blue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,5 @@ def self.parse(options)
dark_blue_job.process
end
end_time = Time.now.to_i
metrics = Jobs::DarkBlueMetrics.new
metrics.get_overall_metrics_current_run(start_time)
metrics.push_processing_duration(start_time,end_time)
metrics.push_last_successful_run
metrics = Jobs::DarkBlueMetrics.new(start_time: start_time, end_time: end_time)
metrics.set_all_metrics
64 changes: 25 additions & 39 deletions test/test_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

class DarkBlueMetricTest < Minitest::Test
def setup
@metrics = Jobs::DarkBlueMetrics.new
@time_now = Time.now
@metrics = Jobs::DarkBlueMetrics.new(start_time: @time_now, end_time:(@time_now + 5))
ENV["PROMETHEUS_PUSH_GATEWAY"] = "http://test.xyz"

@registry = Minitest::Mock.new
Expand All @@ -26,35 +27,29 @@ def test_gateway
assert_instance_of Prometheus::Client::Push, result
end

def test_push_last_successful_run
@gauge.expect(:set, nil,[Numeric])
@registry.expect(:gauge, @gauge,[:dark_blue_last_successful_run,
{docstring: "Timestamp of the last successful run of the cron job"}
])
@gateway.expect(:add, nil, [@registry])

@metrics.instance_variable_set(:@registry, @registry)
@metrics.instance_variable_set(:@gateway, @gateway)
@metrics.push_last_successful_run
@registry.verify
@gauge.verify
@gateway.verify
end

def test_push_processing_duration
expected_duration = 5
time_now = Time.now
@gauge.expect(:set, nil,[Numeric])
@registry.expect(:gauge, @gauge,[:dark_blue_processing_duration,{docstring: "Duration of processing in seconds for the cron job"} ])
@gateway.expect(:add, nil, [@registry])

@metrics.instance_variable_set(:@registry, @registry)
@metrics.instance_variable_set(:@gateway, @gateway)
@metrics.push_processing_duration(time_now,(time_now + expected_duration))
@registry.verify
@gauge.verify
@gateway.verify
end
# def test_set_last_successful_run
# expected_time = @time_now * 1000
# @gauge.expect(:set, nil,[expected_time])

# @metrics.stub(:create_gauge, @gauge) do
# Time.stub :now, Time.at(expected_time) do
# @metrics.set_last_successful_run
# end
# end
# @gauge.verify
# end

# def test_set_processing_duration
# expected_duration = 5
# @gauge.expect(:set, nil,[expected_duration])

# @metrics.stub(:create_gauge, @gauge) do
# Time.stub :now, Time.at(expected_duration) do
# @metrics.set_processing_duration
# end
# end
# @gauge.verify
# end
# TBD:

# sleep(30)
Expand Down Expand Up @@ -90,12 +85,3 @@ def test_push_processing_duration
# bag_transfer_metrics.set(start_time + (60 * 60 * 24 * 3) , labels: labels)
# gateway.add(registry)
end

# class DarkBlueMetricsFactoryTest < Minitest::Test
# def test_for_creates_db_repo
# db = Sequel.connect("mock://mysql2")
# repo = Job::DarkBlueMetricsFactory.for(use_db: db)
# assert repo.is_a?(Job::DarkBlueMetrics)
# end

# end

0 comments on commit c818bd8

Please sign in to comment.