From 5d1c124ffcc68bdf1b9042200fe82cb2dc5faa41 Mon Sep 17 00:00:00 2001
From: Rosie Wood
Date: Wed, 8 Oct 2025 19:58:55 +0100
Subject: [PATCH 1/2] adjustments to work with sacct file

---
 gracehpc/cli.py                         | 30 ++++++++++++++++++++-----
 gracehpc/core/backend_utils.py          |  8 +++++--
 gracehpc/core/emissions_calculator.py   |  9 ++++----
 gracehpc/core/job_log_manager.py        |  8 ++++++-
 gracehpc/interface/cli_script_output.py | 14 +++++++++---
 5 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/gracehpc/cli.py b/gracehpc/cli.py
index 4f0e88e..9e5e994 100644
--- a/gracehpc/cli.py
+++ b/gracehpc/cli.py
@@ -108,6 +108,13 @@ def main():
                          help="Comma-separated list (no spaces) of all the HPC job IDs to filter on. Default: 'all_jobs'",
                          default = "all_jobs")
 
+    # Optional path to a pre-saved sacct output file
+    run_subcommand.add_argument("--sacct_file",
+                         type=str,
+                         help="Path to a pre-saved sacct output file (as pipe-delimited text). If provided, this file will be used instead of calling sacct directly. Default: None",
+                         default = None)
+
+
     # Region argument for carbon intensity data
     run_subcommand.add_argument("--Region",
                          type=str,
@@ -138,6 +145,13 @@ def main():
                              "'all' : all of the above datasets saved to CSV files. Default: 'no_save'."
                          ))
 
+    run_subcommand.add_argument("--allow-multiple-users",
+                         action='store_true',
+                         help=(
+                             "Process jobs from multiple users. "
+                             "Off by default, to avoid accidentally processing other users' jobs."
+                         ))
+
     # Parse CLI arguments
     arguments = arg_parser.parse_args()
 
@@ -153,11 +167,17 @@ def main():
 
     # Handle the 'gracehpc run' command
     elif arguments.command == "run":
-        try:
-            confirm_date_args(arguments)  # Check if the date arguments are valid
-        except ValueError as e:
-            print(f"❌ Date validation error: {e}")
-            sys.exit(1)  # exit the script with an error code
+        if arguments.sacct_file is None:
+            try:
+                confirm_date_args(arguments)  # Check if the date arguments are valid
+            except ValueError as e:
+                print(f"❌ Date validation error: {e}")
+                sys.exit(1)  # exit the script with an error code
+        else:
+            print("Ignoring StartDate and EndDate arguments since a sacct_file has been provided.")
+            if not os.path.isfile(arguments.sacct_file):
+                print(f"❌ The provided sacct_file path does not exist or is not a file: {arguments.sacct_file}")
+                sys.exit(1)  # exit the script with an error code
 
     # Execute the entire backend (core_engine) by passing the arguments
     full_df, daily_df, total_df = core_engine(arguments)
diff --git a/gracehpc/core/backend_utils.py b/gracehpc/core/backend_utils.py
index 2aafaf6..98dc0a4 100644
--- a/gracehpc/core/backend_utils.py
+++ b/gracehpc/core/backend_utils.py
@@ -151,6 +151,7 @@ def memory_conversion(self, value, unit_label):
         Args:
             value (float): The numeric value of memory
             unit_label (str): the unit associated with the memory value. Must be:
