From 9a21b7da91de2e4e1cfb8745bbcc7780817ff478 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:36:25 -0800 Subject: [PATCH 01/32] clone aotc and get metrics from gcs --- .../aotc_reproducibility.py | 107 +++++++++++++++--- dags/map_reproducibility/nemo_gpt3.py | 17 +++ 2 files changed, 111 insertions(+), 13 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 64478503..d73c6f1a 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -15,10 +15,17 @@ "Bash helper commands for AOTC artifacts" import os - +import re +import sys +from google.cloud import storage +import logging +import uuid +from typing import Type +from airflow.hooks.subprocess import SubprocessHook def set_variables_cmds(): set_variables = ( + # "set -e", "export PROJECT=supercomputer-testing", "export CLUSTER=a3plus-benchmark", "export CLUSTER_REGION=australia-southeast1", @@ -37,14 +44,16 @@ def configure_project_and_cluster(): ) return set_project_command - -# This is required to get auth to access -# internal GoB repo def git_cookie_authdaemon(): auth_cmds = ( "git clone https://gerrit.googlesource.com/gcompute-tools", "echo 'trying to run git-cookie-authdaemon'", - "./gcompute-tools/git-cookie-authdaemon", + # Check if the daemon is already running + "if pgrep -f git-cookie-authdaemon; then " + " echo 'git-cookie-authdaemon is already running'; " + "else " + " ./gcompute-tools/git-cookie-authdaemon || echo 'Error running git-cookie-authdaemon'; " # Run if not running + "fi" ) return auth_cmds @@ -56,9 +65,17 @@ def clone_gob(): "reproducible-benchmark-recipes", "cd reproducible-benchmark-recipes/projects", "cd gpu-recipes", + "pwd", ) return gob_clone_cmds +def stop_git_daemon(): + cmd = ( + "git config --global --unset credential.helper", + "rm ~/.git-credentials", + ) + return cmd + def install_helm_cmds(): install_helm_cmd = ( @@ -101,9 +118,6 @@ def helm_install_cmds(): def wait_for_jobs_cmds(): wait_for_job = ( - "echo 'will wait for job to start running'", - "kubectl wait --for=condition=running job/$JOB_NAME" - " --namespace=default --timeout=10m", "echo 'will wait for jobs to finish'", "kubectl wait --for=condition=complete " "job/$JOB_NAME --namespace=default --timeout=100m", @@ -113,17 +127,15 @@ def wait_for_jobs_cmds(): def copy_bucket_cmds(): copy_bucket_contents = ( - "COMPLETE_JOB_NAME=$(gcloud storage ls " + "export COMPLETE_JOB_NAME=$(gcloud storage ls " "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", - "echo 'copying from' ", - "echo $COMPLETE_JOB_NAME", + "echo 'COMPLETE_JOB_NAME=$COMPLETE_JOB_NAME'", "cd $REPO_ROOT/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" "dllogger/rank-0/dllogger.json .", ) return copy_bucket_contents - def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe get_metrics = ( @@ -132,16 +144,85 @@ def get_metrics_cmds(): "--num_accelerators 256 " "--precision fp8 " "--model_type gpt3-175b " - "--accelerator_type h100 ", + "--accelerator_type h100 | " + "gsutil cp - ${COMPLETE_JOB_NAME}" + "/metrics.txt", ) return get_metrics +def get_aotc_repo(): + gob_clone_cmds = ( + "echo 'trying to clone GoB aotc repo'", + "git clone https://cmcs-perf-tooling-internal.googlesource.com/" + "benchmark-automation", + "cd benchmark-automation/aotc/src", + "export PYTHONPATH=$PWD", + "echo 'PYTHONPATH=$PYTHONPATH'", + ) + return gob_clone_cmds def cleanup_cmds(): cleanup = ( + 
"cd $REPO_ROOT", + "cd ../../..", "kubectl get pods " "--no-headers=true | awk '{print $1}' " "| grep $JOB_NAME | xargs kubectl delete pods", "helm uninstall $JOB_NAME", ) return cleanup + +def get_metrics_from_gcs(bucket_name, file_name): + # bucket_name = 'gunjanjalori-testing-xlml' + # file_name = 'nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/metrics.txt' + + # Initialize GCS and BigQuery clients + storage_client = storage.Client() + + # Get the bucket and file + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(file_name) + + # Download the file content + metrics_output = blob.download_as_string().decode('utf-8') + + # Parse the metrics (adjust based on your file format) + lines = metrics_output.splitlines() + average_step_time = float(lines[0].split(': ')[1]) + tflops_per_accelerator = float(lines[1].split(': ')[1]) + mfu = float(lines[2].split(': ')[1]) + + + print(f"Average Step Time: {average_step_time}") + print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") + print(f"MFU: {mfu}") + + +def extract_bucket_file_name(bash_result_output): + complete_job_name = None + for line in bash_result_output.splitlines(): + if line.startswith("COMPLETE_JOB_NAME="): + complete_job_name = line.split("=", 1)[1] + break + if complete_job_name: + # Extract bucket_name and file_name + bucket_name = re.search(r'gs://([^/]+)/', complete_job_name).group(1) + file_name = re.search(r'gs://[^/]+/(.+)', complete_job_name).group(1) + + print(f"Bucket name: {bucket_name}") + print(f"File name: {file_name}") + + return bucket_name, file_name + +def extract_python_path(bash_result_output): + python_path = None + for line in bash_result_output.splitlines(): + if line.startswith("PYTHONPATH="): + python_path = line.split("=", 1)[1] + break + + print(f"Pyhon path name: {python_path}") + + return python_path + + diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 74d3cc0f..12c76ad3 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -15,6 +15,9 @@ """DAGs to run Aotc reproducibility benchmarks.""" import datetime +import re +import sys + from airflow import models from airflow.decorators import task from airflow.hooks.subprocess import SubprocessHook @@ -30,6 +33,12 @@ from dags.map_reproducibility.aotc_reproducibility import git_cookie_authdaemon from dags.map_reproducibility.aotc_reproducibility import clone_gob from dags.map_reproducibility.aotc_reproducibility import helm_install_cmds +from dags.map_reproducibility.aotc_reproducibility import get_metrics_from_gcs +from dags.map_reproducibility.aotc_reproducibility import stop_git_daemon +from dags.map_reproducibility.aotc_reproducibility import get_aotc_repo +from dags.map_reproducibility.aotc_reproducibility import extract_bucket_file_name +from dags.map_reproducibility.aotc_reproducibility import extract_python_path + # Run once a day at 2 pm UTC (6 am PST) SCHEDULED_TIME = "0 14 * * *" if composer_env.is_prod_env() else None @@ -69,11 +78,19 @@ def run_aotc_workload(): + copy_bucket_cmds() + get_metrics_cmds() + cleanup_cmds() + + get_aotc_repo() + + stop_git_daemon() ), ], ) assert result.exit_code == 0, f"Command failed with code {result.exit_code}" + # Extract COMPLETE_JOB_NAME from the output + bucket_name, file_name = extract_bucket_file_name(result.output) + python_path = extract_python_path(result.output) + sys.path.append(python_path) + + get_metrics_from_gcs(bucket_name, file_name) with models.DAG( 
dag_id="reproducibility_nemo_gpt3_nighly_dag", From 970b9caaa1d8e789dfb75713e08b4096db9d5d9e Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:43:32 -0800 Subject: [PATCH 02/32] reformat --- .../aotc_reproducibility.py | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index d73c6f1a..75291952 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -14,14 +14,8 @@ "Bash helper commands for AOTC artifacts" -import os import re -import sys from google.cloud import storage -import logging -import uuid -from typing import Type -from airflow.hooks.subprocess import SubprocessHook def set_variables_cmds(): set_variables = ( @@ -53,7 +47,7 @@ def git_cookie_authdaemon(): " echo 'git-cookie-authdaemon is already running'; " "else " " ./gcompute-tools/git-cookie-authdaemon || echo 'Error running git-cookie-authdaemon'; " # Run if not running - "fi" + "fi", ) return auth_cmds @@ -184,14 +178,13 @@ def get_metrics_from_gcs(bucket_name, file_name): blob = bucket.blob(file_name) # Download the file content - metrics_output = blob.download_as_string().decode('utf-8') + metrics_output = blob.download_as_string().decode("utf-8") # Parse the metrics (adjust based on your file format) lines = metrics_output.splitlines() - average_step_time = float(lines[0].split(': ')[1]) - tflops_per_accelerator = float(lines[1].split(': ')[1]) - mfu = float(lines[2].split(': ')[1]) - + average_step_time = float(lines[0].split(": ")[1]) + tflops_per_accelerator = float(lines[1].split(": ")[1]) + mfu = float(lines[2].split(": ")[1]) print(f"Average Step Time: {average_step_time}") print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") @@ -206,8 +199,8 @@ def extract_bucket_file_name(bash_result_output): break if complete_job_name: # Extract bucket_name and file_name - bucket_name = re.search(r'gs://([^/]+)/', complete_job_name).group(1) - file_name = re.search(r'gs://[^/]+/(.+)', complete_job_name).group(1) + bucket_name = re.search(r"gs://([^/]+)/", complete_job_name).group(1) + file_name = re.search(r"gs://[^/]+/(.+)", complete_job_name).group(1) print(f"Bucket name: {bucket_name}") print(f"File name: {file_name}") From 5bdcef5034afbdd12d4252875d479ac8cdfb34e4 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:57:40 -0800 Subject: [PATCH 03/32] reformat --- dags/map_reproducibility/nemo_gpt3.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 12c76ad3..9586b809 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -87,10 +87,12 @@ def run_aotc_workload(): # Extract COMPLETE_JOB_NAME from the output bucket_name, file_name = extract_bucket_file_name(result.output) + get_metrics_from_gcs(bucket_name, file_name) + + # Extract PYTHONPATH from the output python_path = extract_python_path(result.output) sys.path.append(python_path) - get_metrics_from_gcs(bucket_name, file_name) with models.DAG( dag_id="reproducibility_nemo_gpt3_nighly_dag", From 257c148f8002031fae3699f452113c2b1cdd662a Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 16:59:20 -0800 Subject: [PATCH 04/32] reformat --- dags/map_reproducibility/aotc_reproducibility.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/map_reproducibility/aotc_reproducibility.py 
b/dags/map_reproducibility/aotc_reproducibility.py index 75291952..2eb1ef56 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -207,6 +207,7 @@ def extract_bucket_file_name(bash_result_output): return bucket_name, file_name + def extract_python_path(bash_result_output): python_path = None for line in bash_result_output.splitlines(): From fa6219a426a78681cdf2c938b77b8c2fc79d3980 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 17:18:17 -0800 Subject: [PATCH 05/32] reformat --- dags/map_reproducibility/aotc_reproducibility.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 2eb1ef56..6e58f81a 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -133,14 +133,15 @@ def copy_bucket_cmds(): def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe get_metrics = ( + "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt", "python3 process_training_results.py --file" " dllogger.json --batch_size 2048 " "--num_accelerators 256 " "--precision fp8 " "--model_type gpt3-175b " "--accelerator_type h100 | " - "gsutil cp - ${COMPLETE_JOB_NAME}" - "/metrics.txt", + "gsutil cp - $METRICS_FILE", + "echo 'METRICS_FILE=$METRICS_FILE'", ) return get_metrics From 443d36977536ddf7040006c0a292abcff28c7502 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 17:21:45 -0800 Subject: [PATCH 06/32] reformat --- .../aotc_reproducibility.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 6e58f81a..87b184d7 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -17,6 +17,7 @@ import re from google.cloud import storage + def set_variables_cmds(): set_variables = ( # "set -e", @@ -38,6 +39,7 @@ def configure_project_and_cluster(): ) return set_project_command + def git_cookie_authdaemon(): auth_cmds = ( "git clone https://gerrit.googlesource.com/gcompute-tools", @@ -63,6 +65,7 @@ def clone_gob(): ) return gob_clone_cmds + def stop_git_daemon(): cmd = ( "git config --global --unset credential.helper", @@ -130,6 +133,7 @@ def copy_bucket_cmds(): ) return copy_bucket_contents + def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe get_metrics = ( @@ -145,6 +149,7 @@ def get_metrics_cmds(): ) return get_metrics + def get_aotc_repo(): gob_clone_cmds = ( "echo 'trying to clone GoB aotc repo'", @@ -156,6 +161,7 @@ def get_aotc_repo(): ) return gob_clone_cmds + def cleanup_cmds(): cleanup = ( "cd $REPO_ROOT", @@ -167,6 +173,7 @@ def cleanup_cmds(): ) return cleanup + def get_metrics_from_gcs(bucket_name, file_name): # bucket_name = 'gunjanjalori-testing-xlml' # file_name = 'nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/metrics.txt' @@ -193,15 +200,15 @@ def get_metrics_from_gcs(bucket_name, file_name): def extract_bucket_file_name(bash_result_output): - complete_job_name = None + metrics_file = None for line in bash_result_output.splitlines(): - if line.startswith("COMPLETE_JOB_NAME="): - complete_job_name = line.split("=", 1)[1] + if line.startswith("METRICS_FILE="): + metrics_file = line.split("=", 1)[1] break - if complete_job_name: + if metrics_file: # Extract bucket_name 
and file_name - bucket_name = re.search(r"gs://([^/]+)/", complete_job_name).group(1) - file_name = re.search(r"gs://[^/]+/(.+)", complete_job_name).group(1) + bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1) + file_name = re.search(r"gs://[^/]+/(.+)", metrics_file).group(1) print(f"Bucket name: {bucket_name}") print(f"File name: {file_name}") @@ -219,5 +226,3 @@ def extract_python_path(bash_result_output): print(f"Pyhon path name: {python_path}") return python_path - - From b10cee05f0f0472a944928fe838ee54f5379f3f6 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 21:40:42 -0800 Subject: [PATCH 07/32] minor fix --- dags/map_reproducibility/aotc_reproducibility.py | 1 - dags/map_reproducibility/nemo_gpt3.py | 1 - 2 files changed, 2 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 87b184d7..5cdf9f76 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -69,7 +69,6 @@ def clone_gob(): def stop_git_daemon(): cmd = ( "git config --global --unset credential.helper", - "rm ~/.git-credentials", ) return cmd diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 9586b809..99554f9d 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -15,7 +15,6 @@ """DAGs to run Aotc reproducibility benchmarks.""" import datetime -import re import sys from airflow import models From e1d5573a626bad02f418923bdd759b852f5a799e Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Thu, 5 Dec 2024 21:51:20 -0800 Subject: [PATCH 08/32] reformat --- dags/map_reproducibility/aotc_reproducibility.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/aotc_reproducibility.py index 5cdf9f76..eb69e198 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/aotc_reproducibility.py @@ -67,9 +67,7 @@ def clone_gob(): def stop_git_daemon(): - cmd = ( - "git config --global --unset credential.helper", - ) + cmd = ("git config --global --unset credential.helper",) return cmd From fae639b968f61ea51cc44f0897e6e2ac40b0f17b Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:19:03 -0800 Subject: [PATCH 09/32] merge the directory of hook and python task --- dags/map_reproducibility/nemo_gpt3.py | 96 ++++++++++--------- .../{aotc_reproducibility.py => utils.py} | 49 ++++++---- 2 files changed, 78 insertions(+), 67 deletions(-) rename dags/map_reproducibility/{aotc_reproducibility.py => utils.py} (82%) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 99554f9d..f09ed07f 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -16,27 +16,27 @@ import datetime import sys +import tempfile from airflow import models from airflow.decorators import task from airflow.hooks.subprocess import SubprocessHook from dags import composer_env -from dags.map_reproducibility.aotc_reproducibility import get_metrics_cmds -from dags.map_reproducibility.aotc_reproducibility import set_variables_cmds -from dags.map_reproducibility.aotc_reproducibility import configure_project_and_cluster -from dags.map_reproducibility.aotc_reproducibility import install_helm_cmds -from dags.map_reproducibility.aotc_reproducibility import namespace_cmds -from 
dags.map_reproducibility.aotc_reproducibility import wait_for_jobs_cmds -from dags.map_reproducibility.aotc_reproducibility import copy_bucket_cmds -from dags.map_reproducibility.aotc_reproducibility import cleanup_cmds -from dags.map_reproducibility.aotc_reproducibility import git_cookie_authdaemon -from dags.map_reproducibility.aotc_reproducibility import clone_gob -from dags.map_reproducibility.aotc_reproducibility import helm_install_cmds -from dags.map_reproducibility.aotc_reproducibility import get_metrics_from_gcs -from dags.map_reproducibility.aotc_reproducibility import stop_git_daemon -from dags.map_reproducibility.aotc_reproducibility import get_aotc_repo -from dags.map_reproducibility.aotc_reproducibility import extract_bucket_file_name -from dags.map_reproducibility.aotc_reproducibility import extract_python_path +from dags.map_reproducibility.utils import get_metrics_cmds +from dags.map_reproducibility.utils import set_variables_cmds +from dags.map_reproducibility.utils import configure_project_and_cluster +from dags.map_reproducibility.utils import install_helm_cmds +from dags.map_reproducibility.utils import namespace_cmds +from dags.map_reproducibility.utils import wait_for_jobs_cmds +from dags.map_reproducibility.utils import copy_bucket_cmds +from dags.map_reproducibility.utils import cleanup_cmds +from dags.map_reproducibility.utils import git_cookie_authdaemon +from dags.map_reproducibility.utils import clone_gob +from dags.map_reproducibility.utils import helm_install_cmds +from dags.map_reproducibility.utils import get_metrics_from_gcs +from dags.map_reproducibility.utils import get_aotc_repo +from dags.map_reproducibility.utils import extract_bucket_file_name +from dags.map_reproducibility.utils import extract_python_path # Run once a day at 2 pm UTC (6 am PST) @@ -58,39 +58,41 @@ def run_aotc_workload(): "export JOB_NAME=gpt3-xlml-$NOW-175b-nemo", ) - hook = SubprocessHook() - result = hook.run_command( - [ - "bash", - "-c", - ";".join( - set_variables_cmds() - + configure_project_and_cluster() - + git_cookie_authdaemon() - + clone_gob() - + gpu_recipe_cmd - + install_helm_cmds() - + namespace_cmds() - + workload_cmds - + helm_install_cmds() - + wait_for_jobs_cmds() - + copy_bucket_cmds() - + get_metrics_cmds() - + cleanup_cmds() - + get_aotc_repo() - + stop_git_daemon() - ), - ], - ) - assert result.exit_code == 0, f"Command failed with code {result.exit_code}" + with tempfile.TemporaryDirectory() as tmpdir: + + hook = SubprocessHook() + result = hook.run_command( + [ + "bash", + "-c", + ";".join( + set_variables_cmds() + + configure_project_and_cluster() + + git_cookie_authdaemon() + + clone_gob() + + gpu_recipe_cmd + + install_helm_cmds() + + namespace_cmds() + + workload_cmds + + helm_install_cmds() + + wait_for_jobs_cmds() + + copy_bucket_cmds() + + get_metrics_cmds() + + cleanup_cmds() + + get_aotc_repo() + ), + ], + cwd=tmpdir, + ) + assert result.exit_code == 0, f"Command failed with code {result.exit_code}" - # Extract COMPLETE_JOB_NAME from the output - bucket_name, file_name = extract_bucket_file_name(result.output) - get_metrics_from_gcs(bucket_name, file_name) + # Extract COMPLETE_JOB_NAME from the output + bucket_name, file_name, python_path = extract_bucket_file_name(result.output) + get_metrics_from_gcs(bucket_name, file_name) - # Extract PYTHONPATH from the output - python_path = extract_python_path(result.output) - sys.path.append(python_path) + # # Extract PYTHONPATH from the output + # python_path = extract_python_path(result.output) + 
sys.path.append(python_path) with models.DAG( diff --git a/dags/map_reproducibility/aotc_reproducibility.py b/dags/map_reproducibility/utils.py similarity index 82% rename from dags/map_reproducibility/aotc_reproducibility.py rename to dags/map_reproducibility/utils.py index eb69e198..b5cc32d8 100644 --- a/dags/map_reproducibility/aotc_reproducibility.py +++ b/dags/map_reproducibility/utils.py @@ -65,12 +65,6 @@ def clone_gob(): ) return gob_clone_cmds - -def stop_git_daemon(): - cmd = ("git config --global --unset credential.helper",) - return cmd - - def install_helm_cmds(): install_helm_cmd = ( "curl -fsSL -o get_helm.sh " @@ -123,7 +117,8 @@ def copy_bucket_cmds(): copy_bucket_contents = ( "export COMPLETE_JOB_NAME=$(gcloud storage ls " "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", - "echo 'COMPLETE_JOB_NAME=$COMPLETE_JOB_NAME'", + # "COMPLETE_JOB_NAME=gs://gunjanjalori-testing-xlml/nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/", + 'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"', "cd $REPO_ROOT/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" "dllogger/rank-0/dllogger.json .", @@ -142,7 +137,7 @@ def get_metrics_cmds(): "--model_type gpt3-175b " "--accelerator_type h100 | " "gsutil cp - $METRICS_FILE", - "echo 'METRICS_FILE=$METRICS_FILE'", + 'echo "METRICS_FILE=${METRICS_FILE}"', ) return get_metrics @@ -154,7 +149,7 @@ def get_aotc_repo(): "benchmark-automation", "cd benchmark-automation/aotc/src", "export PYTHONPATH=$PWD", - "echo 'PYTHONPATH=$PYTHONPATH'", + 'echo "PYTHONPATH=$PYTHONPATH and METRICS_FILE=$METRICS_FILE"', ) return gob_clone_cmds @@ -163,10 +158,10 @@ def cleanup_cmds(): cleanup = ( "cd $REPO_ROOT", "cd ../../..", - "kubectl get pods " - "--no-headers=true | awk '{print $1}' " - "| grep $JOB_NAME | xargs kubectl delete pods", - "helm uninstall $JOB_NAME", + # "kubectl get pods " + # "--no-headers=true | awk '{print $1}' " + # "| grep $JOB_NAME | xargs kubectl delete pods", + # "helm uninstall $JOB_NAME", ) return cleanup @@ -196,12 +191,24 @@ def get_metrics_from_gcs(bucket_name, file_name): print(f"MFU: {mfu}") -def extract_bucket_file_name(bash_result_output): +def extract_bucket_file_name(last_line): metrics_file = None - for line in bash_result_output.splitlines(): - if line.startswith("METRICS_FILE="): - metrics_file = line.split("=", 1)[1] - break + # for line in bash_result_output.splitlines(): + # print(f"Line: {line}") + # if line.startswith("METRICS_FILE"): + # print(f"Line: {line} with metrics file") + # metrics_file = line.split("=", 1)[1] + # break + + match = re.search(r'PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)', last_line) + if match: + python_path = match.group(1) + metrics_file = match.group(2) + print(f"PYTHONPATH in python: {python_path}") + print(f"METRICS_FILE: {metrics_file}") + else: + print("Error: Could not extract PYTHONPATH and METRICS_FILE") + print(f"Metrics file name: {metrics_file}") if metrics_file: # Extract bucket_name and file_name bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1) @@ -209,14 +216,16 @@ def extract_bucket_file_name(bash_result_output): print(f"Bucket name: {bucket_name}") print(f"File name: {file_name}") + else: + print("Metrics file not found in the output.") - return bucket_name, file_name + return bucket_name, file_name, python_path def extract_python_path(bash_result_output): python_path = None for line in bash_result_output.splitlines(): - if line.startswith("PYTHONPATH="): + if line.startswith("PYTHONPATH"): python_path = line.split("=", 1)[1] break 
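Note on the parsing helpers reworked in the patch above: extract_bucket_file_name() and extract_python_path() scan the SubprocessHook output for "KEY=value" lines, then split the gs:// URI into a bucket and an object path with two regexes. A minimal, self-contained sketch of that URI split, using the same regexes as the patched code (the sample value below is hypothetical, not taken from a real run):

    import re

    def split_gcs_uri(uri: str):
      # "gs://<bucket>/<object path>" -> ("<bucket>", "<object path>"),
      # mirroring the regexes used in extract_bucket_file_name().
      bucket_name = re.search(r"gs://([^/]+)/", uri).group(1)
      file_name = re.search(r"gs://[^/]+/(.+)", uri).group(1)
      return bucket_name, file_name

    # Hypothetical COMPLETE_JOB_NAME value echoed by the bash pipeline:
    print(split_gcs_uri("gs://example-bucket/nemo-experiments/job-1/metrics.txt"))
    # -> ('example-bucket', 'nemo-experiments/job-1/metrics.txt')
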
From b305ecc11f1149191d0a7c33d3343d33e8f8d42b Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:35:17 -0800 Subject: [PATCH 10/32] reformat --- dags/map_reproducibility/utils.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index b5cc32d8..fa6586ba 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -193,21 +193,15 @@ def get_metrics_from_gcs(bucket_name, file_name): def extract_bucket_file_name(last_line): metrics_file = None - # for line in bash_result_output.splitlines(): - # print(f"Line: {line}") - # if line.startswith("METRICS_FILE"): - # print(f"Line: {line} with metrics file") - # metrics_file = line.split("=", 1)[1] - # break - - match = re.search(r'PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)', last_line) + + match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line) if match: - python_path = match.group(1) - metrics_file = match.group(2) - print(f"PYTHONPATH in python: {python_path}") - print(f"METRICS_FILE: {metrics_file}") + python_path = match.group(1) + metrics_file = match.group(2) + print(f"PYTHONPATH in python: {python_path}") + print(f"METRICS_FILE: {metrics_file}") else: - print("Error: Could not extract PYTHONPATH and METRICS_FILE") + print("Error: Could not extract PYTHONPATH and METRICS_FILE") print(f"Metrics file name: {metrics_file}") if metrics_file: # Extract bucket_name and file_name From 697e7598f90a048f37920759b8f9515f91b7c40a Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:38:46 -0800 Subject: [PATCH 11/32] reformat --- dags/map_reproducibility/nemo_gpt3.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index f09ed07f..fa9cf82c 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -87,7 +87,9 @@ def run_aotc_workload(): assert result.exit_code == 0, f"Command failed with code {result.exit_code}" # Extract COMPLETE_JOB_NAME from the output - bucket_name, file_name, python_path = extract_bucket_file_name(result.output) + bucket_name, file_name, python_path = extract_bucket_file_name( + result.output + ) get_metrics_from_gcs(bucket_name, file_name) # # Extract PYTHONPATH from the output From 1d811aeea138171b21dafdfc4962ee26a7cbd065 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 15:39:31 -0800 Subject: [PATCH 12/32] reformat --- dags/map_reproducibility/nemo_gpt3.py | 1 - dags/map_reproducibility/utils.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index fa9cf82c..4535aa48 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -59,7 +59,6 @@ def run_aotc_workload(): ) with tempfile.TemporaryDirectory() as tmpdir: - hook = SubprocessHook() result = hook.run_command( [ diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index fa6586ba..56a0f9ba 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -65,6 +65,7 @@ def clone_gob(): ) return gob_clone_cmds + def install_helm_cmds(): install_helm_cmd = ( "curl -fsSL -o get_helm.sh " From 0a3c5f7ab1d9a3350579da6489cee154b04f04c4 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Fri, 6 Dec 2024 16:58:48 -0800 Subject: [PATCH 13/32] reformat --- 
dags/map_reproducibility/utils.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py
index 56a0f9ba..977888e3 100644
--- a/dags/map_reproducibility/utils.py
+++ b/dags/map_reproducibility/utils.py
@@ -159,17 +159,15 @@ def cleanup_cmds():
   cleanup = (
       "cd $REPO_ROOT",
       "cd ../../..",
-      # "kubectl get pods "
-      # "--no-headers=true | awk '{print $1}' "
-      # "| grep $JOB_NAME | xargs kubectl delete pods",
-      # "helm uninstall $JOB_NAME",
+      "kubectl get pods "
+      "--no-headers=true | awk '{print $1}' "
+      "| grep $JOB_NAME | xargs kubectl delete pods",
+      "helm uninstall $JOB_NAME",
   )
   return cleanup
 
 
 def get_metrics_from_gcs(bucket_name, file_name):
-  # bucket_name = 'gunjanjalori-testing-xlml'
-  # file_name = 'nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/metrics.txt'
-
   # Initialize GCS and BigQuery clients
   storage_client = storage.Client()
 
From 4ef3684e3f107734e5e46b33ebf6be893216f8d0 Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Sun, 8 Dec 2024 10:00:09 -0800
Subject: [PATCH 14/32] reformat

---
 dags/map_reproducibility/nemo_gpt3.py | 3 ++-
 dags/map_reproducibility/utils.py     | 1 -
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py
index 4535aa48..9ba82db6 100644
--- a/dags/map_reproducibility/nemo_gpt3.py
+++ b/dags/map_reproducibility/nemo_gpt3.py
@@ -87,7 +87,7 @@ def run_aotc_workload():
 
   # Extract COMPLETE_JOB_NAME from the output
   bucket_name, file_name, python_path = extract_bucket_file_name(
-    result.output
+      result.output
   )
   get_metrics_from_gcs(bucket_name, file_name)
 
@@ -96,6 +96,7 @@ def run_aotc_workload():
   sys.path.append(python_path)
 
+
 with models.DAG(
     dag_id="reproducibility_nemo_gpt3_nightly_dag",
     schedule=SCHEDULED_TIME,
diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py
index 977888e3..3bc7ca08 100644
--- a/dags/map_reproducibility/utils.py
+++ b/dags/map_reproducibility/utils.py
@@ -168,7 +168,6 @@ def cleanup_cmds():
 
 
 def get_metrics_from_gcs(bucket_name, file_name):
-  # Initialize GCS and BigQuery clients
   storage_client = storage.Client()
 
From bfdb497dd16bf260b94c9ca34982b933ca8bf3ea Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Mon, 9 Dec 2024 07:24:22 -0800
Subject: [PATCH 15/32] reformat

---
 dags/map_reproducibility/nemo_gpt3.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py
index 9ba82db6..116e8a7a 100644
--- a/dags/map_reproducibility/nemo_gpt3.py
+++ b/dags/map_reproducibility/nemo_gpt3.py
@@ -96,7 +96,6 @@ def run_aotc_workload():
   sys.path.append(python_path)
 
-
 with models.DAG(
     dag_id="reproducibility_nemo_gpt3_nightly_dag",
     schedule=SCHEDULED_TIME,

From 19c144fbc2fdcddc5f5f43b7f7810586690c81ea Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Mon, 9 Dec 2024 07:27:54 -0800
Subject: [PATCH 16/32] resolve comments

---
 dags/map_reproducibility/utils.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py
index 3bc7ca08..beda3029 100644
--- a/dags/map_reproducibility/utils.py
+++ b/dags/map_reproducibility/utils.py
@@ -20,7 +20,6 @@ def set_variables_cmds():
   set_variables = (
-      # "set -e",
       "export PROJECT=supercomputer-testing",
       "export CLUSTER=a3plus-benchmark",
       "export CLUSTER_REGION=australia-southeast1",
@@ -118,7 +117,6 @@ def copy_bucket_cmds():
   copy_bucket_contents = (
       "export COMPLETE_JOB_NAME=$(gcloud storage ls "
       "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)",
-      # "COMPLETE_JOB_NAME=gs://gunjanjalori-testing-xlml/nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/",
       'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"',
       "cd $REPO_ROOT/src/utils/training_metrics",
       "gcloud storage cp ${COMPLETE_JOB_NAME}"

From db99876bbd96c33d8e6d0115ad55b58d4e24ea35 Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Mon, 9 Dec 2024 21:34:44 -0800
Subject: [PATCH 17/32] resolve comments

---
 dags/map_reproducibility/utils.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py
index beda3029..e9957282 100644
--- a/dags/map_reproducibility/utils.py
+++ b/dags/map_reproducibility/utils.py
@@ -38,7 +38,7 @@ def configure_project_and_cluster():
   )
   return set_project_command
 
-
+# This is required to get auth to access the internal GoB repo
 def git_cookie_authdaemon():
   auth_cmds = (
       "git clone https://gerrit.googlesource.com/gcompute-tools",
@@ -190,6 +190,7 @@ def get_metrics_from_gcs(bucket_name, file_name):
 
 def extract_bucket_file_name(last_line):
   metrics_file = None
+  # We match here because SubprocessHook only outputs the last line.
   match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line)
   if match:

From d1dbb73904fd2fb8f1ffb78a6718e37d18529307 Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Mon, 9 Dec 2024 21:37:19 -0800
Subject: [PATCH 18/32] resolve comments

---
 dags/map_reproducibility/nemo_gpt3.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py
index 116e8a7a..cf20b601 100644
--- a/dags/map_reproducibility/nemo_gpt3.py
+++ b/dags/map_reproducibility/nemo_gpt3.py
@@ -91,8 +91,6 @@ def run_aotc_workload():
   )
   get_metrics_from_gcs(bucket_name, file_name)
 
-  # # Extract PYTHONPATH from the output
-  # python_path = extract_python_path(result.output)
   sys.path.append(python_path)
 

From 24b0cc28bdde6ba32d6236f043fc508b499ed1f9 Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Mon, 9 Dec 2024 21:40:54 -0800
Subject: [PATCH 19/32] reformat

---
 dags/map_reproducibility/utils.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py
index e9957282..89fcb8ab 100644
--- a/dags/map_reproducibility/utils.py
+++ b/dags/map_reproducibility/utils.py
@@ -38,6 +38,7 @@ def configure_project_and_cluster():
   )
   return set_project_command
 
+
 # This is required to get auth to access the internal GoB repo
 def git_cookie_authdaemon():

From 757e6ee319c9e448c5bf41b1b6b1e4b7a3cd76d0 Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Wed, 11 Dec 2024 14:34:55 -0800
Subject: [PATCH 20/32] Add Dan and Di as owners for aotc

---
 .github/CODEOWNERS       | 2 ++
 .github/requirements.txt | 1 +
 2 files changed, 3 insertions(+)

diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index e68ed421..07ad9a96 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -16,3 +16,5 @@
 dags/sparsity_diffusion_devx/project_bite* @RissyRan @parambole @jiangjy1982 @aireenmei @michelle-yooh @jiya-zhang
 dags/sparsity_diffusion_devx/configs/project_bite* @RissyRan @parambole @jiangjy1982 @aireenmei @michelle-yooh @jiya-zhang
 dags/inference @yeandy @vipannalla @morgandu @mailvijayasingh @sixiang-google @joezijunzhou @singh-mitali
+
+dags/map_reproducibility @crankshaw-google @polydier1
diff --git a/.github/requirements.txt b/.github/requirements.txt
index ebf875bf..1c283211 100644
--- a/.github/requirements.txt
+++ b/.github/requirements.txt
@@ -10,3 +10,4 @@ tensorflow-cpu
 kubernetes
 pyarrow
 apache-airflow-providers-google
+dacite

From 9402023b41cfa3d79d554b9d2367a7d989d561fb Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Wed, 11 Dec 2024 16:30:13 -0800
Subject: [PATCH 21/32] Add bq writing logic

---
 dags/map_reproducibility/benchmarkdb_utils.py | 298 ++++++++++++++++++
 dags/map_reproducibility/nemo_gpt3.py         |  37 ++-
 dags/map_reproducibility/utils.py             |  79 ++---
 3 files changed, 354 insertions(+), 60 deletions(-)
 create mode 100644 dags/map_reproducibility/benchmarkdb_utils.py

diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py
new file mode 100644
index 00000000..425b13a2
--- /dev/null
+++ b/dags/map_reproducibility/benchmarkdb_utils.py
@@ -0,0 +1,298 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"Bash helper commands for AOTC artifacts"
+import sys
+import os
+
+
+def write_benchmark_db(
+    average_step_time, tflops_per_accelerator, mfu, writer_path
+):
+  sys.path.append(writer_path)
+  for path in sys.path:
+    print("**path: " + path)
+  # sys.path.append("")
+  module_list = list(sys.modules.keys())
+
+  print(f"current dir: {os.getcwd()}")
+  for module in module_list:
+    if 'aotc' in module:
+      print("aotc module: " + module)
+  # for module in sys.modules.keys():
+  #   print("**modules: " + module)
+
+  # pylint: disable=import-outside-toplevel
+  import logging
+  import uuid
+  from typing import Type
+
+  from aotc.benchmark_db_writer import bq_writer_utils
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import model_info_schema
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import software_info_schema
+  from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import hardware_info_schema
+  # pylint: enable=import-outside-toplevel
+  logging.basicConfig(
+      format="%(asctime)s %(levelname)-8s %(message)s",
+      level=logging.INFO,
+      datefmt="%Y-%m-%d %H:%M:%S",
+  )
+  logger = logging.getLogger(__name__)
+
+  def get_db_client(
+      table: str, dataclass_type: Type, is_test: bool = False
+  ) -> bq_writer_utils.create_bq_writer_object:
+    """Creates a BigQuery client object.
+
+    Args:
+      table: The name of the BigQuery table.
+      dataclass_type: The dataclass type corresponding to the table schema.
+      is_test: Whether to use the testing project or the production project.
+
+    Returns:
+      A BigQuery client object.
+    """
+
+    project = "supercomputer-testing" if is_test else "ml-workload-benchmarks"
+    dataset = "mantaray_v2" if is_test else "benchmark_dataset_v2"
+    return bq_writer_utils.create_bq_writer_object(
+        project=project,
+        dataset=dataset,
+        table=table,
+        dataclass_type=dataclass_type,
+    )
+
+  def _validate_id(
+      id_value: str,
+      table_name: str,
+      id_field: str,
+      dataclass_type: Type,
+      is_test: bool = False,
+  ) -> bool:
+    """Generic function to validate an ID against a BigQuery table.
+ + Args: + id_value: The ID value to validate. + table_name: The name of the BigQuery table. + id_field: The name of the ID field in the table. + is_test: Whether to use the testing project or the production project. + + Returns: + True if the ID is valid, False otherwise. + """ + + client = get_db_client(table_name, dataclass_type, is_test) + result = client.query(where={id_field: id_value}) + + if not result: + logger.info( + "%s: %s is not present in the %s table ", + id_field.capitalize(), + id_value, + table_name, + ) + logger.info( + "Please add %s specific row in %s table before adding to run summary table", + id_value, + table_name, + ) + return False + return True + + def validate_model_id(model_id: str, is_test: bool = False) -> bool: + """Validates a model ID against the model_info table.""" + + id_val = _validate_id( + model_id, "model_info", "model_id", model_info_schema.ModelInfo, is_test + ) + if not id_val: + print("model id validation failed") + return False + return True + + + def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: + """Validates a hardware ID against the hardware_info table.""" + id_val = _validate_id( + hardware_id, + "hardware_info", + "hardware_id", + hardware_info_schema.HardwareInfo, + is_test, + ) + if not id_val: + print("hardware id validation failed") + return False + return True + + + def validate_software_id(software_id: str, is_test: bool = False) -> bool: + """Validates a software ID against the software_info table.""" + id_val = _validate_id( + software_id, + "software_info", + "software_id", + software_info_schema.SoftwareInfo, + is_test, + ) + + if not id_val: + print("software id validation failed") + return False + return True + + + def write_run( + model_id: str, + hardware_id: str, + software_id: str, + number_of_nodes: int, + number_of_chips: int, + container_image_name: str, + global_batch_size: int, + precision: str, + optimizer: str, + seq_length: int, + median_step_time: float, + e2e_time: float, + number_of_steps: int, + mfu: float, + tokens_per_second: float, + run_type: str = "perf_optimization", + run_release_status: str = "local", + other_metrics_in_json: str = "", + nccl_driver_nickname: str = None, + env_variables: str = "", + framework_config_in_json: str = "", + xla_flags: str = "", + topology: str = "", + dataset: str = "", + num_of_superblock: int = None, + update_person_ldap: str = os.getenv("USER"), + comment: str = "", + is_test: bool = False, + ) -> None: + """Writes a workload benchmark run manually to the database. + + This function validates the provided IDs and, if valid, constructs a + WorkloadBenchmarkV2Schema object with the given data and writes it to the + "run_summary" table in BigQuery. + + Args: + model_id: The ID of the model used in the run. + hardware_id: The ID of the hardware used in the run. + software_id: The ID of the software stack used in the run. + number_of_nodes: The number of nodes used in the run. + number_of_chips: The number of chips used in the run. + container_image_name: The name of the container image used in the run. + global_batch_size: The global batch size used in the run. + precision: The precision used in the run (e.g., fp32, bf16). + optimizer: The optimizer used in the run (e.g., adam, sgd). + seq_length: The sequence length used in the run. + median_step_time: The median step time of the run. + e2e_time: The end-to-end time of the run. + number_of_steps: The number of steps taken in the run. + mfu: The MFU (model flops utilization) achieved in the run. 
+      tokens_per_second: The tokens per second achieved in the run.
+      run_type: The type of run (default: "perf_optimization").
+      run_release_status: possible values "local" (code changes are done locally), "prep_release" (all code changes are present in the image)
+      other_metrics_in_json: A JSON string containing other metrics.
+      nccl_driver_nickname: The nickname of the NCCL driver used.
+      env_variables: A string containing environment variables.
+      framework_config_in_json: A JSON string containing framework configurations.
+      xla_flags: A json string containing all the XLA flags.
+      topology: The topology of the hardware used in the run. (valid for TPUs)
+      dataset: The dataset used in the run.
+      num_of_superblock: The number of superblocks in the hardware. (valid for GPUs)
+      update_person_ldap: The LDAP ID of the person updating the record (default: current user).
+      comment: A comment about the run.
+      is_test: Whether to use the testing project or the production project.
+
+    Raises:
+      ValueError: If any of the IDs are invalid.
+    """
+
+    if (
+        validate_model_id(model_id, is_test)
+        and validate_hardware_id(hardware_id, is_test)
+        and validate_software_id(software_id, is_test)
+    ):
+
+      summary = workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema(
+          run_id=f"run-{uuid.uuid4()}",
+          model_id=model_id,
+          software_id=software_id,
+          hardware_id=hardware_id,
+          hardware_num_chips=number_of_chips,
+          hardware_num_nodes=number_of_nodes,
+          result_success=True,
+          configs_framework=framework_config_in_json,
+          configs_env=env_variables,
+          configs_container_version=container_image_name,
+          configs_xla_flags=xla_flags,
+          configs_dataset=dataset,
+          logs_artifact_directory="",
+          update_person_ldap=update_person_ldap,
+          run_source="manual",
+          run_type=run_type,
+          run_release_status=run_release_status,
+          workload_precision=precision,
+          workload_gbs=global_batch_size,
+          workload_optimizer=optimizer,
+          workload_sequence_length=seq_length,
+          metrics_e2e_time=e2e_time,
+          metrics_mfu=mfu,
+          metrics_step_time=median_step_time,
+          metrics_tokens_per_second=tokens_per_second,
+          metrics_steps_for_convergence=number_of_steps,
+          metrics_other=other_metrics_in_json,
+          hardware_nccl_driver_nickname=nccl_driver_nickname,
+          hardware_topology=topology,
+          hardware_num_superblocks=num_of_superblock,
+          logs_comments=comment,
+      )
+
+      client = get_db_client(
+          "run_summary",
+          workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema,
+          is_test,
+      )
+      client.write([summary])
+
+    else:
+      raise ValueError("Could not upload data in run summary table")
+
+  write_run(
+      model_id="gpt3",
+      hardware_id="a3mega",
+      software_id="pytorch_nemo",
+      number_of_nodes=3,
+      number_of_chips=24,
+      container_image_name="sample_docker",
+      global_batch_size=1024,
+      precision="bf16",
+      optimizer="adam",
+      seq_length=12,
+      median_step_time=average_step_time,
+      e2e_time=0,
+      number_of_steps=1,
+      mfu=mfu,
+      tokens_per_second=1,
+      topology="2X2",
+      comment="Test run",
+      is_test=True,
+  )
+
+  logger.info("Wrote Benchmark DB, eureka!")
diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py
index cf20b601..77174a06 100644
--- a/dags/map_reproducibility/nemo_gpt3.py
+++ b/dags/map_reproducibility/nemo_gpt3.py
@@ -16,11 +16,13 @@
 import datetime
 import sys
+import os
 import tempfile
 
 from airflow import models
 from airflow.decorators import task
 from airflow.hooks.subprocess import SubprocessHook
+import subprocess
 from dags import composer_env
 from dags.map_reproducibility.utils import get_metrics_cmds
 from
dags.map_reproducibility.utils import set_variables_cmds @@ -33,10 +35,10 @@ from dags.map_reproducibility.utils import git_cookie_authdaemon from dags.map_reproducibility.utils import clone_gob from dags.map_reproducibility.utils import helm_install_cmds -from dags.map_reproducibility.utils import get_metrics_from_gcs +from dags.map_reproducibility.utils import get_metrics from dags.map_reproducibility.utils import get_aotc_repo -from dags.map_reproducibility.utils import extract_bucket_file_name from dags.map_reproducibility.utils import extract_python_path +from dags.map_reproducibility.benchmarkdb_utils import write_benchmark_db # Run once a day at 2 pm UTC (6 am PST) @@ -73,11 +75,11 @@ def run_aotc_workload(): + install_helm_cmds() + namespace_cmds() + workload_cmds - + helm_install_cmds() - + wait_for_jobs_cmds() + # + helm_install_cmds() + # + wait_for_jobs_cmds() + copy_bucket_cmds() + get_metrics_cmds() - + cleanup_cmds() + # + cleanup_cmds() + get_aotc_repo() ), ], @@ -85,13 +87,28 @@ def run_aotc_workload(): ) assert result.exit_code == 0, f"Command failed with code {result.exit_code}" - # Extract COMPLETE_JOB_NAME from the output - bucket_name, file_name, python_path = extract_bucket_file_name( - result.output + # # Extract COMPLETE_JOB_NAME from the output + # bucket_name, file_name, python_path = extract_bucket_file_name( + # result.output + # ) + + # # Extract PYTHONPATH from the output + # python_path = extract_python_path(result.output) + python_base_path, python_path_to_bq_writer = extract_python_path( + result.output.splitlines()[-1] ) - get_metrics_from_gcs(bucket_name, file_name) + print(f"Base path in python: {python_base_path}") + print(f"python to bq: {python_path_to_bq_writer}") - sys.path.append(python_path) + average_step_time, tflops_per_accelerator, mfu = get_metrics( + python_base_path + ) + write_benchmark_db( + average_step_time, + tflops_per_accelerator, + mfu, + python_path_to_bq_writer, + ) with models.DAG( diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 89fcb8ab..5dcb7a4e 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -15,6 +15,7 @@ "Bash helper commands for AOTC artifacts" import re +import os from google.cloud import storage @@ -116,8 +117,9 @@ def wait_for_jobs_cmds(): def copy_bucket_cmds(): copy_bucket_contents = ( - "export COMPLETE_JOB_NAME=$(gcloud storage ls " - "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", + # "export COMPLETE_JOB_NAME=$(gcloud storage ls " + # "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", + "COMPLETE_JOB_NAME=gs://gunjanjalori-testing-xlml/nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/", 'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"', "cd $REPO_ROOT/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" @@ -129,7 +131,8 @@ def copy_bucket_cmds(): def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe get_metrics = ( - "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt", + # "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt", + "METRICS_FILE=metrics.txt", "python3 process_training_results.py --file" " dllogger.json --batch_size 2048 " "--num_accelerators 256 " @@ -141,15 +144,15 @@ def get_metrics_cmds(): ) return get_metrics - def get_aotc_repo(): gob_clone_cmds = ( "echo 'trying to clone GoB aotc repo'", + "pip install dacite", "git clone https://cmcs-perf-tooling-internal.googlesource.com/" "benchmark-automation", - "cd benchmark-automation/aotc/src", + "ls", "export 
PYTHONPATH=$PWD", - 'echo "PYTHONPATH=$PYTHONPATH and METRICS_FILE=$METRICS_FILE"', + 'echo "PYTHONPATH=$PYTHONPATH"', ) return gob_clone_cmds @@ -160,25 +163,29 @@ def cleanup_cmds(): "cd ../../..", "kubectl get pods " "--no-headers=true | awk '{print $1}' " - "| grep $JOB_NAME | xargs kubectl delete pods", + "| grep $JOB_NAME | xargs kubectl delete pods", "helm uninstall $JOB_NAME", ) return cleanup -def get_metrics_from_gcs(bucket_name, file_name): - # Initialize GCS and BigQuery clients - storage_client = storage.Client() +def get_metrics(metrics_path): + # # Initialize GCS and BigQuery clients + # storage_client = storage.Client() + + # # Get the bucket and file + # bucket = storage_client.bucket(bucket_name) + # blob = bucket.blob(file_name) - # Get the bucket and file - bucket = storage_client.bucket(bucket_name) - blob = bucket.blob(file_name) + # # Download the file content + # metrics_output = blob.download_as_string().decode("utf-8") - # Download the file content - metrics_output = blob.download_as_string().decode("utf-8") + file_content = "" + with open(metrics_path + "/metrics.txt", "r", encoding="utf-8") as file: + file_content = file.read() # Parse the metrics (adjust based on your file format) - lines = metrics_output.splitlines() + lines = file_content.splitlines() average_step_time = float(lines[0].split(": ")[1]) tflops_per_accelerator = float(lines[1].split(": ")[1]) mfu = float(lines[2].split(": ")[1]) @@ -187,40 +194,12 @@ def get_metrics_from_gcs(bucket_name, file_name): print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") print(f"MFU: {mfu}") + return average_step_time, tflops_per_accelerator, mfu -def extract_bucket_file_name(last_line): - metrics_file = None - - # We match here because subprocesshook only outputs the last line. 
- match = re.search(r"PYTHONPATH=(.*?)\s+METRICS_FILE=(.*)", last_line) - if match: - python_path = match.group(1) - metrics_file = match.group(2) - print(f"PYTHONPATH in python: {python_path}") - print(f"METRICS_FILE: {metrics_file}") - else: - print("Error: Could not extract PYTHONPATH and METRICS_FILE") - print(f"Metrics file name: {metrics_file}") - if metrics_file: - # Extract bucket_name and file_name - bucket_name = re.search(r"gs://([^/]+)/", metrics_file).group(1) - file_name = re.search(r"gs://[^/]+/(.+)", metrics_file).group(1) - - print(f"Bucket name: {bucket_name}") - print(f"File name: {file_name}") - else: - print("Metrics file not found in the output.") - - return bucket_name, file_name, python_path - - -def extract_python_path(bash_result_output): - python_path = None - for line in bash_result_output.splitlines(): - if line.startswith("PYTHONPATH"): - python_path = line.split("=", 1)[1] - break - print(f"Pyhon path name: {python_path}") +def extract_python_path(last_line): + # metrics_file = None + python_path = last_line.split("=")[1] + python_path_to_bq_writer = python_path + "/benchmark-automation/aotc/src" + return python_path, python_path_to_bq_writer - return python_path From 0dca000ea42b28a0eff9bb62cf57ad03020b34eb Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 13:26:54 -0800 Subject: [PATCH 22/32] Add bq writer --- dags/map_reproducibility/benchmarkdb_utils.py | 361 +++++++++--------- dags/map_reproducibility/nemo_gpt3.py | 48 ++- dags/map_reproducibility/utils.py | 40 +- 3 files changed, 240 insertions(+), 209 deletions(-) diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py index 425b13a2..d661368b 100644 --- a/dags/map_reproducibility/benchmarkdb_utils.py +++ b/dags/map_reproducibility/benchmarkdb_utils.py @@ -15,175 +15,40 @@ "Bash helper commands for AOTC artifacts" import sys import os - - -def write_benchmark_db( - avergae_step_time, tflops_per_accelerator, mfu, writer_path +import getpass + + +def write_run( + model_id: str, + hardware_id: str, + software_id: str, + number_of_nodes: int, + number_of_chips: int, + container_image_name: str, + global_batch_size: int, + precision: str, + optimizer: str, + seq_length: int, + median_step_time: float, + e2e_time: float, + number_of_steps: int, + mfu: float, + tokens_per_second: float, + writer_path: str, + run_type: str = "perf_optimization", + run_release_status: str = "local", + other_metrics_in_json: str = "", + nccl_driver_nickname: str = None, + env_variables: str = "", + framework_config_in_json: str = "", + xla_flags: str = "", + topology: str = "", + dataset: str = "", + num_of_superblock: int = None, + update_person_ldap: str = getpass.getuser(), + comment: str = "", + is_test: bool = False ): - sys.path.append(writer_path) - for path in sys.path: - print("**path: " + path) - # sys.path.append("") - module_list = list(sys.modules.keys()) - - print(f"current dir: {os.getcwd()}") - for module in module_list: - if 'aotc' in module: - print("aotc module: " + module) - # for module in sys.modules.keys(): - # print("**modules: " + module) - - # pylint: disable=import-outside-toplevel - import logging - import uuid - from typing import Type - - from aotc.benchmark_db_writer import bq_writer_utils - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import model_info_schema - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 
import software_info_schema - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import hardware_info_schema - # pylint: enable=import-outside-toplevel - logging.basicConfig( - format="%(asctime)s %(levelname)-8s %(message)s", - level=logging.INFO, - datefmt="%Y-%m-%d %H:%M:%S", - ) - logger = logging.getLogger(__name__) - - def get_db_client( - table: str, dataclass_type: Type, is_test: bool = False - ) -> bq_writer_utils.create_bq_writer_object: - """Creates a BigQuery client object. - - Args: - table: The name of the BigQuery table. - dataclass_type: The dataclass type corresponding to the table schema. - is_test: Whether to use the testing project or the production project. - - Returns: - A BigQuery client object. - """ - - project = "supercomputer-testing" if is_test else "ml-workload-benchmarks" - dataset = "mantaray_v2" if is_test else "benchmark_dataset_v2" - return bq_writer_utils.create_bq_writer_object( - project=project, - dataset=dataset, - table=table, - dataclass_type=dataclass_type, - ) - - def _validate_id( - id_value: str, - table_name: str, - id_field: str, - dataclass_type: Type, - is_test: bool = False, - ) -> bool: - """Generic function to validate an ID against a BigQuery table. - - Args: - id_value: The ID value to validate. - table_name: The name of the BigQuery table. - id_field: The name of the ID field in the table. - is_test: Whether to use the testing project or the production project. - - Returns: - True if the ID is valid, False otherwise. - """ - - client = get_db_client(table_name, dataclass_type, is_test) - result = client.query(where={id_field: id_value}) - - if not result: - logger.info( - "%s: %s is not present in the %s table ", - id_field.capitalize(), - id_value, - table_name, - ) - logger.info( - "Please add %s specific row in %s table before adding to run summary table", - id_value, - table_name, - ) - return False - return True - - def validate_model_id(model_id: str, is_test: bool = False) -> bool: - """Validates a model ID against the model_info table.""" - - id_val = _validate_id( - model_id, "model_info", "model_id", model_info_schema.ModelInfo, is_test - ) - if not id_val: - print("model id validation failed") - return False - return True - - - def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: - """Validates a hardware ID against the hardware_info table.""" - id_val = _validate_id( - hardware_id, - "hardware_info", - "hardware_id", - hardware_info_schema.HardwareInfo, - is_test, - ) - if not id_val: - print("hardware id validation failed") - return False - return True - - - def validate_software_id(software_id: str, is_test: bool = False) -> bool: - """Validates a software ID against the software_info table.""" - id_val = _validate_id( - software_id, - "software_info", - "software_id", - software_info_schema.SoftwareInfo, - is_test, - ) - - if not id_val: - print("software id validation failed") - return False - return True - - - def write_run( - model_id: str, - hardware_id: str, - software_id: str, - number_of_nodes: int, - number_of_chips: int, - container_image_name: str, - global_batch_size: int, - precision: str, - optimizer: str, - seq_length: int, - median_step_time: float, - e2e_time: float, - number_of_steps: int, - mfu: float, - tokens_per_second: float, - run_type: str = "perf_optimization", - run_release_status: str = "local", - other_metrics_in_json: str = "", - nccl_driver_nickname: str = None, - env_variables: str = "", - framework_config_in_json: str = "", - xla_flags: str = "", - topology: 
str = "", - dataset: str = "", - num_of_superblock: int = None, - update_person_ldap: str = os.getenv("USER"), - comment: str = "", - is_test: bool = False, - ) -> None: """Writes a workload benchmark run manually to the database. This function validates the provided IDs and, if valid, constructs a @@ -224,6 +89,143 @@ def write_run( ValueError: If any of the IDs are invalid. """ + sys.path.append(writer_path) + for path in sys.path: + print("**path: " + path) + # sys.path.append("") + module_list = list(sys.modules.keys()) + + print(f"current dir: {os.getcwd()}") + for module in module_list: + if 'aotc' in module: + print("aotc module: " + module) + # for module in sys.modules.keys(): + # print("**modules: " + module) + + # pylint: disable=import-outside-toplevel + import logging + import uuid + from typing import Type + + from aotc.benchmark_db_writer import bq_writer_utils + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import model_info_schema + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import software_info_schema + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import hardware_info_schema + # pylint: enable=import-outside-toplevel + logging.basicConfig( + format="%(asctime)s %(levelname)-8s %(message)s", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", + ) + logger = logging.getLogger(__name__) + + def get_db_client( + table: str, dataclass_type: Type, is_test: bool = False + ) -> bq_writer_utils.create_bq_writer_object: + """Creates a BigQuery client object. + + Args: + table: The name of the BigQuery table. + dataclass_type: The dataclass type corresponding to the table schema. + is_test: Whether to use the testing project or the production project. + + Returns: + A BigQuery client object. + """ + + project = "supercomputer-testing" if is_test else "ml-workload-benchmarks" + dataset = "mantaray_v2" if is_test else "benchmark_dataset_v2" + return bq_writer_utils.create_bq_writer_object( + project=project, + dataset=dataset, + table=table, + dataclass_type=dataclass_type, + ) + + def _validate_id( + id_value: str, + table_name: str, + id_field: str, + dataclass_type: Type, + is_test: bool = False, + ) -> bool: + """Generic function to validate an ID against a BigQuery table. + + Args: + id_value: The ID value to validate. + table_name: The name of the BigQuery table. + id_field: The name of the ID field in the table. + is_test: Whether to use the testing project or the production project. + + Returns: + True if the ID is valid, False otherwise. 
+ """ + + client = get_db_client(table_name, dataclass_type, is_test) + result = client.query(where={id_field: id_value}) + + if not result: + logger.info( + "%s: %s is not present in the %s table ", + id_field.capitalize(), + id_value, + table_name, + ) + logger.info( + "Please add %s specific row in %s table before adding to run summary table", + id_value, + table_name, + ) + return False + return True + + def validate_model_id(model_id: str, is_test: bool = False) -> bool: + """Validates a model ID against the model_info table.""" + + print("model id: " + model_id) + id_val = _validate_id( + model_id, "model_info", "model_id", model_info_schema.ModelInfo, is_test + ) + if not id_val: + print("model id validation failed") + return False + return True + + + def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: + """Validates a hardware ID against the hardware_info table.""" + id_val = _validate_id( + hardware_id, + "hardware_info", + "hardware_id", + hardware_info_schema.HardwareInfo, + is_test, + ) + if not id_val: + print("hardware id validation failed") + return False + return True + + + def validate_software_id(software_id: str, is_test: bool = False) -> bool: + """Validates a software ID against the software_info table.""" + id_val = _validate_id( + software_id, + "software_info", + "software_id", + software_info_schema.SoftwareInfo, + is_test, + ) + + if not id_val: + print("software id validation failed") + return False + return True + + + print(model_id) + if ( validate_model_id(model_id, is_test) and validate_hardware_id(hardware_id, is_test) @@ -274,25 +276,4 @@ def write_run( else: raise ValueError("Could not upload data in run summary table") - write_run( - model_id="gpt3", - hardware_id="a3mega", - software_id="pytorch_nemo", - number_of_nodes=3, - number_of_chips=24, - container_image_name="sample_docker", - global_batch_size=1024, - precision="bf16", - optimizer="adam", - seq_length=12, - median_step_time=avergae_step_time, - e2e_time=0, - number_of_steps=1, - mfu=mfu, - tokens_per_second=1, - topology="2X2", - comment="Test run", - is_test=True, - ) - - logger.info("Wrote Benchmark DB, eureka!") + logger.info("Wrote Benchmark DB, eureka!") diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 9669cb52..44eeb8eb 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -38,7 +38,8 @@ from dags.map_reproducibility.utils import get_metrics from dags.map_reproducibility.utils import get_aotc_repo from dags.map_reproducibility.utils import extract_python_path -from dags.map_reproducibility.benchmarkdb_utils import write_benchmark_db +from dags.map_reproducibility.benchmarkdb_utils import write_run +from dags.map_reproducibility.utils import extract_gpus # Run once a day at 2 pm UTC (6 am PST) @@ -100,14 +101,47 @@ def run_aotc_workload(): print(f"Base path in python: {python_base_path}") print(f"python to bq: {python_path_to_bq_writer}") - average_step_time, tflops_per_accelerator, mfu = get_metrics( + yaml_file_path = "reproducible-benchmark-recipes/projects/gpu-recipes/training/a3mega/gpt3-175b/nemo-pretraining-gke/values.yaml" + config_yaml_path = "reproducible-benchmark-recipes/projects/gpu-recipes/src/frameworks/a3mega/nemo-configs/gpt3-175b-256gpus-fp8.yaml" + ( + number_of_nodes, + global_batch_size, + optimizer, + precision, + seq_length, + max_steps, + ) = extract_gpus(tmpdir, yaml_file_path, config_yaml_path) + print( + f"batch size: {global_batch_size}, number of 
nodes: {number_of_nodes}" + ) + average_step_time, mfu = get_metrics( python_base_path ) - write_benchmark_db( - average_step_time, - tflops_per_accelerator, - mfu, - python_path_to_bq_writer, + model_id = "gpt3-175b" + hardware_id = "a3mega" + software_id = "pytorch_nemo" + number_of_chips = number_of_nodes * 8 + + write_run( + model_id=model_id, + hardware_id=hardware_id, + software_id=software_id, + number_of_nodes=number_of_nodes, + number_of_chips=number_of_chips, + container_image_name="sample_docker", + global_batch_size=global_batch_size, + precision=precision, + optimizer=optimizer, + seq_length=seq_length, + median_step_time=average_step_time, + e2e_time=0, + number_of_steps=max_steps, + mfu=mfu, + tokens_per_second=1, + writer_path=python_path_to_bq_writer, + topology="2X2", + comment="Regression tests", + is_test=True, ) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 555dada5..d0e5dbf4 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -17,7 +17,7 @@ import re import os from google.cloud import storage - +import yaml def set_variables_cmds(): set_variables = ( @@ -147,7 +147,6 @@ def get_metrics_cmds(): def get_aotc_repo(): gob_clone_cmds = ( "echo 'trying to clone GoB aotc repo'", - "pip install dacite", "git clone https://cmcs-perf-tooling-internal.googlesource.com/" "benchmark-automation", "ls", @@ -170,15 +169,6 @@ def cleanup_cmds(): def get_metrics(metrics_path): - # # Initialize GCS and BigQuery clients - # storage_client = storage.Client() - - # # Get the bucket and file - # bucket = storage_client.bucket(bucket_name) - # blob = bucket.blob(file_name) - - # # Download the file content - # metrics_output = blob.download_as_string().decode("utf-8") file_content = "" with open(metrics_path + "/metrics.txt", "r", encoding="utf-8") as file: @@ -194,7 +184,7 @@ def get_metrics(metrics_path): print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") print(f"MFU: {mfu}") - return average_step_time, tflops_per_accelerator, mfu + return average_step_time, mfu def extract_python_path(last_line): @@ -203,3 +193,29 @@ def extract_python_path(last_line): python_path_to_bq_writer = python_path + "/benchmark-automation/aotc/src" return python_path, python_path_to_bq_writer +def extract_gpus(tmpdir, yaml_file, config_path): + gpus = None + batch_size = None + optimizer = None + try: + yaml_file_path = os.path.join(tmpdir, yaml_file) + with open(yaml_file_path, "r", encoding="utf-8") as file: + config = yaml.safe_load(file) + gpus = config.get("workload", {}).get("gpus") + except (FileNotFoundError, yaml.YAMLError) as e: + print(f"Error: {e}") + return None + try: + config_path = os.path.join(tmpdir, config_path) + with open(config_path, "r", encoding="utf-8") as file: + config = yaml.safe_load(file) + batch_size = config.get("model", {}).get("global_batch_size") + precision = config.get("trainer", {}).get("precision") + optimizer = config.get("model", {}).get("optim", {}).get("name") + seq_length = config.get("model", {}).get("data", {}).get("seq_length") + max_steps = config.get("trainer", {}).get("max_steps") + except (FileNotFoundError, yaml.YAMLError) as e: + print(f"Error: {e}") + return None + return gpus, batch_size, optimizer, precision, seq_length, max_steps + From 1b3c39f49afdbf515b127d715ee44acd65f4a674 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 13:30:56 -0800 Subject: [PATCH 23/32] format fix --- dags/map_reproducibility/benchmarkdb_utils.py | 430 +++++++++--------- 1 file 
changed, 215 insertions(+), 215 deletions(-)

diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py
index d661368b..b1186117 100644
--- a/dags/map_reproducibility/benchmarkdb_utils.py
+++ b/dags/map_reproducibility/benchmarkdb_utils.py
@@ -35,7 +35,7 @@ def write_run(
     mfu: float,
     tokens_per_second: float,
     writer_path: str,
-    run_type: str = "perf_optimization",
+    run_type: str = "perf_regression",
     run_release_status: str = "local",
     other_metrics_in_json: str = "",
     nccl_driver_nickname: str = None,
@@ -49,231 +49,231 @@ def write_run(
     comment: str = "",
     is_test: bool = False
 ):
-    """Writes a workload benchmark run manually to the database.
-
-    This function validates the provided IDs and, if valid, constructs a
-    WorkloadBenchmarkV2Schema object with the given data and writes it to the
-    "run_summary" table in BigQuery.
+  """Writes a workload benchmark run manually to the database.
+
+  This function validates the provided IDs and, if valid, constructs a
+  WorkloadBenchmarkV2Schema object with the given data and writes it to the
+  "run_summary" table in BigQuery.
+
+  Args:
+    model_id: The ID of the model used in the run.
+    hardware_id: The ID of the hardware used in the run.
+    software_id: The ID of the software stack used in the run.
+    number_of_nodes: The number of nodes used in the run.
+    number_of_chips: The number of chips used in the run.
+    container_image_name: The name of the container image used in the run.
+    global_batch_size: The global batch size used in the run.
+    precision: The precision used in the run (e.g., fp32, bf16).
+    optimizer: The optimizer used in the run (e.g., adam, sgd).
+    seq_length: The sequence length used in the run.
+    median_step_time: The median step time of the run.
+    e2e_time: The end-to-end time of the run.
+    number_of_steps: The number of steps taken in the run.
+    mfu: The MFU (model flops utilization) achieved in the run.
+    tokens_per_second: The tokens per second achieved in the run.
+    run_type: The type of run (default: "perf_regression").
+    run_release_status: possible values "local" (code changes are done locally), "prep_release" (all code changes are present in the image)
+    other_metrics_in_json: A JSON string containing other metrics.
+    nccl_driver_nickname: The nickname of the NCCL driver used.
+    env_variables: A string containing environment variables.
+    framework_config_in_json: A JSON string containing framework configurations.
+    xla_flags: A JSON string containing all the XLA flags.
+    topology: The topology of the hardware used in the run (valid for TPUs).
+    dataset: The dataset used in the run.
+    num_of_superblock: The number of superblocks in the hardware (valid for GPUs).
+    update_person_ldap: The LDAP ID of the person updating the record (default: current user).
+    comment: A comment about the run.
+    is_test: Whether to use the testing project or the production project.
+
+  Raises:
+    ValueError: If any of the IDs are invalid.
+ """ + + sys.path.append(writer_path) + for path in sys.path: + print("**path: " + path) + # sys.path.append("") + module_list = list(sys.modules.keys()) + + print(f"current dir: {os.getcwd()}") + for module in module_list: + if 'aotc' in module: + print("aotc module: " + module) + # for module in sys.modules.keys(): + # print("**modules: " + module) + + # pylint: disable=import-outside-toplevel + import logging + import uuid + from typing import Type + + from aotc.benchmark_db_writer import bq_writer_utils + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import model_info_schema + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import software_info_schema + from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import hardware_info_schema + # pylint: enable=import-outside-toplevel + logging.basicConfig( + format="%(asctime)s %(levelname)-8s %(message)s", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", + ) + logger = logging.getLogger(__name__) + + def get_db_client( + table: str, dataclass_type: Type, is_test: bool = False + ) -> bq_writer_utils.create_bq_writer_object: + """Creates a BigQuery client object. Args: - model_id: The ID of the model used in the run. - hardware_id: The ID of the hardware used in the run. - software_id: The ID of the software stack used in the run. - number_of_nodes: The number of nodes used in the run. - number_of_chips: The number of chips used in the run. - container_image_name: The name of the container image used in the run. - global_batch_size: The global batch size used in the run. - precision: The precision used in the run (e.g., fp32, bf16). - optimizer: The optimizer used in the run (e.g., adam, sgd). - seq_length: The sequence length used in the run. - median_step_time: The median step time of the run. - e2e_time: The end-to-end time of the run. - number_of_steps: The number of steps taken in the run. - mfu: The MFU (model flops utilization) achieved in the run. - tokens_per_second: The tokens per second achieved in the run. - run_type: The type of run (default: "perf_optimization"). - run_release_status: possible values "local" ( code changes are done locally), "prep_release" ( all code code changes are present in the image) - other_metrics_in_json: A JSON string containing other metrics. - nccl_driver_nickname: The nickname of the NCCL driver used. - env_variables: A string containing environment variables. - framework_config_in_json: A JSON string containing framework configurations. - xla_flags: A json string containing all the XLA flags. - topology: The topology of the hardware used in the run. ( valid for TPUs) - dataset: The dataset used in the run. - num_of_superblock: The number of superblocks in the hardware. ( valid for GPUs) - update_person_ldap: The LDAP ID of the person updating the record (default: current user). - comment: A comment about the run. + table: The name of the BigQuery table. + dataclass_type: The dataclass type corresponding to the table schema. is_test: Whether to use the testing project or the production project. - Raises: - ValueError: If any of the IDs are invalid. + Returns: + A BigQuery client object. 
""" - sys.path.append(writer_path) - for path in sys.path: - print("**path: " + path) - # sys.path.append("") - module_list = list(sys.modules.keys()) - - print(f"current dir: {os.getcwd()}") - for module in module_list: - if 'aotc' in module: - print("aotc module: " + module) - # for module in sys.modules.keys(): - # print("**modules: " + module) - - # pylint: disable=import-outside-toplevel - import logging - import uuid - from typing import Type - - from aotc.benchmark_db_writer import bq_writer_utils - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import workload_benchmark_v2_schema - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import model_info_schema - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import software_info_schema - from aotc.benchmark_db_writer.schema.workload_benchmark_v2 import hardware_info_schema - # pylint: enable=import-outside-toplevel - logging.basicConfig( - format="%(asctime)s %(levelname)-8s %(message)s", - level=logging.INFO, - datefmt="%Y-%m-%d %H:%M:%S", + project = "supercomputer-testing" if is_test else "ml-workload-benchmarks" + dataset = "mantaray_v2" if is_test else "benchmark_dataset_v2" + return bq_writer_utils.create_bq_writer_object( + project=project, + dataset=dataset, + table=table, + dataclass_type=dataclass_type, ) - logger = logging.getLogger(__name__) - - def get_db_client( - table: str, dataclass_type: Type, is_test: bool = False - ) -> bq_writer_utils.create_bq_writer_object: - """Creates a BigQuery client object. - - Args: - table: The name of the BigQuery table. - dataclass_type: The dataclass type corresponding to the table schema. - is_test: Whether to use the testing project or the production project. - - Returns: - A BigQuery client object. - """ - - project = "supercomputer-testing" if is_test else "ml-workload-benchmarks" - dataset = "mantaray_v2" if is_test else "benchmark_dataset_v2" - return bq_writer_utils.create_bq_writer_object( - project=project, - dataset=dataset, - table=table, - dataclass_type=dataclass_type, - ) - def _validate_id( - id_value: str, - table_name: str, - id_field: str, - dataclass_type: Type, - is_test: bool = False, - ) -> bool: - """Generic function to validate an ID against a BigQuery table. - - Args: - id_value: The ID value to validate. - table_name: The name of the BigQuery table. - id_field: The name of the ID field in the table. - is_test: Whether to use the testing project or the production project. - - Returns: - True if the ID is valid, False otherwise. 
- """ - - client = get_db_client(table_name, dataclass_type, is_test) - result = client.query(where={id_field: id_value}) - - if not result: - logger.info( - "%s: %s is not present in the %s table ", - id_field.capitalize(), - id_value, - table_name, - ) - logger.info( - "Please add %s specific row in %s table before adding to run summary table", - id_value, - table_name, - ) - return False - return True - - def validate_model_id(model_id: str, is_test: bool = False) -> bool: - """Validates a model ID against the model_info table.""" - - print("model id: " + model_id) - id_val = _validate_id( - model_id, "model_info", "model_id", model_info_schema.ModelInfo, is_test - ) - if not id_val: - print("model id validation failed") - return False - return True - - - def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: - """Validates a hardware ID against the hardware_info table.""" - id_val = _validate_id( - hardware_id, - "hardware_info", - "hardware_id", - hardware_info_schema.HardwareInfo, - is_test, + def _validate_id( + id_value: str, + table_name: str, + id_field: str, + dataclass_type: Type, + is_test: bool = False, + ) -> bool: + """Generic function to validate an ID against a BigQuery table. + + Args: + id_value: The ID value to validate. + table_name: The name of the BigQuery table. + id_field: The name of the ID field in the table. + is_test: Whether to use the testing project or the production project. + + Returns: + True if the ID is valid, False otherwise. + """ + + client = get_db_client(table_name, dataclass_type, is_test) + result = client.query(where={id_field: id_value}) + + if not result: + logger.info( + "%s: %s is not present in the %s table ", + id_field.capitalize(), + id_value, + table_name, ) - if not id_val: - print("hardware id validation failed") - return False - return True - - - def validate_software_id(software_id: str, is_test: bool = False) -> bool: - """Validates a software ID against the software_info table.""" - id_val = _validate_id( - software_id, - "software_info", - "software_id", - software_info_schema.SoftwareInfo, - is_test, + logger.info( + "Please add %s specific row in %s table before adding to run summary table", + id_value, + table_name, ) + return False + return True - if not id_val: - print("software id validation failed") - return False - return True - - - print(model_id) - - if ( - validate_model_id(model_id, is_test) - and validate_hardware_id(hardware_id, is_test) - and validate_software_id(software_id, is_test) - ): - - summary = workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema( - run_id=f"run-{uuid.uuid4()}", - model_id=model_id, - software_id=software_id, - hardware_id=hardware_id, - hardware_num_chips=number_of_chips, - hardware_num_nodes=number_of_nodes, - result_success=True, - configs_framework=framework_config_in_json, - configs_env=env_variables, - configs_container_version=container_image_name, - configs_xla_flags=xla_flags, - configs_dataset=dataset, - logs_artifact_directory="", - update_person_ldap=update_person_ldap, - run_source="manual", - run_type=run_type, - run_release_status=run_release_status, - workload_precision=precision, - workload_gbs=global_batch_size, - workload_optimizer=optimizer, - workload_sequence_length=seq_length, - metrics_e2e_time=e2e_time, - metrics_mfu=mfu, - metrics_step_time=median_step_time, - metrics_tokens_per_second=tokens_per_second, - metrics_steps_for_convergence=number_of_steps, - metrics_other=other_metrics_in_json, - 
hardware_nccl_driver_nickname=nccl_driver_nickname, - hardware_topology=topology, - hardware_num_superblocks=num_of_superblock, - logs_comments=comment, - ) + def validate_model_id(model_id: str, is_test: bool = False) -> bool: + """Validates a model ID against the model_info table.""" - client = get_db_client( - "run_summary", - workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema, - is_test, - ) - client.write([summary]) + print("model id: " + model_id) + id_val = _validate_id( + model_id, "model_info", "model_id", model_info_schema.ModelInfo, is_test + ) + if not id_val: + print("model id validation failed") + return False + return True + + + def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: + """Validates a hardware ID against the hardware_info table.""" + id_val = _validate_id( + hardware_id, + "hardware_info", + "hardware_id", + hardware_info_schema.HardwareInfo, + is_test, + ) + if not id_val: + print("hardware id validation failed") + return False + return True + + + def validate_software_id(software_id: str, is_test: bool = False) -> bool: + """Validates a software ID against the software_info table.""" + id_val = _validate_id( + software_id, + "software_info", + "software_id", + software_info_schema.SoftwareInfo, + is_test, + ) + + if not id_val: + print("software id validation failed") + return False + return True + + + print(model_id) + + if ( + validate_model_id(model_id, is_test) + and validate_hardware_id(hardware_id, is_test) + and validate_software_id(software_id, is_test) + ): + + summary = workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema( + run_id=f"run-{uuid.uuid4()}", + model_id=model_id, + software_id=software_id, + hardware_id=hardware_id, + hardware_num_chips=number_of_chips, + hardware_num_nodes=number_of_nodes, + result_success=True, + configs_framework=framework_config_in_json, + configs_env=env_variables, + configs_container_version=container_image_name, + configs_xla_flags=xla_flags, + configs_dataset=dataset, + logs_artifact_directory="", + update_person_ldap=update_person_ldap, + run_source="manual", + run_type=run_type, + run_release_status=run_release_status, + workload_precision=precision, + workload_gbs=global_batch_size, + workload_optimizer=optimizer, + workload_sequence_length=seq_length, + metrics_e2e_time=e2e_time, + metrics_mfu=mfu, + metrics_step_time=median_step_time, + metrics_tokens_per_second=tokens_per_second, + metrics_steps_for_convergence=number_of_steps, + metrics_other=other_metrics_in_json, + hardware_nccl_driver_nickname=nccl_driver_nickname, + hardware_topology=topology, + hardware_num_superblocks=num_of_superblock, + logs_comments=comment, + ) + + client = get_db_client( + "run_summary", + workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema, + is_test, + ) + client.write([summary]) - else: - raise ValueError("Could not upload data in run summary table") + else: + raise ValueError("Could not upload data in run summary table") - logger.info("Wrote Benchmark DB, eureka!") + logger.info("Wrote Benchmark DB, eureka!") From 30eaed52ac17ba0f0dddcb4060df99932efa5c75 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 14:00:20 -0800 Subject: [PATCH 24/32] fix fields name --- dags/map_reproducibility/benchmarkdb_utils.py | 5 +++-- dags/map_reproducibility/nemo_gpt3.py | 10 ++-------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py index b1186117..2ed30c66 100644 --- 
a/dags/map_reproducibility/benchmarkdb_utils.py +++ b/dags/map_reproducibility/benchmarkdb_utils.py @@ -35,6 +35,7 @@ def write_run( mfu: float, tokens_per_second: float, writer_path: str, + run_success: bool = True, # True because if mfu is none, writing to db will fail anyway. run_type: str = "perf_regression", run_release_status: str = "local", other_metrics_in_json: str = "", @@ -239,7 +240,7 @@ def validate_software_id(software_id: str, is_test: bool = False) -> bool: hardware_id=hardware_id, hardware_num_chips=number_of_chips, hardware_num_nodes=number_of_nodes, - result_success=True, + result_success=run_success, configs_framework=framework_config_in_json, configs_env=env_variables, configs_container_version=container_image_name, @@ -247,7 +248,7 @@ def validate_software_id(software_id: str, is_test: bool = False) -> bool: configs_dataset=dataset, logs_artifact_directory="", update_person_ldap=update_person_ldap, - run_source="manual", + run_source="automation", run_type=run_type, run_release_status=run_release_status, workload_precision=precision, diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 44eeb8eb..76bf7adf 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -88,13 +88,6 @@ def run_aotc_workload(): ) assert result.exit_code == 0, f"Command failed with code {result.exit_code}" - # # Extract COMPLETE_JOB_NAME from the output - # bucket_name, file_name, python_path = extract_bucket_file_name( - # result.output - # ) - - # # Extract PYTHONPATH from the output - # python_path = extract_python_path(result.output) python_base_path, python_path_to_bq_writer = extract_python_path( result.output.splitlines()[-1] ) @@ -120,6 +113,7 @@ def run_aotc_workload(): model_id = "gpt3-175b" hardware_id = "a3mega" software_id = "pytorch_nemo" + image_version = "nemo_workload:24.07" number_of_chips = number_of_nodes * 8 write_run( @@ -128,7 +122,7 @@ def run_aotc_workload(): software_id=software_id, number_of_nodes=number_of_nodes, number_of_chips=number_of_chips, - container_image_name="sample_docker", + container_image_name=image_version, global_batch_size=global_batch_size, precision=precision, optimizer=optimizer, From b69b9d9facbeb2d9d1d0202071ab361d4ad4b67a Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 17:37:14 -0800 Subject: [PATCH 25/32] clean code --- dags/map_reproducibility/nemo_gpt3.py | 7 +++---- dags/map_reproducibility/utils.py | 9 +++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 76bf7adf..d7c73caf 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -22,7 +22,6 @@ from airflow import models from airflow.decorators import task from airflow.hooks.subprocess import SubprocessHook -import subprocess from dags import composer_env from dags.map_reproducibility.utils import get_metrics_cmds from dags.map_reproducibility.utils import set_variables_cmds @@ -76,11 +75,11 @@ def run_aotc_workload(): + install_helm_cmds() + namespace_cmds() + workload_cmds - # + helm_install_cmds() - # + wait_for_jobs_cmds() + + helm_install_cmds() + + wait_for_jobs_cmds() + copy_bucket_cmds() + get_metrics_cmds() - # + cleanup_cmds() + + cleanup_cmds() + get_aotc_repo() ), ], diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index d0e5dbf4..5d346fa7 100644 --- a/dags/map_reproducibility/utils.py +++ 
b/dags/map_reproducibility/utils.py @@ -19,6 +19,7 @@ from google.cloud import storage import yaml + def set_variables_cmds(): set_variables = ( "export PROJECT=supercomputer-testing", @@ -117,9 +118,8 @@ def wait_for_jobs_cmds(): def copy_bucket_cmds(): copy_bucket_contents = ( - # "export COMPLETE_JOB_NAME=$(gcloud storage ls " - # "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", - "COMPLETE_JOB_NAME=gs://gunjanjalori-testing-xlml/nemo-experiments/gpt3-xlml-1731373474-175b-nemo-1731373494-ic5n/", + "export COMPLETE_JOB_NAME=$(gcloud storage ls " + "gs://$BUCKET_NAME/nemo-experiments/ | grep $JOB_NAME)", 'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"', "cd $REPO_ROOT/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" @@ -144,6 +144,7 @@ def get_metrics_cmds(): ) return get_metrics + def get_aotc_repo(): gob_clone_cmds = ( "echo 'trying to clone GoB aotc repo'", @@ -193,6 +194,7 @@ def extract_python_path(last_line): python_path_to_bq_writer = python_path + "/benchmark-automation/aotc/src" return python_path, python_path_to_bq_writer + def extract_gpus(tmpdir, yaml_file, config_path): gpus = None batch_size = None @@ -218,4 +220,3 @@ def extract_gpus(tmpdir, yaml_file, config_path): print(f"Error: {e}") return None return gpus, batch_size, optimizer, precision, seq_length, max_steps - From 93470fd60117f6205ebead29d15fef5ed8c2cb23 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 17:38:36 -0800 Subject: [PATCH 26/32] clean code --- dags/map_reproducibility/nemo_gpt3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index d7c73caf..7d6129e2 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -33,7 +33,7 @@ from dags.map_reproducibility.utils import cleanup_cmds from dags.map_reproducibility.utils import git_cookie_authdaemon from dags.map_reproducibility.utils import clone_gob -from dags.map_reproducibility.utils import helm_install_cmds +from dags.map_reproducibility.utils import helm_apply_cmds from dags.map_reproducibility.utils import get_metrics from dags.map_reproducibility.utils import get_aotc_repo from dags.map_reproducibility.utils import extract_python_path @@ -75,7 +75,7 @@ def run_aotc_workload(): + install_helm_cmds() + namespace_cmds() + workload_cmds - + helm_install_cmds() + + helm_apply_cmds() + wait_for_jobs_cmds() + copy_bucket_cmds() + get_metrics_cmds() From 77b981d24d2510a7f5af10170ed825494c5a6f0e Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 17:46:32 -0800 Subject: [PATCH 27/32] clean code --- dags/map_reproducibility/nemo_gpt3.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 7d6129e2..79c9fbdc 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -38,7 +38,7 @@ from dags.map_reproducibility.utils import get_aotc_repo from dags.map_reproducibility.utils import extract_python_path from dags.map_reproducibility.benchmarkdb_utils import write_run -from dags.map_reproducibility.utils import extract_gpus +from dags.map_reproducibility.utils import extract_run_details # Run once a day at 2 pm UTC (6 am PST) @@ -95,6 +95,7 @@ def run_aotc_workload(): yaml_file_path = "reproducible-benchmark-recipes/projects/gpu-recipes/training/a3mega/gpt3-175b/nemo-pretraining-gke/values.yaml" config_yaml_path = 
"reproducible-benchmark-recipes/projects/gpu-recipes/src/frameworks/a3mega/nemo-configs/gpt3-175b-256gpus-fp8.yaml" + ( number_of_nodes, global_batch_size, @@ -102,7 +103,7 @@ def run_aotc_workload(): precision, seq_length, max_steps, - ) = extract_gpus(tmpdir, yaml_file_path, config_yaml_path) + ) = extract_run_details(tmpdir, yaml_file_path, config_yaml_path) print( f"batch size: {global_batch_size}, number of nodes: {number_of_nodes}" ) From 464637dd7a2cfd55c7e133e2d935dcf2f4b9980e Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 17:50:54 -0800 Subject: [PATCH 28/32] clean code --- dags/map_reproducibility/benchmarkdb_utils.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py index 2ed30c66..41a20dcc 100644 --- a/dags/map_reproducibility/benchmarkdb_utils.py +++ b/dags/map_reproducibility/benchmarkdb_utils.py @@ -35,7 +35,7 @@ def write_run( mfu: float, tokens_per_second: float, writer_path: str, - run_success: bool = True, # True because if mfu is none, writing to db will fail anyway. + run_success: bool = True, # True because if mfu is none, writing to db will fail anyway. run_type: str = "perf_regression", run_release_status: str = "local", other_metrics_in_json: str = "", @@ -48,7 +48,7 @@ def write_run( num_of_superblock: int = None, update_person_ldap: str = getpass.getuser(), comment: str = "", - is_test: bool = False + is_test: bool = False, ): """Writes a workload benchmark run manually to the database. @@ -91,17 +91,6 @@ def write_run( """ sys.path.append(writer_path) - for path in sys.path: - print("**path: " + path) - # sys.path.append("") - module_list = list(sys.modules.keys()) - - print(f"current dir: {os.getcwd()}") - for module in module_list: - if 'aotc' in module: - print("aotc module: " + module) - # for module in sys.modules.keys(): - # print("**modules: " + module) # pylint: disable=import-outside-toplevel import logging @@ -277,4 +266,3 @@ def validate_software_id(software_id: str, is_test: bool = False) -> bool: else: raise ValueError("Could not upload data in run summary table") - logger.info("Wrote Benchmark DB, eureka!") From fe91242e63cd44730bc0cb81379fc2b48b7953b0 Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 17:54:21 -0800 Subject: [PATCH 29/32] fix format --- dags/map_reproducibility/benchmarkdb_utils.py | 4 ---- dags/map_reproducibility/nemo_gpt3.py | 4 +--- dags/map_reproducibility/utils.py | 9 ++++----- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py index 41a20dcc..61a2c78a 100644 --- a/dags/map_reproducibility/benchmarkdb_utils.py +++ b/dags/map_reproducibility/benchmarkdb_utils.py @@ -182,7 +182,6 @@ def validate_model_id(model_id: str, is_test: bool = False) -> bool: return False return True - def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: """Validates a hardware ID against the hardware_info table.""" id_val = _validate_id( @@ -197,7 +196,6 @@ def validate_hardware_id(hardware_id: str, is_test: bool = False) -> bool: return False return True - def validate_software_id(software_id: str, is_test: bool = False) -> bool: """Validates a software ID against the software_info table.""" id_val = _validate_id( @@ -213,7 +211,6 @@ def validate_software_id(software_id: str, is_test: bool = False) -> bool: return False return True - print(model_id) if ( @@ 
-221,7 +218,6 @@ def validate_software_id(software_id: str, is_test: bool = False) -> bool: and validate_hardware_id(hardware_id, is_test) and validate_software_id(software_id, is_test) ): - summary = workload_benchmark_v2_schema.WorkloadBenchmarkV2Schema( run_id=f"run-{uuid.uuid4()}", model_id=model_id, diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 79c9fbdc..85f7b006 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -107,9 +107,7 @@ def run_aotc_workload(): print( f"batch size: {global_batch_size}, number of nodes: {number_of_nodes}" ) - average_step_time, mfu = get_metrics( - python_base_path - ) + average_step_time, mfu = get_metrics(python_base_path) model_id = "gpt3-175b" hardware_id = "a3mega" software_id = "pytorch_nemo" diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 5d346fa7..08a0c99b 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -91,7 +91,7 @@ def namespace_cmds(): return namespace -def helm_install_cmds(): +def helm_apply_cmds(): helm_cmds = ( " helm install -f values.yaml " "--namespace default " @@ -130,7 +130,7 @@ def copy_bucket_cmds(): def get_metrics_cmds(): # TODO(gunjanj007): get these parameters from the recipe - get_metrics = ( + cmds = ( # "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt", "METRICS_FILE=metrics.txt", "python3 process_training_results.py --file" @@ -142,7 +142,7 @@ def get_metrics_cmds(): "gsutil cp - $METRICS_FILE", 'echo "METRICS_FILE=${METRICS_FILE}"', ) - return get_metrics + return cmds def get_aotc_repo(): @@ -170,7 +170,6 @@ def cleanup_cmds(): def get_metrics(metrics_path): - file_content = "" with open(metrics_path + "/metrics.txt", "r", encoding="utf-8") as file: file_content = file.read() @@ -195,7 +194,7 @@ def extract_python_path(last_line): return python_path, python_path_to_bq_writer -def extract_gpus(tmpdir, yaml_file, config_path): +def extract_run_details(tmpdir, yaml_file, config_path): gpus = None batch_size = None optimizer = None From 6b5724bd31813fec5031f137e9beb93a7eb47a6c Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Mon, 16 Dec 2024 17:57:20 -0800 Subject: [PATCH 30/32] fix format --- dags/map_reproducibility/benchmarkdb_utils.py | 3 +-- dags/map_reproducibility/utils.py | 3 +++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dags/map_reproducibility/benchmarkdb_utils.py b/dags/map_reproducibility/benchmarkdb_utils.py index 61a2c78a..24f34767 100644 --- a/dags/map_reproducibility/benchmarkdb_utils.py +++ b/dags/map_reproducibility/benchmarkdb_utils.py @@ -35,7 +35,7 @@ def write_run( mfu: float, tokens_per_second: float, writer_path: str, - run_success: bool = True, # True because if mfu is none, writing to db will fail anyway. + run_success: bool = True, # True because if mfu is none, writing to db will fail anyway. 
run_type: str = "perf_regression", run_release_status: str = "local", other_metrics_in_json: str = "", @@ -261,4 +261,3 @@ def validate_software_id(software_id: str, is_test: bool = False) -> bool: else: raise ValueError("Could not upload data in run summary table") - diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 08a0c99b..908d1fa4 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py @@ -198,6 +198,7 @@ def extract_run_details(tmpdir, yaml_file, config_path): gpus = None batch_size = None optimizer = None + try: yaml_file_path = os.path.join(tmpdir, yaml_file) with open(yaml_file_path, "r", encoding="utf-8") as file: @@ -206,6 +207,7 @@ def extract_run_details(tmpdir, yaml_file, config_path): except (FileNotFoundError, yaml.YAMLError) as e: print(f"Error: {e}") return None + try: config_path = os.path.join(tmpdir, config_path) with open(config_path, "r", encoding="utf-8") as file: @@ -218,4 +220,5 @@ def extract_run_details(tmpdir, yaml_file, config_path): except (FileNotFoundError, yaml.YAMLError) as e: print(f"Error: {e}") return None + return gpus, batch_size, optimizer, precision, seq_length, max_steps From 39c86ad3bcca6591fcabc7c250ebfd1a09166bbf Mon Sep 17 00:00:00 2001 From: Gunjan Jalori Date: Tue, 17 Dec 2024 14:03:06 -0800 Subject: [PATCH 31/32] resolve comments --- dags/map_reproducibility/nemo_gpt3.py | 19 ++++++++++++++++--- dags/map_reproducibility/utils.py | 15 ++++++++------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/dags/map_reproducibility/nemo_gpt3.py b/dags/map_reproducibility/nemo_gpt3.py index 85f7b006..3fbde9e7 100644 --- a/dags/map_reproducibility/nemo_gpt3.py +++ b/dags/map_reproducibility/nemo_gpt3.py @@ -44,6 +44,12 @@ # Run once a day at 2 pm UTC (6 am PST) SCHEDULED_TIME = "0 14 * * *" if composer_env.is_prod_env() else None +MODEL_ID = "gpt3-175b" +BATCH_SIZE = 2048 +NUM_ACCELERATORS = 256 +PRECISION = "fp8" +ACCELERATOR_TYPE = "h100" + @task def run_aotc_workload(): @@ -62,6 +68,7 @@ def run_aotc_workload(): with tempfile.TemporaryDirectory() as tmpdir: hook = SubprocessHook() + # TODO(gunjanjalori): clone recipe first and extract params result = hook.run_command( [ "bash", @@ -78,7 +85,13 @@ def run_aotc_workload(): + helm_apply_cmds() + wait_for_jobs_cmds() + copy_bucket_cmds() - + get_metrics_cmds() + + get_metrics_cmds( + BATCH_SIZE, + NUM_ACCELERATORS, + PRECISION, + MODEL_ID, + ACCELERATOR_TYPE, + ) + cleanup_cmds() + get_aotc_repo() ), @@ -93,7 +106,7 @@ def run_aotc_workload(): print(f"Base path in python: {python_base_path}") print(f"python to bq: {python_path_to_bq_writer}") - yaml_file_path = "reproducible-benchmark-recipes/projects/gpu-recipes/training/a3mega/gpt3-175b/nemo-pretraining-gke/values.yaml" + value_yaml_path = "reproducible-benchmark-recipes/projects/gpu-recipes/training/a3mega/gpt3-175b/nemo-pretraining-gke/values.yaml" config_yaml_path = "reproducible-benchmark-recipes/projects/gpu-recipes/src/frameworks/a3mega/nemo-configs/gpt3-175b-256gpus-fp8.yaml" ( @@ -103,7 +116,7 @@ def run_aotc_workload(): precision, seq_length, max_steps, - ) = extract_run_details(tmpdir, yaml_file_path, config_yaml_path) + ) = extract_run_details(tmpdir, value_yaml_path, config_yaml_path) print( f"batch size: {global_batch_size}, number of nodes: {number_of_nodes}" ) diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py index 908d1fa4..5310f7b1 100644 --- a/dags/map_reproducibility/utils.py +++ b/dags/map_reproducibility/utils.py 
@@ -128,17 +128,18 @@ def copy_bucket_cmds():
   return copy_bucket_contents
 
 
-def get_metrics_cmds():
+def get_metrics_cmds(
+    batch_size, num_accelerators, precision, model_id, accelerator_type
+):
   # TODO(gunjanj007): get these parameters from the recipe
   cmds = (
-      # "METRICS_FILE=$COMPLETE_JOB_NAME/metrics.txt",
       "METRICS_FILE=metrics.txt",
       "python3 process_training_results.py --file"
-      " dllogger.json --batch_size 2048 "
-      "--num_accelerators 256 "
-      "--precision fp8 "
-      "--model_type gpt3-175b "
-      "--accelerator_type h100 | "
+      f" dllogger.json --batch_size {batch_size} "
+      f"--num_accelerators {num_accelerators} "
+      f"--precision {precision} "
+      f"--model_type {model_id} "
+      f"--accelerator_type {accelerator_type} | "
       "gsutil cp - $METRICS_FILE",
       'echo "METRICS_FILE=${METRICS_FILE}"',
   )

From dbebc8ba83bff0338adb779a84be57ecda915c7b Mon Sep 17 00:00:00 2001
From: Gunjan Jalori
Date: Tue, 17 Dec 2024 14:17:45 -0800
Subject: [PATCH 32/32] resolve comments

---
 dags/map_reproducibility/utils.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dags/map_reproducibility/utils.py b/dags/map_reproducibility/utils.py
index 5310f7b1..23026b12 100644
--- a/dags/map_reproducibility/utils.py
+++ b/dags/map_reproducibility/utils.py
@@ -189,7 +189,6 @@ def get_metrics(metrics_path):
 
 
 def extract_python_path(last_line):
-  # metrics_file = None
   python_path = last_line.split("=")[1]
   python_path_to_bq_writer = python_path + "/benchmark-automation/aotc/src"
   return python_path, python_path_to_bq_writer
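
For reviewers reproducing the final state of this series, a minimal sketch of
how the helpers in dags/map_reproducibility/utils.py fit together, assuming
the repository root is on PYTHONPATH. The metrics labels, YAML values, and
paths below are illustrative assumptions rather than output of a real run;
the sketch relies only on the behavior the patches above define: get_metrics()
parses the value after ": " on each line of metrics.txt, extract_run_details()
reads workload.gpus from the values file and the model/trainer fields from the
NeMo config, and extract_python_path() splits the trailing "PYTHONPATH=..."
line of the bash task's output.

    import os
    import tempfile

    import yaml

    from dags.map_reproducibility.utils import extract_python_path
    from dags.map_reproducibility.utils import extract_run_details
    from dags.map_reproducibility.utils import get_metrics

    # Hypothetical metrics.txt: one "<label>: <value>" pair per line, in the
    # order get_metrics() indexes them (step time, TFLOPS, MFU).
    METRICS = (
        "Average step time: 11.24\n"
        "TFLOPS per accelerator: 432.1\n"
        "MFU: 0.55\n"
    )

    # Hypothetical recipe YAMLs containing only the keys that
    # extract_run_details() reads.
    VALUES_YAML = {"workload": {"gpus": 256}}
    CONFIG_YAML = {
        "model": {
            "global_batch_size": 2048,
            "optim": {"name": "distributed_fused_adam"},
            "data": {"seq_length": 2048},
        },
        "trainer": {"precision": "fp8", "max_steps": 50},
    }

    with tempfile.TemporaryDirectory() as tmpdir:
        # get_metrics() reads <metrics_path>/metrics.txt from disk.
        with open(os.path.join(tmpdir, "metrics.txt"), "w", encoding="utf-8") as f:
            f.write(METRICS)
        # extract_run_details() resolves both YAML paths relative to tmpdir.
        for name, data in (("values.yaml", VALUES_YAML), ("config.yaml", CONFIG_YAML)):
            with open(os.path.join(tmpdir, name), "w", encoding="utf-8") as f:
                yaml.safe_dump(data, f)

        average_step_time, mfu = get_metrics(tmpdir)
        gpus, batch_size, optimizer, precision, seq_length, max_steps = (
            extract_run_details(tmpdir, "values.yaml", "config.yaml")
        )
        assert (average_step_time, mfu) == (11.24, 0.55)
        assert (gpus, batch_size, precision) == (256, 2048, "fp8")

    # extract_python_path() derives the bq writer path from the last line of
    # the SubprocessHook output.
    base, bq_writer = extract_python_path("PYTHONPATH=/tmp/xyz")
    assert bq_writer == "/tmp/xyz/benchmark-automation/aotc/src"

With the constants patch 31 introduces in nemo_gpt3.py, the call
get_metrics_cmds(2048, 256, "fp8", "gpt3-175b", "h100") should expand to the
shell pipeline

    python3 process_training_results.py --file dllogger.json --batch_size 2048
    --num_accelerators 256 --precision fp8 --model_type gpt3-175b
    --accelerator_type h100 | gsutil cp - $METRICS_FILE

which pipes the computed metrics into metrics.txt for get_metrics() to parse
later. One detail worth double-checking in patch 27: extract_run_details()
returns workload.gpus as its first element, which the DAG unpacks as
number_of_nodes before computing number_of_chips = number_of_nodes * 8; if
workload.gpus is a GPU count rather than a node count, the chip count written
to the benchmark DB would be inflated.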