Skip to content

Commit

Permalink
Added support to execute sacct to get job historic data for metrics (#…
Browse files Browse the repository at this point in the history
…857)

* Added support to execute sacct to get job historic data for metrics

* Updated sacct call to return an array of info objects

* Added custom parse_time method to sacct call

* Added info_historic adapter method

* Added group_name to info_historic call
  • Loading branch information
abujeda authored Mar 7, 2025
1 parent 9c10c77 commit be9ede9
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 2 deletions.
12 changes: 12 additions & 0 deletions lib/ood_core/job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ def info_all(attrs: nil)
raise NotImplementedError, "subclass did not define #info_all"
end

# Retrieve historic info for all completed jobs from the resource manager.
# This depends on the data retention configuration of the resource manager.
# @abstract Subclass is expected to implement {#info_historic}
# @raise [NotImplementedError] if subclass did not define {#info_historic}
#
# @param opts [#to_h] options to filter jobs in the resource manager.
#
# @return [Array<Info>] information describing the jobs
def info_historic(opts: {})
raise NotImplementedError, "subclass did not define #info_historic"
end

# Retrieve info for all jobs for a given owner or owners from the
# resource manager
# @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs
Expand Down
130 changes: 128 additions & 2 deletions lib/ood_core/job/adapters/slurm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,46 @@ def all_squeue_fields
}
end

# Job info fields requested from a formatted `sacct` call
def sacct_info_fields
{
# The user name of the user who ran the job.
user: 'User',
# The group name of the user who ran the job.
group_name: 'Group',
# Job Id for reference
job_id: 'JobId',
# The name of the job or job step
job_name: 'JobName',
# The job's elapsed time.
elapsed: 'Elapsed',
# Minimum required memory for the job
req_mem: 'ReqMem',
# Count of allocated CPUs
alloc_cpus: 'AllocCPUS',
# Number of requested CPUs.
req_cpus: 'ReqCPUS',
# What the timelimit was/is for the job
time_limit: 'Timelimit',
# Displays the job status, or state
state: 'State',
# The sum of the SystemCPU and UserCPU time used by the job or job step
total_cpu: 'TotalCPU',
# Maximum resident set size of all tasks in job.
max_rss: 'MaxRSS',
# Identifies the partition on which the job ran.
partition: 'Partition',
# The time the job was submitted. In the same format as End.
submit_time: 'Submit',
# Initiation time of the job. In the same format as End.
start_time: 'Start',
# Termination time of the job.
end: 'End',
# Trackable resources. These are the minimum resource counts requested by the job/step at submission time.
gres: 'ReqTRES'
}
end

def queues
info_raw = call('scontrol', 'show', 'part', '-o')

Expand Down Expand Up @@ -357,6 +397,31 @@ def nodes
end.compact
end

def sacct_info(job_ids, states, from, to, show_steps)
# https://slurm.schedmd.com/sacct.html
fields = sacct_info_fields
args = ['-P'] # Output will be delimited
args.concat ['--delimiter', UNIT_SEPARATOR]
args.concat ['-n'] # No header
args.concat ['--units', 'G'] # Memory units in GB
args.concat ['--allocations'] unless show_steps # Show statistics relevant to the job, not taking steps into consideration
args.concat ['-o', fields.values.join(',')] # Required data
args.concat ['--state', states.join(',')] unless states.empty? # Filter by these states
args.concat ['-j', job_ids.join(',')] unless job_ids.empty? # Filter by these job ids
args.concat ['-S', from] if from # Filter from This date
args.concat ['-E', to] if to # Filter until this date

jobs_info = []
StringIO.open(call('sacct', *args)) do |output|
output.each_line do |line|
# Replace blank values with nil
values = line.strip.split(UNIT_SEPARATOR).map{ |value| value.to_s.empty? ? nil : value }
jobs_info << Hash[fields.keys.zip(values)] unless values.empty?
end
end
jobs_info
end

private
def str_to_queue_info(line)
hsh = line.split(' ').map do |token|
Expand Down Expand Up @@ -466,8 +531,23 @@ def squeue_attrs_for_info_attrs(attrs)
'SE' => :completed, # SPECIAL_EXIT
'ST' => :running, # STOPPED
'S' => :suspended, # SUSPENDED
'TO' => :completed, # TIMEOUT
'OOM' => :completed # OUT_OF_MEMORY
'TO' => :completed, # TIMEOUT
'OOM' => :completed, # OUT_OF_MEMORY

'BOOT_FAIL' => :completed,
'CANCELED' => :completed,
'COMPLETED' => :completed,
'DEADLINE' => :completed,
'FAILED' => :completed,
'NODE_FAIL' => :completed,
'OUT_OF_MEMORY' => :completed,
'PENDING' => :queued,
'PREEMPTED' => :completed,
'RUNNING' => :running,
'REQUEUED' => :queued,
'REVOKED' => :completed,
'SUSPENDED' => :suspended,
'TIMEOUT' => :completed,
}

# @api private
Expand Down Expand Up @@ -586,6 +666,45 @@ def info_all(attrs: nil)
raise JobAdapterError, e.message
end

