41 changes: 35 additions & 6 deletions gracehpc/cli.py
@@ -108,6 +108,13 @@ def main():
help="Comma-separated list (no spaces) of all the HPC job IDs to filter on. Default: 'all_jobs'",
default = "all_jobs")

# If sacct output is already saved to a file, path to the file
run_subcommand.add_argument("--sacct_file",
type=str,
help="Path to a pre-saved sacct output file (as pipe-delimited text). If provided, this file will be used instead of calling sacct directly. Default: None",
default = None)


# Region argument for carbon intensity data
run_subcommand.add_argument("--Region",
type=str,
@@ -117,7 +124,16 @@ def main():
"Options: 'North Scotland', 'South Scotland', 'North West England', 'North East England', 'Yorkshire', 'North Wales', 'South Wales', 'West Midlands', "
"'East Midlands', 'East England', 'South West England', 'South England', 'London', 'South East England'. Default: 'UK_average'. "
"E.g. 'South West England' for Isambard systems and South Scotland for Archer2."))


# Allow aggregated CI values
run_subcommand.add_argument("--aggregate-ci",
action="store_true",
help=(
"Use aggregated carbon intensity values from the Carbon Intensity API to reduce the number of API calls. "
"Will use hourly aggregated values if set."
))


# Adding Scope 3 emissions or not
run_subcommand.add_argument("--Scope3",
type=str,
@@ -138,6 +154,13 @@ def main():
"'all' : all of the above datasets saved to CSV files. Default: 'no_save'."
))

run_subcommand.add_argument("--allow-multiple-users",
action='store_true',
help=(
"Process jobs from multiple users."
"By default this is false to avoid accidentally processing jobs from multiple users."
))


# Parse CLI arguments
arguments = arg_parser.parse_args()
@@ -153,11 +176,17 @@ def main():
# Handle the 'gracehpc run' command
elif arguments.command == "run":

try:
confirm_date_args(arguments) # Check if the date arguments are valid
except ValueError as e:
print(f"❌ Date validation error: {e}")
sys.exit(1) # exit the script with an error code
if arguments.sacct_file is None:
try:
confirm_date_args(arguments) # Check if the date arguments are valid
except ValueError as e:
print(f"❌ Date validation error: {e}")
sys.exit(1) # exit the script with an error code
else:
print("Ignoring StartDate and EndDate arguments since a sacct_file has been provided.")
if not os.path.isfile(arguments.sacct_file):
print(f"❌ The provided sacct_file path does not exist or is not a file: {arguments.sacct_file}")
sys.exit(1) # exit the script with an error code

# Execute the entire backend (core_engine) by passing the arguments
full_df, daily_df, total_df = core_engine(arguments)
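The `--sacct_file` input added above can be generated ahead of time by redirecting sacct's pipe-delimited output to a file, then running e.g. `gracehpc run --sacct_file saved_sacct.txt`. A minimal sketch (not part of this PR), assuming sacct's `--parsable2` flag matches the pipe-delimited format gracehpc expects; the exact `--format` field list lives elsewhere in the package and is omitted here:

```python
# Sketch only: pre-save sacct output for later use with --sacct_file.
# The --format field list gracehpc expects is defined elsewhere in the
# package and is not shown in this diff, so it is omitted here.
import subprocess

with open("saved_sacct.txt", "wb") as f:
    subprocess.run(
        ["sacct", "--start", "2024-01-01", "--end", "2024-01-31",
         "--parsable2"],  # --parsable2 emits pipe-delimited records
        stdout=f,
        check=True,  # raise if sacct exits with a non-zero status
    )
```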
208 changes: 114 additions & 94 deletions gracehpc/core/backend_utils.py
@@ -151,6 +151,7 @@ def memory_conversion(self, value, unit_label):
Args:
value (float): The numeric value of memory
unit_label (str): the unit associated with the memory value. Must be:
- T (terabytes)
- M (megabytes)
- G (gigabytes)
- K (kilobytes)
@@ -159,8 +160,11 @@ def memory_conversion(self, value, unit_label):
float: Memory value converted to gigabytes
"""
# Check unit label is one of the expected
assert unit_label in ['M', 'G', 'K'], f"Invalid unit '{unit_label}. Expected to be either 'M', 'G', 'K']."
assert unit_label in ['T', 'M', 'G', 'K'], f"Invalid unit '{unit_label}'. Expected one of 'T', 'M', 'G', 'K'."

# If unit is terabytes, multiply by 1000
if unit_label == 'T':
value = value * 1e3 # 1 GB = 0.001 TB
# If unit is megabytes, divide by 1000
if unit_label == 'M':
value = value / 1e3 # 1 GB = 1000 MB
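For reference, the unit handling above (with the new 'T' branch) is equivalent to a single table of conversion factors. This is an illustrative sketch only; the 'K' factor is assumed from the pattern, since that branch is truncated from this diff:

```python
# Illustrative sketch (not part of this PR) of the unit handling above;
# the 'K' factor is an assumption based on the pattern shown.
def to_gigabytes(value: float, unit_label: str) -> float:
    factors = {"T": 1e3, "G": 1.0, "M": 1e-3, "K": 1e-6}
    assert unit_label in factors, (
        f"Invalid unit '{unit_label}'. Expected one of {sorted(factors)}."
    )
    return value * factors[unit_label]

print(to_gigabytes(2, "T"))    # 2000.0 GB
print(to_gigabytes(512, "M"))  # ~0.512 GB
```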
@@ -205,7 +209,7 @@ def requested_memory(self, job_record):
total_memory_gb = float(raw_memory_requested[:-2]) * total_cpus

# If the memory string ends with a standard unit, parse directly
elif raw_memory_requested[-1] in ['M', 'G', 'K']:
elif raw_memory_requested[-1] in ['T', 'M', 'G', 'K']:
memory_unit = raw_memory_requested[-1] # extract unit (last character)
total_memory_gb = float(raw_memory_requested[:-1])

@@ -554,103 +558,119 @@ def save(df, filename):
save(total_df[total_summary_columns], "total_data_summary")


class CarbonIntensityCalculator():
def __init__(self):
self.cached_value = {}

# Function for querying the Carbon Intensity API for realtime carbon intensity data
def get_carbon_intensity(submission_times, arguments):
"""
For each job submission time, this function queries the Carbon Intensity API for the specified region
and returns the carbon intensity value (gCO2e/kWh) for that time. If the API fails, it falls back
to the UK average carbon intensity value (2024).
# Function for querying the Carbon Intensity API for realtime carbon intensity data
def get_carbon_intensity(self, submission_times, arguments):
"""
For each job submission time, this function queries the Carbon Intensity API for the specified region
and returns the carbon intensity value (gCO2e/kWh) for that time. If the API fails, it falls back
to the UK average carbon intensity value (2024).

API documentation: https://carbon-intensity.github.io/api-definitions/#get-regional-intensity-from-to-regionid-regionid
API documentation: https://carbon-intensity.github.io/api-definitions/#get-regional-intensity-from-to-regionid-regionid

Args:
submission_times (pd.Series): Series of datetime job submission timestamps ('SubmissionTime' column)
arguments (argparse.Namespace): User arguments entered in the CLI or script/JN usable function - must contain 'Region' attribute
Args:
submission_times (pd.Series): Series of datetime job submission timestamps ('SubmissionTime' column)
arguments (argparse.Namespace): User arguments entered in the CLI or script/JN usable function - must contain 'Region' attribute

Return:
pd.Series: Series of carbon intensity values (gCO2e/kWh) corresponding to each job.
"""
# Define constants for the API
Date_format_api = "%Y-%m-%dT%H:%MZ"
time_window = timedelta(minutes=30) # 30 minutes time window for the API query
default_CI = 124 # Average UK carbon intensity of electricity (gCO2e/kWh) - 2024 - https://www.carbonbrief.org/analysis-uks-electricity-was-cleanest-ever-in-2024/

# Map the Region Name provided by the user to the region ID used by the API
region_map = {
"North Scotland": 1,
"South Scotland": 2,
"North West England": 3,
"North East England": 4,
"Yorkshire": 5,
"North Wales": 6,
"South Wales": 7,
"West Midlands": 8,
"East Midlands": 9,
"East England": 10,
"South West England": 11,
"South England": 12,
"London": 13,
"South East England": 14
}

# Extract region name from user arguments and the corresponding region ID
region_name = arguments.Region

# If the user has not specified a region, use the default UK average carbon intensity for all jobs
if region_name == "UK_average":
return pd.Series([default_CI] * len(submission_times), index=submission_times.index)

# Confirm the region name given by the user is valid
region_id = region_map.get(region_name)
if region_id is None:
raise ValueError(f"Invalid region name: '{region_name}'. Must be one of:\n{['UK_average'] + list(region_map.keys())}")

# Loop over each job submission time and query the API
carbon_intensity_values = []
for DateTime in submission_times:
try:
# Confirm that the datetime is in UTC and timezone-aware for API compatibility
if DateTime.tzinfo is None:
# from_utc = DateTime.tz_localize("Europe/London").tz_convert("UTC")
from_utc = DateTime.replace(tzinfo=pytz.UTC)
else:
# from_utc = DateTime.tz_convert("UTC")
from_utc = DateTime.astimezone(pytz.UTC)
except Exception as e:
print(f"Error converting datetime {DateTime} to UTC: {e}")
carbon_intensity_values.append(default_CI)
continue

to_utc = from_utc + time_window # the end time is the start time + 30 minutes
from_string = from_utc.strftime(Date_format_api)
to_string = to_utc.strftime(Date_format_api)

# Querying the API (request) for each job
url = f"https://api.carbonintensity.org.uk/regional/intensity/{from_string}/{to_string}/regionid/{region_id}"
try:
# Make the GET request to the API
api_response = requests.get(url, headers={"Accept": "application/json"}, timeout=10)
Return:
pd.Series: Series of carbon intensity values (gCO2e/kWh) corresponding to each job.
"""
# Define constants for the API
Date_format_api = "%Y-%m-%dT%H:%MZ"

if arguments.aggregate_ci:
time_window = timedelta(hours=1) # 1 hour time window for aggregated carbon intensity values
else:
time_window = timedelta(minutes=30) # 30 minutes time window for the API query

default_CI = 124 # Average UK carbon intensity of electricity (gCO2e/kWh) - 2024 - https://www.carbonbrief.org/analysis-uks-electricity-was-cleanest-ever-in-2024/

# Map the Region Name provided by the user to the region ID used by the API
region_map = {
"North Scotland": 1,
"South Scotland": 2,
"North West England": 3,
"North East England": 4,
"Yorkshire": 5,
"North Wales": 6,
"South Wales": 7,
"West Midlands": 8,
"East Midlands": 9,
"East England": 10,
"South West England": 11,
"South England": 12,
"London": 13,
"South East England": 14
}

# Extract region name from user arguments and the corresponding region ID
region_name = arguments.Region

# If the user has not specified a region, use the default UK average carbon intensity for all jobs
if region_name == "UK_average":
return pd.Series([default_CI] * len(submission_times), index=submission_times.index)

# Confirm the region name given by the user is valid
region_id = region_map.get(region_name)
if region_id is None:
raise ValueError(f"Invalid region name: '{region_name}'. Must be one of:\n{['UK_average'] + list(region_map.keys())}")

# Loop over each job submission time and query the API
carbon_intensity_values = []
for DateTime in submission_times:
try:
# Confirm that the datetime is in UTC and timezone-aware for API compatibility
if DateTime.tzinfo is None:
# from_utc = DateTime.tz_localize("Europe/London").tz_convert("UTC")
from_utc = DateTime.replace(tzinfo=pytz.UTC)
else:
# from_utc = DateTime.tz_convert("UTC")
from_utc = DateTime.astimezone(pytz.UTC)
except Exception as e:
print(f"Error converting datetime {DateTime} to UTC: {e}")
carbon_intensity_values.append(default_CI)
continue

# raise an error if the request was unsuccessful
api_response.raise_for_status()

# Parse the JSON response as JSON format
json_CI_response = api_response.json()

# Extract the carbon intensity value (gCO2e/kWh) from the response
carbon_intensity = json_CI_response["data"]["data"][0]["intensity"]["forecast"]

# Append the value to the list
carbon_intensity_values.append(carbon_intensity)

except Exception as e:
# If the API request fails, use the default carbon intensity value (UK annual average)
print(f"Failed to get carbon intensity for {DateTime} from the API. Using UK average: {default_CI} gCO2e/kWh. Error: {e}")
carbon_intensity_values.append(default_CI)

# Return the carbon intensity values as a pandas Series with the same index as submission_times
return pd.Series(carbon_intensity_values, index=submission_times.index)
if arguments.aggregate_ci:
# Truncate to the start of the hour so all jobs in the same hour share one cached value
from_utc = from_utc.replace(minute=0, second=0, microsecond=0)
if from_utc in self.cached_value:
carbon_intensity_values.append(self.cached_value[from_utc])
continue

to_utc = from_utc + time_window # the end time is the start time + time window
from_string = from_utc.strftime(Date_format_api)
to_string = to_utc.strftime(Date_format_api)

# Querying the API (request) for each job
url = f"https://api.carbonintensity.org.uk/regional/intensity/{from_string}/{to_string}/regionid/{region_id}"
try:
# Make the GET request to the API
api_response = requests.get(url, headers={"Accept": "application/json"}, timeout=10)

# raise an error if the request was unsuccessful
api_response.raise_for_status()

# Parse the JSON response as JSON format
json_CI_response = api_response.json()

# Extract the carbon intensity value (gCO2e/kWh) from the response
carbon_intensity = json_CI_response["data"]["data"][0]["intensity"]["forecast"]

# Append the value to the list
carbon_intensity_values.append(carbon_intensity)
self.cached_value[from_utc] = carbon_intensity # Cache the value

except Exception as e:
# If the API request fails, use the default carbon intensity value (UK annual average)
print(f"Failed to get carbon intensity for {DateTime} from the API. Using UK average: {default_CI} gCO2e/kWh. Error: {e}")
carbon_intensity_values.append(default_CI)

# Return the carbon intensity values as a pandas Series with the same index as submission_times
return pd.Series(carbon_intensity_values, index=submission_times.index)
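A minimal usage sketch of the new cache (the `Namespace` below stands in for parsed CLI arguments): with `--aggregate-ci` set, submission times are truncated to the hour, so jobs submitted within the same UTC hour resolve to a single API request. The cache lives on the calculator instance, so reusing one `CarbonIntensityCalculator` across a batch of jobs is what makes the aggregation pay off.

```python
# Usage sketch (not part of this PR): with aggregate_ci, jobs submitted in
# the same UTC hour reuse one cached API value.
import argparse
import pandas as pd

args = argparse.Namespace(Region="South West England", aggregate_ci=True)
times = pd.Series(pd.to_datetime([
    "2024-06-03 09:05", "2024-06-03 09:40",  # same hour -> one API request
    "2024-06-03 10:15",                      # next hour -> a second request
]))

calc = CarbonIntensityCalculator()
ci_values = calc.get_carbon_intensity(times, args)
print(ci_values)  # three gCO2e/kWh values from at most two API calls
```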



14 changes: 8 additions & 6 deletions gracehpc/core/emissions_calculator.py
@@ -24,7 +24,7 @@
import requests

# Import functions/classes from other modules
from .backend_utils import exit_if_no_jobs, save_output_dfs, get_carbon_intensity
from .backend_utils import exit_if_no_jobs, save_output_dfs, CarbonIntensityCalculator
from .job_log_manager import JobLogProcessor

# Class to estimate energy consumption, scope 2 (operational emissions) and scope 3 (embodied emissions)
@@ -221,10 +221,11 @@ def get_job_logs(arguments, hpc_config):
# Ensure the processed (aggregated) dataframe is also not empty
exit_if_no_jobs(JLP.filtered_df, arguments)

# Verify that the final df only contains logs from a single user
if len(set(JLP.final_df.UserName)) > 1:
raise ValueError(f"Multiple users found in the job logs: {set(JLP.final_df.UserName)}. Please ensure you are only processing logs for a single user.")

if not arguments.allow_multiple_users:
# Verify that the final df only contains logs from a single user
if len(set(JLP.final_df.UserName)) > 1:
raise ValueError(f"Multiple users found in the job logs: {set(JLP.final_df.UserName)}. Please ensure you are only processing logs for a single user or use the --allow-multiple-users flag.")

# Return the final processed/filtered dataframe
return JLP.final_df

@@ -251,7 +252,8 @@ def add_emissions_data (df_jobs, emissions_calculator, hpc_config):
df_jobs['failed_energy_kwh'] = np.where(df_jobs['StateCode'] == 0, df_jobs.energy_estimated_kwh, 0)

# Get the carbon intensity value at the submission time of each job from the web API - depends on the 'Region' argument given by the user
carbon_intensity_values = get_carbon_intensity(df_jobs['SubmissionTime'], emissions_calculator.arguments)
carbon_intensity_calculator = CarbonIntensityCalculator()
carbon_intensity_values = carbon_intensity_calculator.get_carbon_intensity(df_jobs['SubmissionTime'], emissions_calculator.arguments)

# Store carbon intensity values in the dataframe
df_jobs['CarbonIntensity_gCO2e_kwh'] = carbon_intensity_values
8 changes: 7 additions & 1 deletion gracehpc/core/job_log_manager.py
@@ -67,7 +67,13 @@ def retrieve_job_logs(self):
This method retrieves the accounting logs based on the arguments (e.g. start and end dates).
The output includes raw job metadata which is parsed and processed later.
"""
# Construct the SLURM command with the user arguments and correct formatting
# If the user has provided a sacct file, read from it instead of running the sacct command
if self.arguments.sacct_file:
with open(self.arguments.sacct_file, 'rb') as f:
self.sacct_data = f.read()
return

# Otherwise, construct the SLURM command with the user arguments and correct formatting
slurm_command = [
"sacct",
"--start", self.arguments.StartDate,
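Reading the file in binary mode mirrors the bytes a live `subprocess` call to sacct would return, so downstream parsing is unchanged. A quick, purely illustrative sanity check for a pre-saved file:

```python
# Illustrative check (not part of this PR) that a pre-saved file contains
# pipe-delimited sacct records; binary mode matches subprocess output.
with open("saved_sacct.txt", "rb") as f:
    sacct_data = f.read()

first_record = sacct_data.splitlines()[0].decode()
print(first_record.split("|"))  # one list entry per sacct field
```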
14 changes: 11 additions & 3 deletions gracehpc/interface/cli_script_output.py
@@ -104,9 +104,17 @@ def results_terminal_display(full_df, daily_df, total_df, arguments, hpc_config):
return

# Extract variables from arguments and data
user_name = full_df.iloc[0].get("UserName", "N/A")
start_date = arguments.StartDate if hasattr(arguments, "StartDate") else "N/A"
end_date = arguments.EndDate if hasattr(arguments, "EndDate") else "N/A"
if arguments.allow_multiple_users:
user_name = "Multiple Users"
else:
user_name = full_df.iloc[0].get("UserName", "N/A")
if arguments.sacct_file is None:
start_date = arguments.StartDate if hasattr(arguments, "StartDate") else "N/A"
end_date = arguments.EndDate if hasattr(arguments, "EndDate") else "N/A"
else:
# When reading from a pre-saved sacct file, the date range is not taken from the CLI arguments
start_date = "See below"
end_date = "See below"
hpc_name = hpc_config.get('hpc_system', 'Unknown HPC System')

# Job ID to filter on