Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions gracehpc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ def main():
help="Comma-separated list (no spaces) of all the HPC job IDs to filter on. Default: 'all_jobs'",
default = "all_jobs")

# If sacct output is already saved to a file, path to the file
run_subcommand.add_argument("--sacct_file",
type=str,
help="Path to a pre-saved sacct output file (as pipe-delimited text). If provided, this file will be used instead of calling sacct directly. Default: None",
default = None)


# Region argument for carbon intensity data
run_subcommand.add_argument("--Region",
type=str,
Expand Down Expand Up @@ -138,6 +145,13 @@ def main():
"'all' : all of the above datasets saved to CSV files. Default: 'no_save'."
))

run_subcommand.add_argument("--allow-multiple-users",
action='store_true',
help=(
"Process jobs from multiple users."
"By default this is false to avoid accidentally processing jobs from multiple users."
))


# Parse CLI arguments
arguments = arg_parser.parse_args()
Expand All @@ -153,11 +167,17 @@ def main():
# Handle the 'gracehpc run' command
elif arguments.command == "run":

try:
confirm_date_args(arguments) # Check if the date arguments are valid
except ValueError as e:
print(f"❌ Date validation error: {e}")
sys.exit(1) # exit the script with an error code
if arguments.sacct_file is None:
try:
confirm_date_args(arguments) # Check if the date arguments are valid
except ValueError as e:
print(f"❌ Date validation error: {e}")
sys.exit(1) # exit the script with an error code
else:
print("Ignoring StartDate and EndDate arguments since a sacct_file has been provided.")
if not os.path.isfile(arguments.sacct_file):
print(f"❌ The provided sacct_file path does not exist or is not a file: {arguments.sacct_file}")
sys.exit(1) # exit the script with an error code

# Execute the entire backend (core_engine) by passing the arguments
full_df, daily_df, total_df = core_engine(arguments)
Expand Down
8 changes: 6 additions & 2 deletions gracehpc/core/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def memory_conversion(self, value, unit_label):
Args:
value (float): The numeric value of memory
unit_label (str): the unit associated with the memory value. Must be:
- T (terabytes)
- M (megabytes)
- G (gigabytes)
- K (kilobytes)
Expand All @@ -159,8 +160,11 @@ def memory_conversion(self, value, unit_label):
float: Memory value converted to gigabytes
"""
# Check unit label is one of the expected
assert unit_label in ['M', 'G', 'K'], f"Invalid unit '{unit_label}. Expected to be either 'M', 'G', 'K']."
assert unit_label in ['T', 'M', 'G', 'K'], f"Invalid unit '{unit_label}. Expected to be either 'T', 'M', 'G', 'K']."

# If unit is terabytes, multiply by 1000
if unit_label == 'T':
value = value * 1e3 # 1 GB = 0.001 TB
# If unit is megabytes, divide by 1000
if unit_label == 'M':
value = value / 1e3 # 1 GB = 1000 MB
Expand Down Expand Up @@ -205,7 +209,7 @@ def requested_memory(self, job_record):
total_memory_gb = float(raw_memory_requested[:-2]) * total_cpus

# If the memory string ends with a standard unit, parse directly
elif raw_memory_requested[-1] in ['M', 'G', 'K']:
elif raw_memory_requested[-1] in ['T', 'M', 'G', 'K']:
memory_unit = raw_memory_requested[-1] # extract unit (last character)
total_memory_gb = float(raw_memory_requested[:-1])

Expand Down
9 changes: 5 additions & 4 deletions gracehpc/core/emissions_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ def get_job_logs(arguments, hpc_config):
# Ensure the processed (aggregated) dataframe is also not empty
exit_if_no_jobs(JLP.filtered_df, arguments)

# Verify that the final df only contains logs from a single user
if len(set(JLP.final_df.UserName)) > 1:
raise ValueError(f"Multiple users found in the job logs: {set(JLP.final_df.UserName)}. Please ensure you are only processing logs for a single user.")

if not arguments.allow_multiple_users:
# Verify that the final df only contains logs from a single user
if len(set(JLP.final_df.UserName)) > 1:
raise ValueError(f"Multiple users found in the job logs: {set(JLP.final_df.UserName)}. Please ensure you are only processing logs for a single user or use the --allow-multiple-users flag.")

# Return the final processed/filtered dataframe
return JLP.final_df

Expand Down
8 changes: 7 additions & 1 deletion gracehpc/core/job_log_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,13 @@ def retrieve_job_logs(self):
This method retrieves the accounting logs based on the arguments (e.g. start and end dates).
The output includes raw job metadata which is parsed and processed later.
"""
# Construct the SLURM command with the user arguments and correct formatting
# If the user has provided a sacct file, read from that instead of running sacct command
if self.arguments.sacct_file:
with open(self.arguments.sacct_file, 'rb') as f:
self.sacct_data = f.read()
return

# Otherwise, construct the SLURM command with the user arguments and correct formatting
slurm_command = [
"sacct",
"--start", self.arguments.StartDate,
Expand Down
14 changes: 11 additions & 3 deletions gracehpc/interface/cli_script_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,17 @@ def results_terminal_display(full_df, daily_df, total_df, arguments, hpc_config)
return

# Extract variables from arguments and data
user_name = full_df.iloc[0].get("UserName", "N/A")
start_date = arguments.StartDate if hasattr(arguments, "StartDate") else "N/A"
end_date = arguments.EndDate if hasattr(arguments, "EndDate") else "N/A"
if arguments.allow_multiple_users:
user_name = "Multiple Users"
else:
user_name = full_df.iloc[0].get("UserName", "N/A")
if arguments.sacct_file is None:
start_date = arguments.StartDate if hasattr(arguments, "StartDate") else "N/A"
end_date = arguments.EndDate if hasattr(arguments, "EndDate") else "N/A"
else:
# See below
start_date = "See below"
end_date = "See below"
hpc_name = hpc_config.get('hpc_system', 'Unknown HPC System')

# Job ID to filter on
Expand Down