# Retrieve historic info for all completed jobs from the resource manager.
#
# Known options:
# job_ids [Array<#to_s>] optional list of job ids to filter the results.
# states [Array<#to_s>] optional list of job state codes.
# Selects jobs based on their state during the time period given.
# from [#to_s] optional date string to filter jobs in any state after the specified time.
# If states are provided, filter jobs in these states after this period
# to [#to_s] optional date string to filter jobs in any state before the specified time.
# If states are provided, filter jobs in these states before this period.
# show_steps [#Boolean] optional boolean to filter job steps from the results.
#
# @return [Array<Info>] information describing submitted jobs
# @see Adapter#info_historic
def info_historic(opts: {})
job_ids = opts.fetch(:job_ids, [])
states = opts.fetch(:states, [])
from = opts.fetch(:from, nil)
to = opts.fetch(:to, nil)
show_steps = opts.fetch(:show_steps, false)
@slurm.sacct_info(job_ids, states, from, to, show_steps).map do |v|
Info.new(
id: v[:job_id],
status: get_state(v[:state]),
job_name: v[:job_name],
job_owner: v[:user],
procs: v[:alloc_cpus],
queue_name: v[:partition],
wallclock_time: duration_in_seconds(v[:elapsed]),
wallclock_limit: duration_in_seconds(v[:time_limit]),
cpu_time: duration_in_seconds(v[:total_cpu]),
submission_time: parse_time(v[:submit_time]),
dispatch_time: parse_time(v[:start_time]),
native: v,
gpus: self.class.gpus_from_gres(v[:gres])
)
end
end

# Retrieve job info from the resource manager
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] if something goes wrong getting job info
Expand Down Expand Up @@ -718,6 +837,13 @@ def seconds_to_duration(time)
"%02d:%02d:%02d" % [time/3600, time/60%60, time%60]
end

# Parse date time string ignoring unknown values returned by Slurm
def parse_time(date_time)
return nil if date_time.empty? || %w[N/A NONE UNKNOWN].include?(date_time.to_s.upcase)

Time.parse(date_time)
end

# Convert host list string to individual nodes
# "em082"
# "em[014,055-056,161]"
Expand Down
4 changes: 4 additions & 0 deletions spec/fixtures/scripts/sacct.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env ruby

puts "ood\u001Food\u001F20251\u001FRDesktop\u001F00:00:00\u001F0.98G\u001F2\u001F2\u001F01:00:00\u001FPENDING\u001F00:00:00\u001F\u001Fnormal\u001F2025-01-03T16:54:57\u001FUnknown\u001FUnknown\u001Fbilling=2,cpu=2,gpu=1,mem=0.98G,node=2"
puts "ood\u001Food\u001F20252\u001FRStudio\u001F00:13:47\u001F0.49G\u001F1\u001F1\u001F01:00:00\u001FRUNNING\u001F00:00:00\u001F\u001Fnormal\u001F2025-01-03T16:55:37\u001F2025-01-03T16:55:37\u001FUnknown\u001Fbilling=1,cpu=1,mem=0.49G,node=1"
7 changes: 7 additions & 0 deletions spec/job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

it { is_expected.to respond_to(:submit).with(1).argument.and_keywords(:after, :afterok, :afternotok, :afterany) }
it { is_expected.to respond_to(:info_all).with(0).arguments.and_keywords(:attrs) }
it { is_expected.to respond_to(:info_historic).with(0).arguments.and_keywords(:opts) }
it { is_expected.to respond_to(:info_where_owner).with(1).argument.and_keywords(:attrs) }
it { is_expected.to respond_to(:info).with(1).argument }
it { is_expected.to respond_to(:status).with(1).argument }
Expand Down Expand Up @@ -34,6 +35,12 @@
end
end

describe "#info_historic" do
it "raises NotImplementedError" do
expect { subject.info_historic }.to raise_error(NotImplementedError)
end
end

describe "#info_where_owner" do
let(:bob_job) { double("bob", job_owner: "bob") }
let(:sam_job) { double("sam", job_owner: "sam") }
Expand Down
43 changes: 43 additions & 0 deletions spec/job/adapters/slurm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

it { is_expected.to respond_to(:submit).with(1).argument.and_keywords(:after, :afterok, :afternotok, :afterany) }
it { is_expected.to respond_to(:info_all).with(0).arguments.and_keywords(:attrs) }
it { is_expected.to respond_to(:info_historic).with(0).arguments.and_keywords(:opts) }
it { is_expected.to respond_to(:info_where_owner).with(1).argument.and_keywords(:attrs) }
it { is_expected.to respond_to(:info).with(1).argument }
it { is_expected.to respond_to(:status).with(1).argument }
Expand Down Expand Up @@ -358,6 +359,48 @@ def build_script(opts = {})
end
end

describe "#info_historic" do
context "when no jobs" do
it "returns an array of all the jobs" do
adapter = OodCore::Job::Adapters::Slurm.new(slurm: double(sacct_info: []))
expect(adapter.info_historic).to eq([])
end
end

context "when jobs" do
it "returns an array of all the jobs" do
batch = OodCore::Job::Adapters::Slurm::Batch.new(
conf: "/etc/slurm/conf/",
bin: nil,
bin_overrides: { "sacct" => "spec/fixtures/scripts/sacct.rb"}
)
jobs = OodCore::Job::Adapters::Slurm.new(slurm: batch).info_historic

expect(jobs.count).to eq(2)

j1 = jobs.first
expect(j1.id).to eq("20251")
expect(j1.job_name).to eq("RDesktop")
expect(j1.queue_name).to eq("normal")
expect(j1.status).to eq("queued")
expect(j1.status).to eq(OodCore::Job::Status.new(state: :queued))
expect(j1.status.to_s).to eq("queued")
expect(j1.gpus).to eq(1)
expect(j1.gpu?).to eq(true)

j2 = jobs.last
expect(j2.id).to eq("20252")
expect(j2.job_name).to eq("RStudio")
expect(j2.queue_name).to eq("normal")
expect(j2.status).to eq("running")
expect(j2.status).to eq(OodCore::Job::Status.new(state: :running))
expect(j2.status.to_s).to eq("running")
expect(j2.gpus).to eq(0)
expect(j2.gpu?).to eq(false)
end
end
end

describe "#info" do
def job_info(opts = {})
OodCore::Job::Info.new(
Expand Down

0 comments on commit be9ede9

Please sign in to comment.