diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py
new file mode 100644
index 00000000..24f34767
--- /dev/null
+++ b/dags/map_reproducibility/benchmarkdb_utils.py
@@ -0,0 +1,263 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"Helpers for writing AOTC benchmark results to the benchmark database"
+import sys
+import getpass
+
+
+def write_run(
+    model_id: str,
+    hardware_id: str,
+    software_id: str,
+    number_of_nodes: int,
+    number_of_chips: int,
+    container_image_name: str,
+    global_batch_size: int,
+    precision: str,
+    optimizer: str,
+    seq_length: int,
+    median_step_time: float,
+    e2e_time: float,
+    number_of_steps: int,
+    mfu: float,
+    tokens_per_second: float,
+    writer_path: str,
+    run_success: bool = True,  # True because if mfu is None, the DB write fails anyway.
+    run_type: str = "perf_regression",
+    run_release_status: str = "local",
+    other_metrics_in_json: str = "",
+    nccl_driver_nickname: str = None,
+    env_variables: str = "",
+    framework_config_in_json: str = "",
+    xla_flags: str = "",
+    topology: str = "",
+    dataset: str = "",
+    num_of_superblock: int = None,
+    update_person_ldap: str = getpass.getuser(),
+    comment: str = "",
+    is_test: bool = False,
+):
+  """Writes a workload benchmark run manually to the database.
+
+  This function validates the provided IDs and, if valid, constructs a
+  WorkloadBenchmarkV2Schema object with the given data and writes it to the
+  "run_summary" table in BigQuery.
+
+  Args:
+    model_id: The ID of the model used in the run.
+    hardware_id: The ID of the hardware used in the run.
+    software_id: The ID of the software stack used in the run.
+    number_of_nodes: The number of nodes used in the run.
+    number_of_chips: The number of chips used in the run.
+    container_image_name: The name of the container image used in the run.
+    global_batch_size: The global batch size used in the run.
+    precision: The precision used in the run (e.g., fp32, bf16).
+    optimizer: The optimizer used in the run (e.g., adam, sgd).
+    seq_length: The sequence length used in the run.
+    median_step_time: The median step time of the run.
+    e2e_time: The end-to-end time of the run.
+    number_of_steps: The number of steps taken in the run.
+    mfu: The MFU (model flops utilization) achieved in the run.
+    tokens_per_second: The tokens per second achieved in the run.
+    writer_path: Path appended to sys.path so that the benchmark DB writer package can be imported.
+    run_success: Whether the run succeeded (default: True).
+    run_type: The type of run (default: "perf_regression").
+    run_release_status: Possible values: "local" (code changes are only local) or "prep_release" (all code changes are present in the image).
+    other_metrics_in_json: A JSON string containing other metrics.
+    nccl_driver_nickname: The nickname of the NCCL driver used.
+    env_variables: A string containing environment variables.
+    framework_config_in_json: A JSON string containing framework configurations.
+    xla_flags: A JSON string containing all the XLA flags.
+    topology: The topology of the hardware used in the run (valid for TPUs only).
+    dataset: The dataset used in the run.
+    num_of_superblock: The number of superblocks in the hardware (valid for GPUs only).
+    update_person_ldap: The LDAP ID of the person updating the record (default: current user).
+    comment: A comment about the run.
+    is_test: Whether to use the testing project or the production project.
+
+  Raises:
+    ValueError: If any of the IDs are invalid.
+  """
+
+  sys.path.append(writer_path)
+
+  # pylint: disable=import-outside-toplevel
+  import logging
+  import uuid
+  from typing import Type
+
+  from aotc.benchmark_db_writer import bq_writer_utils
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import model_info_schema
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import software_info_schema
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import hardware_info_schema
+  # pylint: enable=import-outside-toplevel
+
+  logging.basicConfig(
+      format="%(asctime)s %(levelname)-8s %(message)s",
+      level=logging.INFO,
+      datefmt="%Y-%m-%d %H:%M:%S",
+  )
+  logger = logging.getLogger(__name__)
+
+  def get_db_client(table: str, dataclass_type: Type, is_test: bool = False):
+    """Creates a BigQuery client object.
+
+    Args:
+      table: The name of the BigQuery table.
+      dataclass_type: The dataclass type corresponding to the table schema.
+      is_test: Whether to use the testing project or the production project.
+
+    Returns:
+      A BigQuery client object.
+    """
+
+    project = "supercomputer-testing" if is_test else "ml-workload-benchmarks"
+    dataset = "mantaray_v2" if is_test else "benchmark_dataset_v2"
+    return bq_writer_utils.create_bq_writer_object(
+        project=project,
+        dataset=dataset,
+        table=table,
+        dataclass_type=dataclass_type,
+    )
+
+  def _validate_id(
+      id_value: str,
+      table_name: str,
+      id_field: str,
+      dataclass_type: Type,
+      is_test: bool = False,
+  ) -> bool:
+    """Generic function to validate an ID against a BigQuery table.
+
+    Args:
+      id_value: The ID value to validate.
+      table_name: The name of the BigQuery table.
+      id_field: The name of the ID field in the table.
+      dataclass_type: The dataclass type corresponding to the table schema.
+      is_test: Whether to use the testing project or the production project.
+
+    Returns:
+      True if the ID is valid, False otherwise.
+ """ + + client = get_db_client(table_name, dataclass_type, is_test) + result = client.query(where={id_field: id_value}) + + if not result: + logger.info( + "%s: %s is not present in the %s table ", + id_field.capitalize(), + id_value, + table_name, + ) + logger.info( + "Please add %s specific row in %s table before adding to run summary table", + id_value, + table_name, + ) + return False + return True + + def validate_model_id(model_id: str, is_test: bool = False) -> bool: + """Validates a model ID against the model_info table.""" + + print("model id: " + model_id) + id_val = _validate_id( + model_id, "model_info", "model_id", model_info_schema.ModelInfo, is_test + ) + if not id_val: + print("model id validation failed") + return False + return True + + def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: + """Validates a hardware ID against the hardware_info table.""" + id_val = _validate_id( + hardware_id, + "hardware_info", + "hardware_id", + hardware_info_schema.HardwareInfo, + is_test, + ) + if not id_val: + print("hardware id validation failed") + return False + return True + + def validate_software_id(software_id: str, is_test: bool = False) -> bool: + """Validates a software ID against the software_info table.""" + id_val = _validate_id( + software_id, + "software_info", + "software_id", + software_info_schema.SoftwareInfo, + is_test, + ) + + if not id_val: + print("software id validation failed") + return False + return True + + print(model_id) + + if ( + validate_model_id(model_id, is_test) + and validate_hardware_id(hardware_id, is_test) + and validate_software_id(software_id, is_test) + ): + summary = workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema( + run_id=f"run-{uuid.uuid4()}", + model_id=model_id, + software_id=software_id, + hardware_id=hardware_id, + hardware_num_chips=number_of_chips, + hardware_num_nodes=number_of_nodes, + result_success=run_success, + configs_framework=framework_config_in_json, + configs_env=env_variables, + configs_container_version=container_image_name, + configs_xla_flags=xla_flags, + configs_dataset=dataset, + logs_artifact_directory="", + update_person_ldap=update_person_ldap, + run_source="automation", + run_type=run_type, + run_release_status=run_release_status, + workload_precision=precision, + workload_gbs=global_batch_size, + workload_optimizer=optimizer, + workload_sequence_length=seq_length, + metrics_e2e_time=e2e_time, + metrics_mfu=mfu, + metrics_step_time=median_step_time, + metrics_tokens_per_second=tokens_per_second, + metrics_steps_for_convergence=number_of_steps, + metrics_other=other_metrics_in_json, + hardware_nccl_driver_nickname=nccl_driver_nickname, + hardware_topology=topology, + hardware_num_superblocks=num_of_superblock, + logs_comments=comment, + ) + + client = get_db_client( + "run_summary", + workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema, + is_test, + ) + client.write([summary]) + + else: + raise ValueError("Could not upload data in run summary table") diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 04dd8bdb..3fbde9e7 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -16,6 +16,7 @@ import datetime import sys +import os import tempfile from airflow import models @@ -32,16 +33,23 @@ from dags.map_reproducibility.utils import cleanup_cmds from dags.map_reproducibility.utils import git_cookie_authdaemon from dags.map_reproducibility.utils import clone_gob -from dags.map_reproducibility.utils 
import helm_install_cmds -from dags.map_reproducibility.utils import get_metrics_from_gcs +from dags.map_reproducibility.utils import helm_apply_cmds +from dags.map_reproducibility.utils import get_metrics from dags.map_reproducibility.utils import get_aotc_repo -from dags.map_reproducibility.utils import extract_bucket_file_name from dags.map_reproducibility.utils import extract_python_path +from dags.map_reproducibility.benchmarkdb_utils import write_run +from dags.map_reproducibility.utils import extract_run_details # Run once a day at 2 pm UTC (6 am PST) SCHEDULED_TIME = "0 14 * * *" if composer_env.is_prod_env() else None +MODEL_ID = "gpt3-175b" +BATCH_SIZE = 2048 +NUM_ACCELERATORS = 256 +PRECISION = "fp8" +ACCELERATOR_TYPE = "h100" + @task def run_aotc_workload(): @@ -60,6 +68,7 @@ def run_aotc_workload(): with tempfile.TemporaryDirectory() as tmpdir: hook = SubprocessHook() + # TODO(gunjanjalori): clone recipe first and extract params result = hook.run_command( [ "bash", @@ -73,10 +82,16 @@ def run_aotc_workload(): + install_helm_cmds() + namespace_cmds() + workload_cmds - + helm_install_cmds() + + helm_apply_cmds() + wait_for_jobs_cmds() + copy_bucket_cmds() - + get_metrics_cmds() + + get_metrics_cmds( + BATCH_SIZE, + NUM_ACCELERATORS, + PRECISION, + MODEL_ID, + ACCELERATOR_TYPE, + ) + cleanup_cmds() + get_aotc_repo() ), @@ -85,13 +100,54 @@ def run_aotc_workload(): ) assert result.exit_code == 0, f"Command failed with code {result.exit_code}" - # Extract COMPLETE_JOB_NAME from the output - bucket_name, file_name, python_path = extract_bucket_file_name( - result.output + python_base_path, python_path_to_bq_writer = extract_python_path( + result.output.splitlines()[-1] ) - get_metrics_from_gcs(bucket_name, file_name) + print(f"Base path in python: {python_base_path}") + print(f"python to bq: {python_path_to_bq_writer}") + + value_yaml_path = "reproducible-benchmark-recipes/projects/gpu-recipes/training/a3mega/gpt3-175b/nemo-pretraining-gke/values.yaml" + config_yaml_path = "reproducible-benchmark-recipes/projects/gpu-recipes/src/frameworks/a3mega/nemo-configs/gpt3-175b-256gpus-fp8.yaml" - sys.path.append(python_path) + ( + number_of_nodes, + global_batch_size, + optimizer, + precision, + seq_length, + max_steps, + ) = extract_run_details(tmpdir, value_yaml_path, config_yaml_path) + print( + f"batch size: {global_batch_size}, number of nodes: {number_of_nodes}" + ) + average_step_time, mfu = get_metrics(python_base_path) + model_id = "gpt3-175b" + hardware_id = "a3mega" + software_id = "pytorch_nemo" + image_version = "nemo_workload:24.07" + number_of_chips = number_of_nodes * 8 + + write_run( + model_id=model_id, + hardware_id=hardware_id, + software_id=software_id, + number_of_nodes=number_of_nodes, + number_of_chips=number_of_chips, + container_image_name=image_version, + global_batch_size=global_batch_size, + precision=precision, + optimizer=optimizer, + seq_length=seq_length, + median_step_time=average_step_time, + e2e_time=0, + number_of_steps=max_steps, + mfu=mfu, + tokens_per_second=1, + writer_path=python_path_to_bq_writer, + topology="2X2", + comment="Regression tests", + is_test=True, + ) with models.DAG( diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 3cc43b90..23026b12 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -15,7 +15,9 @@ "Bash helper commands for AOTC artifacts" import re +import os from google.cloud import storage +import yaml def set_variables_cmds(): @@ -89,7 +91,7 @@ def 
namespace_cmds(): return namespace -def helm_install_cmds(): +def helm_apply_cmds(): helm_cmds = ( " helm install -f values.yaml " "--namespace default " @@ -126,20 +128,22 @@ def copy_bucket_cmds(): return copy_bucket_contents -def get_metrics_cmds(): +def get_metrics_cmds( + batch_size, num_accelerators, precision, model_id, accelertator_type +): # TODO(gunjanj007): get these parameters from the recipe - get_metrics = ( - "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt", + cmds = ( + "METRICS_FILE=metrics.txt", "python3 process_training_results.py --file" - " dllogger.json --batch_size 2048 " - "--num_accelerators 256 " - "--precision fp8 " - "--model_type gpt3-175b " - "--accelerator_type h100 | " + f" dllogger.json --batch_size {batch_size} " + f"--num_accelerators {num_accelerators} " + f"--precision {precision} " + f"--model_type {model_id} " + f"--accelerator_type {accelertator_type} | " "gsutil cp - $METRICS_FILE", 'echo "METRICS_FILE=${METRICS_FILE}"', ) - return get_metrics + return cmds def get_aotc_repo(): @@ -147,9 +151,9 @@ def get_aotc_repo(): "echo 'trying to clone GoB aotc repo'", "git clone https://cmcs-perf-tooling-internal.googlesource.com/" "benchmark-automation", - "cd benchmark-automation/aotc/src", + "ls", "export PYTHONPATH=$PWD", - 'echo "PYTHONPATH=$PYTHONPATH and METRICS_FILE=$METRICS_FILE"', + 'echo "PYTHONPATH=$PYTHONPATH"', ) return gob_clone_cmds @@ -160,25 +164,19 @@ def cleanup_cmds(): "cd ../../..", "kubectl get pods " "--no-headers=true | awk '{print $1}' " - "| grep $JOB_NAME | xargs kubectl delete pods", + "| grep $JOB_NAME | xargs kubectl delete pods", "helm uninstall $JOB_NAME", ) return cleanup -def get_metrics_from_gcs(bucket_name, file_name): - # Initialize GCS and BigQuery clients - storage_client = storage.Client() - - # Get the bucket and file - bucket = storage_client.bucket(bucket_name) - blob = bucket.blob(file_name) - - # Download the file content - metrics_output = blob.download_as_string().decode("utf-8") +def get_metrics(metrics_path): + file_content = "" + with open(metrics_path + "/metrics.txt", "r", encoding="utf-8") as file: + file_content = file.read() # Parse the metrics (adjust based on your file format) - lines = metrics_output.splitlines() + lines = file_content.splitlines() average_step_time = float(lines[0].split(": ")[1]) tflops_per_accelerator = float(lines[1].split(": ")[1]) mfu = float(lines[2].split(": ")[1]) @@ -187,40 +185,40 @@ def get_metrics_from_gcs(bucket_name, file_name): print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") print(f"MFU: {mfu}") - -def extract_bucket_file_name(last_line): - metrics_file = None - - # We match here because subprocesshook only outputs the last line. 
-  match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line)
-  if match:
-    python_path = match.group(1)
-    metrics_file = match.group(2)
-    print(f"PYTHONPATH in python: {python_path}")
-    print(f"METRICS_FILE: {metrics_file}")
-  else:
-    print("Error: Could not extract PYTHONPATH and METRICS_FILE")
-  print(f"Metrics file name: {metrics_file}")
-  if metrics_file:
-    # Extract bucket_name and file_name
-    bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1)
-    file_name = re.search(r"gs://[^/]+/(.+)", metrics_file).group(1)
-
-    print(f"Bucket name: {bucket_name}")
-    print(f"File name: {file_name}")
-  else:
-    print("Metrics file not found in the output.")
-
-  return bucket_name, file_name, python_path
-
-
-def extract_python_path(bash_result_output):
-  python_path = None
-  for line in bash_result_output.splitlines():
-    if line.startswith("PYTHONPATH"):
-      python_path = line.split("=", 1)[1]
-      break
-
-  print(f"Pyhon path name: {python_path}")
-
-  return python_path
+  return average_step_time, mfu
+
+
+def extract_python_path(last_line):
+  # The last line of the job output is "PYTHONPATH=<path>" (see get_aotc_repo).
+  python_path = last_line.split("=", 1)[1]
+  python_path_to_bq_writer = python_path + "/benchmark-automation/aotc/src"
+  return python_path, python_path_to_bq_writer
+
+
+def extract_run_details(tmpdir, yaml_file, config_path):
+  gpus = None
+  batch_size = None
+  optimizer = None
+
+  try:
+    yaml_file_path = os.path.join(tmpdir, yaml_file)
+    with open(yaml_file_path, "r", encoding="utf-8") as file:
+      config = yaml.safe_load(file)
+      gpus = config.get("workload", {}).get("gpus")
+  except (FileNotFoundError, yaml.YAMLError) as e:
+    print(f"Error: {e}")
+    # Re-raise: the caller unpacks six values and cannot handle None.
+    raise
+
+  try:
+    config_path = os.path.join(tmpdir, config_path)
+    with open(config_path, "r", encoding="utf-8") as file:
+      config = yaml.safe_load(file)
+      batch_size = config.get("model", {}).get("global_batch_size")
+      precision = config.get("trainer", {}).get("precision")
+      optimizer = config.get("model", {}).get("optim", {}).get("name")
+      seq_length = config.get("model", {}).get("data", {}).get("seq_length")
+      max_steps = config.get("trainer", {}).get("max_steps")
+  except (FileNotFoundError, yaml.YAMLError) as e:
+    print(f"Error: {e}")
+    # Re-raise: the caller unpacks six values and cannot handle None.
+    raise
+
+  return gpus, batch_size, optimizer, precision, seq_length, max_steps
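
Note for reviewers: below is a minimal, self-contained sketch of the YAML lookups that extract_run_details performs. The YAML snippets are invented stand-ins for the recipe's values.yaml and the NeMo config referenced in nemo_gpt3.py; only the key paths mirror the code above.

import yaml

# Hypothetical stand-ins for the recipe files; field values are made up.
VALUES_YAML = """\
workload:
  gpus: 256
"""
NEMO_CONFIG_YAML = """\
trainer:
  precision: bf16
  max_steps: 50
model:
  global_batch_size: 2048
  optim:
    name: fused_adam
  data:
    seq_length: 2048
"""

values = yaml.safe_load(VALUES_YAML)
config = yaml.safe_load(NEMO_CONFIG_YAML)

# Same key paths as extract_run_details.
gpus = values.get("workload", {}).get("gpus")
batch_size = config.get("model", {}).get("global_batch_size")
precision = config.get("trainer", {}).get("precision")
optimizer = config.get("model", {}).get("optim", {}).get("name")
seq_length = config.get("model", {}).get("data", {}).get("seq_length")
max_steps = config.get("trainer", {}).get("max_steps")

print(gpus, batch_size, precision, optimizer, seq_length, max_steps)
# -> 256 2048 bf16 fused_adam 2048 50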
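Likewise, get_metrics assumes metrics.txt holds three "label: value" lines in a fixed order (average step time, TFLOPS per accelerator, MFU); only the line order and the ": " separator matter to the parser. The labels and numbers below are illustrative, not real output:

# Hypothetical metrics.txt content produced by process_training_results.py.
sample = """\
Average step time: 11.5
TFLOPS per accelerator: 420.0
MFU: 0.42
"""

# Same positional parsing as get_metrics.
lines = sample.splitlines()
average_step_time = float(lines[0].split(": ")[1])
tflops_per_accelerator = float(lines[1].split(": ")[1])
mfu = float(lines[2].split(": ")[1])
print(average_step_time, tflops_per_accelerator, mfu)  # 11.5 420.0 0.42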