+                - T (terabytes)
                 - M (megabytes)
                 - G (gigabytes)
                 - K (kilobytes)
 
         Returns:
             float: Memory value converted to gigabytes
         """
         # Check unit label is one of the expected
-        assert unit_label in ['M', 'G', 'K'], f"Invalid unit '{unit_label}. Expected to be either 'M', 'G', 'K']."
+        assert unit_label in ['T', 'M', 'G', 'K'], f"Invalid unit '{unit_label}'. Expected to be one of 'T', 'M', 'G', 'K'."
+        # If unit is terabytes, multiply by 1000
+        if unit_label == 'T':
+            value = value * 1e3  # 1 TB = 1000 GB
         # If unit is megabytes, divide by 1000
         if unit_label == 'M':
             value = value / 1e3  # 1 GB = 1000 MB
@@ -205,7 +209,7 @@ def requested_memory(self, job_record):
             total_memory_gb = float(raw_memory_requested[:-2]) * total_cpus
 
         # If the memory string ends with a standard unit, parse directly
-        elif raw_memory_requested[-1] in ['M', 'G', 'K']:
+        elif raw_memory_requested[-1] in ['T', 'M', 'G', 'K']:
             memory_unit = raw_memory_requested[-1]  # extract unit (last character)
             total_memory_gb = float(raw_memory_requested[:-1])
 
diff --git a/gracehpc/core/emissions_calculator.py b/gracehpc/core/emissions_calculator.py
index 4b73076..e15172d 100644
--- a/gracehpc/core/emissions_calculator.py
+++ b/gracehpc/core/emissions_calculator.py
@@ -221,10 +221,11 @@ def get_job_logs(arguments, hpc_config):
     # Ensure the processed (aggregated) dataframe is also not empty
     exit_if_no_jobs(JLP.filtered_df, arguments)
 
-    # Verify that the final df only contains logs from a single user
-    if len(set(JLP.final_df.UserName)) > 1:
-        raise ValueError(f"Multiple users found in the job logs: {set(JLP.final_df.UserName)}. Please ensure you are only processing logs for a single user.")
-
+    if not arguments.allow_multiple_users:
+        # Verify that the final df only contains logs from a single user
+        if len(set(JLP.final_df.UserName)) > 1:
+            raise ValueError(f"Multiple users found in the job logs: {set(JLP.final_df.UserName)}. Please ensure you are only processing logs for a single user, or use the --allow-multiple-users flag.")
+
     # Return the final processed/filtered dataframe
     return JLP.final_df
 
diff --git a/gracehpc/core/job_log_manager.py b/gracehpc/core/job_log_manager.py
index 6cfa3e5..f0ddc13 100644
--- a/gracehpc/core/job_log_manager.py
+++ b/gracehpc/core/job_log_manager.py
@@ -67,7 +67,13 @@ def retrieve_job_logs(self):
         This method retrieves the accounting logs based on the arguments (e.g. start and end dates).
         The output includes raw job metadata which is parsed and processed later.
""" - # Construct the SLURM command with the user arguments and correct formatting + # If the user has provided a sacct file, read from that instead of running sacct command + if self.arguments.sacct_file: + with open(self.arguments.sacct_file, 'rb') as f: + self.sacct_data = f.read() + return + + # Otherwise, construct the SLURM command with the user arguments and correct formatting slurm_command = [ "sacct", "--start", self.arguments.StartDate, diff --git a/gracehpc/interface/cli_script_output.py b/gracehpc/interface/cli_script_output.py index 341868a..8e14565 100644 --- a/gracehpc/interface/cli_script_output.py +++ b/gracehpc/interface/cli_script_output.py @@ -104,9 +104,17 @@ def results_terminal_display(full_df, daily_df, total_df, arguments, hpc_config) return # Extract variables from arguments and data - user_name = full_df.iloc[0].get("UserName", "N/A") - start_date = arguments.StartDate if hasattr(arguments, "StartDate") else "N/A" - end_date = arguments.EndDate if hasattr(arguments, "EndDate") else "N/A" + if arguments.allow_multiple_users: + user_name = "Multiple Users" + else: + user_name = full_df.iloc[0].get("UserName", "N/A") + if arguments.sacct_file is None: + start_date = arguments.StartDate if hasattr(arguments, "StartDate") else "N/A" + end_date = arguments.EndDate if hasattr(arguments, "EndDate") else "N/A" + else: + # See below + start_date = "See below" + end_date = "See below" hpc_name = hpc_config.get('hpc_system', 'Unknown HPC System') # Job ID to filter on From 1a457460e6c495312c1d72da682634e16cd2daa3 Mon Sep 17 00:00:00 2001 From: Rosie Wood Date: Thu, 9 Oct 2025 10:06:51 +0100 Subject: [PATCH 2/2] add aggregate-ci option --- gracehpc/cli.py | 11 +- gracehpc/core/backend_utils.py | 200 ++++++++++++++------------ gracehpc/core/emissions_calculator.py | 5 +- 3 files changed, 121 insertions(+), 95 deletions(-) diff --git a/gracehpc/cli.py b/gracehpc/cli.py index 9e5e994..370473d 100644 --- a/gracehpc/cli.py +++ b/gracehpc/cli.py @@ -124,7 +124,16 @@ def main(): "Options: 'North Scotland', 'South Scotland', 'North West England', 'North East England', 'Yorkshire', 'North Wales', 'South Wales', 'West Midlands', " "'East Midlands', 'East England', 'South West England', 'South England', 'London', 'South East England'. Default: 'UK_average'. " "E.g. 'South West England' for Isambard systems and South Scotland for Archer2.")) - + + # Allow aggregated CI values + run_subcommand.add_argument("--aggregate-ci", + action="store_true", + help=( + "Use aggregated carbon intensity values from the Carbon Intensity API to reduce the number of API calls. " + "Will use hourly aggregated values if set." + )) + + # Adding Scope 3 emissions or not run_subcommand.add_argument("--Scope3", type=str, diff --git a/gracehpc/core/backend_utils.py b/gracehpc/core/backend_utils.py index 98dc0a4..3bf9423 100644 --- a/gracehpc/core/backend_utils.py +++ b/gracehpc/core/backend_utils.py @@ -558,103 +558,119 @@ def save(df, filename): save(total_df[total_summary_columns], "total_data_summary") +class CarbonIntensityCalculator(): + def __init__(self): + self.cached_value = {} -# Function for querying the Carbon Intensity API for realtime carbon intensity data -def get_carbon_intensity(submission_times, arguments): - """ - For each job submission time, this function queries the Carbon Intensity API for the specified region - and returns the carbon intensity value (gCO2e/kWh) for that time. If the API fails, it falls back - to the UK average carbon intensity value (2024). 
+    # Function for querying the Carbon Intensity API for realtime carbon intensity data
+    def get_carbon_intensity(self, submission_times, arguments):
+        """
+        For each job submission time, this function queries the Carbon Intensity API for the specified region
+        and returns the carbon intensity value (gCO2e/kWh) for that time. If the API fails, it falls back
+        to the UK average carbon intensity value (2024).
 
-    API documentation: https://carbon-intensity.github.io/api-definitions/#get-regional-intensity-from-to-regionid-regionid
+        API documentation: https://carbon-intensity.github.io/api-definitions/#get-regional-intensity-from-to-regionid-regionid
 
-    Args:
-        submission_times (pd.Series): Series of datetime job submission timestamps ('SubmissionTime' column)
-        arguments (argparse.Namespace): User arguments entered in the CLI or script/JN usable function - must contain 'Region' attribute
+        Args:
+            submission_times (pd.Series): Series of datetime job submission timestamps ('SubmissionTime' column)
+            arguments (argparse.Namespace): User arguments entered in the CLI or script/JN usable function - must contain 'Region' attribute
 
-    Return:
-        pd.Series: Series of carbon intensity values (gCO2e/kWh) corresponding to each job.
-    """
-    # Define constants for the API
-    Date_format_api = "%Y-%m-%dT%H:%MZ"
-    time_window = timedelta(minutes=30) # 30 minutes time window for the API query
-    default_CI = 124 # Average UK carbon intensity of electricity (gCO2e/kWh) - 2024 - https://www.carbonbrief.org/analysis-uks-electricity-was-cleanest-ever-in-2024/
-
-    # Map the Region Name provided by the user to the region ID used by the API
-    region_map = {
-        "North Scotland": 1,
-        "South Scotland": 2,
-        "North West England": 3,
-        "North East England": 4,
-        "Yorkshire": 5,
-        "North Wales": 6,
-        "South Wales": 7,
-        "West Midlands": 8,
-        "East Midlands": 9,
-        "East England": 10,
-        "South West England": 11,
-        "South England": 12,
-        "London": 13,
-        "South East England": 14
-    }
-
-    # Extract region name from user arguments and the corresponding region ID
-    region_name = arguments.Region
-
-    # If the user has not specified a region, use the default UK average carbon intensity for all jobs
-    if region_name == "UK_average":
-        return pd.Series([default_CI] * len(submission_times), index=submission_times.index)
-
-    # Confirm the region name given by the user is valid
-    region_id = region_map.get(region_name)
-    if region_id is None:
-        raise ValueError(f"Invalid region name: '{region_name}'. Must be one of:\n{['UK_average'] + list(region_map.keys())}")
-
-    # Loop over each job submission time and query the API
-    carbon_intensity_values = []
-    for DateTime in submission_times:
-        try:
-            # Confirm that the datetime is in UTC and timezone-aware for API compatibility
-            if DateTime.tzinfo is None:
-                # from_utc = DateTime.tz_localize("Europe/London").tz_convert("UTC")
-                from_utc = DateTime.replace(tzinfo=pytz.UTC)
-            else:
-                # from_utc = DateTime.tz_convert("UTC")
-                from_utc = DateTime.astimezone(pytz.UTC)
-        except Exception as e:
-            print(f"Error converting datetime {DateTime} to UTC: {e}")
-            carbon_intensity_values.append(default_CI)
-            continue
-
-        to_utc = from_utc + time_window # the end time is the start time + 30 minutes
-        from_string = from_utc.strftime(Date_format_api)
-        to_string = to_utc.strftime(Date_format_api)
-
-        # Querying the API (request) for each job
-        url = f"https://api.carbonintensity.org.uk/regional/intensity/{from_string}/{to_string}/regionid/{region_id}"
-        try:
-            # Make the GET request to the API
-            api_response = requests.get(url, headers={"Accept": "application/json"}, timeout=10)
+        Return:
+            pd.Series: Series of carbon intensity values (gCO2e/kWh) corresponding to each job.
+        """
+        # Define constants for the API
+        Date_format_api = "%Y-%m-%dT%H:%MZ"
+
+        if arguments.aggregate_ci:
+            time_window = timedelta(hours=1) # 1 hour time window for aggregated carbon intensity values
+        else:
+            time_window = timedelta(minutes=30) # 30 minutes time window for the API query
+
+        default_CI = 124 # Average UK carbon intensity of electricity (gCO2e/kWh) - 2024 - https://www.carbonbrief.org/analysis-uks-electricity-was-cleanest-ever-in-2024/
+
+        # Map the Region Name provided by the user to the region ID used by the API
+        region_map = {
+            "North Scotland": 1,
+            "South Scotland": 2,
+            "North West England": 3,
+            "North East England": 4,
+            "Yorkshire": 5,
+            "North Wales": 6,
+            "South Wales": 7,
+            "West Midlands": 8,
+            "East Midlands": 9,
+            "East England": 10,
+            "South West England": 11,
+            "South England": 12,
+            "London": 13,
+            "South East England": 14
+        }
+
+        # Extract region name from user arguments and the corresponding region ID
+        region_name = arguments.Region
+
+        # If the user has not specified a region, use the default UK average carbon intensity for all jobs
+        if region_name == "UK_average":
+            return pd.Series([default_CI] * len(submission_times), index=submission_times.index)
+
+        # Confirm the region name given by the user is valid
+        region_id = region_map.get(region_name)
+        if region_id is None:
+            raise ValueError(f"Invalid region name: '{region_name}'. Must be one of:\n{['UK_average'] + list(region_map.keys())}")
+
+        # Loop over each job submission time and query the API
+        carbon_intensity_values = []
+        for DateTime in submission_times:
+            try:
+                # Confirm that the datetime is in UTC and timezone-aware for API compatibility
+                if DateTime.tzinfo is None:
+                    # from_utc = DateTime.tz_localize("Europe/London").tz_convert("UTC")
+                    from_utc = DateTime.replace(tzinfo=pytz.UTC)
+                else:
+                    # from_utc = DateTime.tz_convert("UTC")
+                    from_utc = DateTime.astimezone(pytz.UTC)
+            except Exception as e:
+                print(f"Error converting datetime {DateTime} to UTC: {e}")
+                carbon_intensity_values.append(default_CI)
+                continue
+
+            if arguments.aggregate_ci:
+                # Truncate to the start of the hour so all jobs submitted in the same hour share one cached lookup
+                from_utc = from_utc.replace(minute=0, second=0, microsecond=0)
+                if from_utc in self.cached_value:
+                    carbon_intensity_values.append(self.cached_value[from_utc])
+                    continue
+
+            to_utc = from_utc + time_window # the end time is the start time + the time window
+            from_string = from_utc.strftime(Date_format_api)
+            to_string = to_utc.strftime(Date_format_api)
+
+            # Querying the API (request) for each job
+            url = f"https://api.carbonintensity.org.uk/regional/intensity/{from_string}/{to_string}/regionid/{region_id}"
+            try:
+                # Make the GET request to the API
+                api_response = requests.get(url, headers={"Accept": "application/json"}, timeout=10)
-
-            # raise an error if the request was unsuccessful
-            api_response.raise_for_status()
-
-            # Parse the JSON response as JSON format
-            json_CI_response = api_response.json()
-
-            # Extract the carbon intensity value (gCO2e/kWh) from the response
-            carbon_intensity = json_CI_response["data"]["data"][0]["intensity"]["forecast"]
-
-            # Append the value to the list
-            carbon_intensity_values.append(carbon_intensity)
-
-        except Exception as e:
-            # If the API request fails, use the default carbon intensity value (UK annual average)
-            print(f"Failed to get carbon intensity for {DateTime} from the API. Using UK average: {default_CI} gCO2e/kWh. Error: {e}")
-            carbon_intensity_values.append(default_CI)
-
-    # Return the carbon intensity values as a pandas Series with the same index as submission_times
-    return pd.Series(carbon_intensity_values, index=submission_times.index)
+
+                # raise an error if the request was unsuccessful
+                api_response.raise_for_status()
+
+                # Parse the JSON response as JSON format
+                json_CI_response = api_response.json()
+
+                # Extract the carbon intensity value (gCO2e/kWh) from the response
+                carbon_intensity = json_CI_response["data"]["data"][0]["intensity"]["forecast"]
+
+                # Append the value to the list
+                carbon_intensity_values.append(carbon_intensity)
+                self.cached_value[from_utc] = carbon_intensity # Cache the value
+
+            except Exception as e:
+                # If the API request fails, use the default carbon intensity value (UK annual average)
+                print(f"Failed to get carbon intensity for {DateTime} from the API. Using UK average: {default_CI} gCO2e/kWh. Error: {e}")
+                carbon_intensity_values.append(default_CI)
+
+        # Return the carbon intensity values as a pandas Series with the same index as submission_times
+        return pd.Series(carbon_intensity_values, index=submission_times.index)
diff --git a/gracehpc/core/emissions_calculator.py b/gracehpc/core/emissions_calculator.py
index e15172d..0199fed 100644
--- a/gracehpc/core/emissions_calculator.py
+++ b/gracehpc/core/emissions_calculator.py
@@ -24,7 +24,7 @@
 import requests
 
 # Import functions/classes from other modules
-from .backend_utils import exit_if_no_jobs, save_output_dfs, get_carbon_intensity
+from .backend_utils import exit_if_no_jobs, save_output_dfs, CarbonIntensityCalculator
 from .job_log_manager import JobLogProcessor
 
 # Class to estimate energy consumption, scope 2 (operational emissions) and scope 3 (embodied emissions)
@@ -252,7 +252,8 @@ def add_emissions_data (df_jobs, emissions_calculator, hpc_config):
     df_jobs['failed_energy_kwh'] = np.where(df_jobs['StateCode'] == 0, df_jobs.energy_estimated_kwh, 0)
 
     # Get the carbon intensity value at the submission time of each job from the web API - depends on the 'Region' argument given by the user
-    carbon_intensity_values = get_carbon_intensity(df_jobs['SubmissionTime'], emissions_calculator.arguments)
+    carbon_intensity_calculator = CarbonIntensityCalculator()
+    carbon_intensity_values = carbon_intensity_calculator.get_carbon_intensity(df_jobs['SubmissionTime'], emissions_calculator.arguments)
 
     # Store carbon intensity values in the dataframe
     df_jobs['CarbonIntensity_gCO2e_kwh'] = carbon_intensity_values