From be9ede986e129da96131ad5ab4506580198a977d Mon Sep 17 00:00:00 2001 From: Aday BA Date: Fri, 7 Mar 2025 16:37:58 +0000 Subject: [PATCH] Added support to execute sacct to get job historic data for metrics (#857) * Added support to execute sacct to get job historic data for metrics * Updated sacct call to return an array of info objects * Added custom parse_time method to sacct call * Added info_historic adapter method * Added group_name to info_historic call --- lib/ood_core/job/adapter.rb | 12 +++ lib/ood_core/job/adapters/slurm.rb | 130 ++++++++++++++++++++++++++++- spec/fixtures/scripts/sacct.rb | 4 + spec/job/adapter_spec.rb | 7 ++ spec/job/adapters/slurm_spec.rb | 43 ++++++++++ 5 files changed, 194 insertions(+), 2 deletions(-) create mode 100755 spec/fixtures/scripts/sacct.rb diff --git a/lib/ood_core/job/adapter.rb b/lib/ood_core/job/adapter.rb index 4f543d6e3..a12f89a7e 100644 --- a/lib/ood_core/job/adapter.rb +++ b/lib/ood_core/job/adapter.rb @@ -57,6 +57,18 @@ def info_all(attrs: nil) raise NotImplementedError, "subclass did not define #info_all" end + # Retrieve historic info for all completed jobs from the resource manager. + # This depends on the data retention configuration of the resource manager. + # @abstract Subclass is expected to implement {#info_historic} + # @raise [NotImplementedError] if subclass did not define {#info_historic} + # + # @param opts [#to_h] options to filter jobs in the resource manager. + # + # @return [Array] information describing the jobs + def info_historic(opts: {}) + raise NotImplementedError, "subclass did not define #info_historic" + end + # Retrieve info for all jobs for a given owner or owners from the # resource manager # @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs diff --git a/lib/ood_core/job/adapters/slurm.rb b/lib/ood_core/job/adapters/slurm.rb index acd0d339d..dbc7bbb58 100644 --- a/lib/ood_core/job/adapters/slurm.rb +++ b/lib/ood_core/job/adapters/slurm.rb @@ -323,6 +323,46 @@ def all_squeue_fields } end + # Job info fields requested from a formatted `sacct` call + def sacct_info_fields + { + # The user name of the user who ran the job. + user: 'User', + # The group name of the user who ran the job. + group_name: 'Group', + # Job Id for reference + job_id: 'JobId', + # The name of the job or job step + job_name: 'JobName', + # The job's elapsed time. + elapsed: 'Elapsed', + # Minimum required memory for the job + req_mem: 'ReqMem', + # Count of allocated CPUs + alloc_cpus: 'AllocCPUS', + # Number of requested CPUs. + req_cpus: 'ReqCPUS', + # What the timelimit was/is for the job + time_limit: 'Timelimit', + # Displays the job status, or state + state: 'State', + # The sum of the SystemCPU and UserCPU time used by the job or job step + total_cpu: 'TotalCPU', + # Maximum resident set size of all tasks in job. + max_rss: 'MaxRSS', + # Identifies the partition on which the job ran. + partition: 'Partition', + # The time the job was submitted. In the same format as End. + submit_time: 'Submit', + # Initiation time of the job. In the same format as End. + start_time: 'Start', + # Termination time of the job. + end: 'End', + # Trackable resources. These are the minimum resource counts requested by the job/step at submission time. + gres: 'ReqTRES' + } + end + def queues info_raw = call('scontrol', 'show', 'part', '-o') @@ -357,6 +397,31 @@ def nodes end.compact end + def sacct_info(job_ids, states, from, to, show_steps) + # https://slurm.schedmd.com/sacct.html + fields = sacct_info_fields + args = ['-P'] # Output will be delimited + args.concat ['--delimiter', UNIT_SEPARATOR] + args.concat ['-n'] # No header + args.concat ['--units', 'G'] # Memory units in GB + args.concat ['--allocations'] unless show_steps # Show statistics relevant to the job, not taking steps into consideration + args.concat ['-o', fields.values.join(',')] # Required data + args.concat ['--state', states.join(',')] unless states.empty? # Filter by these states + args.concat ['-j', job_ids.join(',')] unless job_ids.empty? # Filter by these job ids + args.concat ['-S', from] if from # Filter from This date + args.concat ['-E', to] if to # Filter until this date + + jobs_info = [] + StringIO.open(call('sacct', *args)) do |output| + output.each_line do |line| + # Replace blank values with nil + values = line.strip.split(UNIT_SEPARATOR).map{ |value| value.to_s.empty? ? nil : value } + jobs_info << Hash[fields.keys.zip(values)] unless values.empty? + end + end + jobs_info + end + private def str_to_queue_info(line) hsh = line.split(' ').map do |token| @@ -466,8 +531,23 @@ def squeue_attrs_for_info_attrs(attrs) 'SE' => :completed, # SPECIAL_EXIT 'ST' => :running, # STOPPED 'S' => :suspended, # SUSPENDED - 'TO' => :completed, # TIMEOUT - 'OOM' => :completed # OUT_OF_MEMORY + 'TO' => :completed, # TIMEOUT + 'OOM' => :completed, # OUT_OF_MEMORY + + 'BOOT_FAIL' => :completed, + 'CANCELED' => :completed, + 'COMPLETED' => :completed, + 'DEADLINE' => :completed, + 'FAILED' => :completed, + 'NODE_FAIL' => :completed, + 'OUT_OF_MEMORY' => :completed, + 'PENDING' => :queued, + 'PREEMPTED' => :completed, + 'RUNNING' => :running, + 'REQUEUED' => :queued, + 'REVOKED' => :completed, + 'SUSPENDED' => :suspended, + 'TIMEOUT' => :completed, } # @api private @@ -586,6 +666,45 @@ def info_all(attrs: nil) raise JobAdapterError, e.message end + # Retrieve historic info for all completed jobs from the resource manager. + # + # Known options: + # job_ids [Array<#to_s>] optional list of job ids to filter the results. + # states [Array<#to_s>] optional list of job state codes. + # Selects jobs based on their state during the time period given. + # from [#to_s] optional date string to filter jobs in any state after the specified time. + # If states are provided, filter jobs in these states after this period + # to [#to_s] optional date string to filter jobs in any state before the specified time. + # If states are provided, filter jobs in these states before this period. + # show_steps [#Boolean] optional boolean to filter job steps from the results. + # + # @return [Array] information describing submitted jobs + # @see Adapter#info_historic + def info_historic(opts: {}) + job_ids = opts.fetch(:job_ids, []) + states = opts.fetch(:states, []) + from = opts.fetch(:from, nil) + to = opts.fetch(:to, nil) + show_steps = opts.fetch(:show_steps, false) + @slurm.sacct_info(job_ids, states, from, to, show_steps).map do |v| + Info.new( + id: v[:job_id], + status: get_state(v[:state]), + job_name: v[:job_name], + job_owner: v[:user], + procs: v[:alloc_cpus], + queue_name: v[:partition], + wallclock_time: duration_in_seconds(v[:elapsed]), + wallclock_limit: duration_in_seconds(v[:time_limit]), + cpu_time: duration_in_seconds(v[:total_cpu]), + submission_time: parse_time(v[:submit_time]), + dispatch_time: parse_time(v[:start_time]), + native: v, + gpus: self.class.gpus_from_gres(v[:gres]) + ) + end + end + # Retrieve job info from the resource manager # @param id [#to_s] the id of the job # @raise [JobAdapterError] if something goes wrong getting job info @@ -718,6 +837,13 @@ def seconds_to_duration(time) "%02d:%02d:%02d" % [time/3600, time/60%60, time%60] end + # Parse date time string ignoring unknown values returned by Slurm + def parse_time(date_time) + return nil if date_time.empty? || %w[N/A NONE UNKNOWN].include?(date_time.to_s.upcase) + + Time.parse(date_time) + end + # Convert host list string to individual nodes # "em082" # "em[014,055-056,161]" diff --git a/spec/fixtures/scripts/sacct.rb b/spec/fixtures/scripts/sacct.rb new file mode 100755 index 000000000..c0dbe8441 --- /dev/null +++ b/spec/fixtures/scripts/sacct.rb @@ -0,0 +1,4 @@ +#!/usr/bin/env ruby + +puts "ood\u001Food\u001F20251\u001FRDesktop\u001F00:00:00\u001F0.98G\u001F2\u001F2\u001F01:00:00\u001FPENDING\u001F00:00:00\u001F\u001Fnormal\u001F2025-01-03T16:54:57\u001FUnknown\u001FUnknown\u001Fbilling=2,cpu=2,gpu=1,mem=0.98G,node=2" +puts "ood\u001Food\u001F20252\u001FRStudio\u001F00:13:47\u001F0.49G\u001F1\u001F1\u001F01:00:00\u001FRUNNING\u001F00:00:00\u001F\u001Fnormal\u001F2025-01-03T16:55:37\u001F2025-01-03T16:55:37\u001FUnknown\u001Fbilling=1,cpu=1,mem=0.49G,node=1" diff --git a/spec/job/adapter_spec.rb b/spec/job/adapter_spec.rb index 79ee6ff2b..39e3ac68c 100644 --- a/spec/job/adapter_spec.rb +++ b/spec/job/adapter_spec.rb @@ -6,6 +6,7 @@ it { is_expected.to respond_to(:submit).with(1).argument.and_keywords(:after, :afterok, :afternotok, :afterany) } it { is_expected.to respond_to(:info_all).with(0).arguments.and_keywords(:attrs) } + it { is_expected.to respond_to(:info_historic).with(0).arguments.and_keywords(:opts) } it { is_expected.to respond_to(:info_where_owner).with(1).argument.and_keywords(:attrs) } it { is_expected.to respond_to(:info).with(1).argument } it { is_expected.to respond_to(:status).with(1).argument } @@ -34,6 +35,12 @@ end end + describe "#info_historic" do + it "raises NotImplementedError" do + expect { subject.info_historic }.to raise_error(NotImplementedError) + end + end + describe "#info_where_owner" do let(:bob_job) { double("bob", job_owner: "bob") } let(:sam_job) { double("sam", job_owner: "sam") } diff --git a/spec/job/adapters/slurm_spec.rb b/spec/job/adapters/slurm_spec.rb index d4c707fd9..55fce9755 100644 --- a/spec/job/adapters/slurm_spec.rb +++ b/spec/job/adapters/slurm_spec.rb @@ -10,6 +10,7 @@ it { is_expected.to respond_to(:submit).with(1).argument.and_keywords(:after, :afterok, :afternotok, :afterany) } it { is_expected.to respond_to(:info_all).with(0).arguments.and_keywords(:attrs) } + it { is_expected.to respond_to(:info_historic).with(0).arguments.and_keywords(:opts) } it { is_expected.to respond_to(:info_where_owner).with(1).argument.and_keywords(:attrs) } it { is_expected.to respond_to(:info).with(1).argument } it { is_expected.to respond_to(:status).with(1).argument } @@ -358,6 +359,48 @@ def build_script(opts = {}) end end + describe "#info_historic" do + context "when no jobs" do + it "returns an array of all the jobs" do + adapter = OodCore::Job::Adapters::Slurm.new(slurm: double(sacct_info: [])) + expect(adapter.info_historic).to eq([]) + end + end + + context "when jobs" do + it "returns an array of all the jobs" do + batch = OodCore::Job::Adapters::Slurm::Batch.new( + conf: "/etc/slurm/conf/", + bin: nil, + bin_overrides: { "sacct" => "spec/fixtures/scripts/sacct.rb"} + ) + jobs = OodCore::Job::Adapters::Slurm.new(slurm: batch).info_historic + + expect(jobs.count).to eq(2) + + j1 = jobs.first + expect(j1.id).to eq("20251") + expect(j1.job_name).to eq("RDesktop") + expect(j1.queue_name).to eq("normal") + expect(j1.status).to eq("queued") + expect(j1.status).to eq(OodCore::Job::Status.new(state: :queued)) + expect(j1.status.to_s).to eq("queued") + expect(j1.gpus).to eq(1) + expect(j1.gpu?).to eq(true) + + j2 = jobs.last + expect(j2.id).to eq("20252") + expect(j2.job_name).to eq("RStudio") + expect(j2.queue_name).to eq("normal") + expect(j2.status).to eq("running") + expect(j2.status).to eq(OodCore::Job::Status.new(state: :running)) + expect(j2.status.to_s).to eq("running") + expect(j2.gpus).to eq(0) + expect(j2.gpu?).to eq(false) + end + end + end + describe "#info" do def job_info(opts = {}) OodCore::Job::Info.new(