From 92c8fb2ab1d59f168c1e7a5159780c09276bba44 Mon Sep 17 00:00:00 2001
From: chandrasekhard2 <98771505+chandrasekhard2@users.noreply.github.com>
Date: Mon, 23 Dec 2024 10:04:39 -0800
Subject: [PATCH] DLRM v2 is too big for single host. Reverting it to DLRM v1
 for singl… (#517)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* DLRM v2 is too big for single host. Reverting it to DLRM v1 for single
  host and DLRM v2 for multihost

* minor changes in function names

---------

Co-authored-by: Richard Liu <39319471+richardsliu@users.noreply.github.com>
---
 ...lutionsteam_tf_nightly_supported_config.py | 171 +++++++++++++++++-
 .../solutionsteam_tf_nightly_supported.py     |  10 +-
 2 files changed, 173 insertions(+), 8 deletions(-)

diff --git a/dags/solutions_team/configs/tensorflow/solutionsteam_tf_nightly_supported_config.py b/dags/solutions_team/configs/tensorflow/solutionsteam_tf_nightly_supported_config.py
index b9e249b0..71832bcd 100644
--- a/dags/solutions_team/configs/tensorflow/solutionsteam_tf_nightly_supported_config.py
+++ b/dags/solutions_team/configs/tensorflow/solutionsteam_tf_nightly_supported_config.py
@@ -53,7 +53,10 @@ def get_tf_keras_config(
   # Add default_var to pass DAG check
   # TODO(ranran): replace Variable.get() to XCOM when it applies
   tpu_name = Variable.get(benchmark_id, default_var=None) if is_pod else "local"
-  env_variable = common.export_env_variables(tpu_name, is_pod, is_pjrt)
+  env_variable = (
+      common.export_env_variables(tpu_name, is_pod, is_pjrt) + " TF_CPP_MIN_LOG_LEVEL=0 TPU_STDERR_LOG_LEVEL=0 TPU_MIN_LOG_LEVEL=0 "
+  )
   skipped_tag = "--tags=-failspod" if is_pod else ""
   run_model_cmds = (
       "sudo chmod -R 777 /tmp/",
@@ -185,7 +188,171 @@ def get_tf_resnet_config(
   )
 
 
-def get_tf_dlrm_config(
+def get_tf_dlrm_v1_config(
+    tpu_version: TpuVersion,
+    tpu_cores: int,
+    tpu_zone: str,
+    time_out_in_min: int,
+    bottom_mlp: List[int],
+    embedding_dim: int,
+    train_steps: int,
+    extraFlags: str = "",
+    project_name: str = Project.CLOUD_ML_AUTO_SOLUTIONS.value,
+    runtime_version: str = RuntimeVersion.TPU_VM_TF_NIGHTLY.value,
+    is_pod: bool = False,
+    is_pjrt: bool = True,
+    criteo_dir: str = gcs_bucket.CRITEO_DIR,
+    network: str = "default",
+    subnetwork: str = "default",
+    global_batch_size=16384,
+):
+  job_gcp_config = gcp_config.GCPConfig(
+      project_name=project_name,
+      zone=tpu_zone,
+      dataset_name=metric_config.DatasetOption.XLML_DATASET,
+  )
+
+  # Add default_var to pass DAG check
+  # TODO(ranran): replace Variable.get() to XCOM when it applies
+  test_name = "tf_dlrm_criteo"
+  benchmark_id = f"{test_name}-v{tpu_version.value}-{tpu_cores}"
+  tpu_name = Variable.get(benchmark_id, default_var=None) if is_pod else "local"
+  is_v5p = tpu_version == TpuVersion.V5P
+  env_variable = (
+      common.export_env_variables(tpu_name, is_pod, is_pjrt, is_v5p_sc=is_v5p) + " TF_CPP_MIN_LOG_LEVEL=0 TPU_STDERR_LOG_LEVEL=0 TPU_MIN_LOG_LEVEL=0 "
+  )
+
+  set_up_cmds = common.set_up_tensorflow_models() + common.install_tf()
+  if is_pod:
+    if not is_pjrt:
+      set_up_cmds += common.set_up_se()
+    else:
+      set_up_cmds += common.set_up_pjrt()
+  params_override = {
+      "runtime": {"distribution_strategy": "tpu"},
+      "task": {
+          "use_synthetic_data": False,
+          "train_data": {
+              "input_path": "gs://agagik-dev/data/criteo/terabyte_processed_golden_shuffled/train/*",
+              "global_batch_size": global_batch_size,
+          },
+          "validation_data": {
+              "input_path": "gs://agagik-dev/data/criteo/terabyte_processed_golden_shuffled/eval/*",
+              "global_batch_size": global_batch_size,
+          },
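+          # DLRM v1 model below: 13 dense features plus 26 categorical
+          # features (one entry per feature in vocab_sizes), combined through
+          # a dot-product interaction between the bottom-MLP output and the
+          # feature embeddings.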
"gs://agagik-dev/data/criteo/terabyte_processed_golden_shuffled/eval/*", + "global_batch_size": global_batch_size, + }, + "model": { + "num_dense_features": 13, + "bottom_mlp": [512, 256, 16], + "embedding_dim": 16, + "top_mlp": [1024, 1024, 512, 256, 1], + "interaction": "dot", + "vocab_sizes": [ + 39884406, + 39043, + 17289, + 7420, + 20263, + 3, + 7120, + 1543, + 63, + 38532951, + 2953546, + 403346, + 10, + 2208, + 11938, + 155, + 4, + 976, + 14, + 39979771, + 25641295, + 39664984, + 585935, + 12972, + 108, + 36, + ], + }, + }, + "trainer": { + "optimizer_config": { + "embedding_optimizer": "SGD", + "dense_optimizer": "SGD", + "lr_config": { + "decay_exp": 2, + "decay_start_steps": 70000, + "decay_steps": 30000, + "learning_rate": 0.025, + "warmup_steps": 8000, + }, + "dense_sgd_config": { + "decay_exp": 2, + "decay_start_steps": 70000, + "decay_steps": 30000, + "learning_rate": 0.00025, + "warmup_steps": 8000, + }, + }, + "use_orbit": True, + "validation_interval": 1000, + "checkpoint_interval": 2000, + "validation_steps": 5000, + "train_steps": 10000, + "steps_per_loop": 1000, + "summary_interval": 1000, + "train_tf_function": True, + "train_tf_while_loop": True, + "pipeline_sparse_and_dense_execution": True, + }, + } + + model_dir = "/tmp" + + params_override["trainer"]["pipeline_sparse_and_dense_execution"] = "true" + tpu_id = Variable.get(benchmark_id, default_var=None) + # TODO (ericlefort): Replace the model_dir with this line when the var is available + # model_dir = metric_config.SshEnvVars.GCS_OUTPUT.value + f"/dlrm/v5p/{benchmark_id}" + epoch = time.time() + model_dir = f"{gcs_bucket.BASE_OUTPUT_DIR}/{test_owner.Team.SOLUTIONS_TEAM.value}/dlrm/{benchmark_id}/{epoch}" + + # Clean out the prior checkpoint if it exists + run_model_cmds = ( + ( + f'cd /usr/share/tpu/models && {env_variable} && LIBTPU_INIT_ARGS="--xla_sc_splitting_along_feature_dimension=auto --copy_with_dynamic_shape_op_output_pjrt_buffer=true --xla_tpu_embedding_table_oblongness_threshold=1 --xla_tpu_enable_all_experimental_scheduler_features=true"' + " numactl --cpunodebind=0 --membind=0 python3 official/recommendation/ranking/train.py" + f" --model_dir={model_dir} {extraFlags}" + f" --params_override='{params_override}'" + ), + ) + + job_test_config = test_config.TpuVmTest( + test_config.Tpu( + version=tpu_version, + cores=tpu_cores, + runtime_version=runtime_version, + reserved=True, + network=network, + subnetwork=subnetwork, + ), + test_name=test_name, + set_up_cmds=set_up_cmds, + run_model_cmds=run_model_cmds, + timeout=datetime.timedelta(minutes=time_out_in_min), + task_owner=test_owner.CHANDRA_D, + ) + + return task.run_queued_resource_test( + task_test_config=job_test_config, + task_gcp_config=job_gcp_config, + tpu_name_env_var=is_pod, + all_workers=False, + ) + + +def get_tf_dlrm_v2_config( tpu_version: TpuVersion, tpu_cores: int, tpu_zone: str, diff --git a/dags/solutions_team/solutionsteam_tf_nightly_supported.py b/dags/solutions_team/solutionsteam_tf_nightly_supported.py index 2e7c64e0..e6c59889 100644 --- a/dags/solutions_team/solutionsteam_tf_nightly_supported.py +++ b/dags/solutions_team/solutionsteam_tf_nightly_supported.py @@ -114,9 +114,8 @@ is_pod=True, runtime_version=RuntimeVersion.TPU_VM_TF_NIGHTLY_POD.value, ) - - embedding_dim = 16 - tf_dlrm_v5p_8 = tf_config.get_tf_dlrm_config( + embedding_dim = 8 + tf_dlrm_v5p_8 = tf_config.get_tf_dlrm_v1_config( project_name=Project.TPU_PROD_ENV_AUTOMATED.value, tpu_version=TpuVersion.V5P, tpu_cores=8, @@ -131,11 +130,10 @@ network=V5_NETWORKS, 
+  tf_dlrm_v5p_32 = tf_config.get_tf_dlrm_v2_config(
       project_name=Project.TPU_PROD_ENV_AUTOMATED.value,
       tpu_version=TpuVersion.V5P,
       tpu_cores=32,
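
A note on the command construction in get_tf_dlrm_v1_config above: params_override is a Python dict, and it is pasted into the shell command by f-string interpolation, i.e. via the dict's repr. A minimal standalone sketch of that behavior (with a hypothetical trimmed dict, not the full config from the patch):

    # Sketch: how params_override is rendered onto the train.py command line.
    # The dict here is a trimmed, hypothetical stand-in for the one built in
    # get_tf_dlrm_v1_config.
    params_override = {
        "runtime": {"distribution_strategy": "tpu"},
        "trainer": {"pipeline_sparse_and_dense_execution": True},
    }

    # As in the patch: the boolean is overwritten with the string "true"
    # before interpolation, so it renders as 'true' rather than Python's True.
    params_override["trainer"]["pipeline_sparse_and_dense_execution"] = "true"

    cmd = (
        "python3 official/recommendation/ranking/train.py"
        f" --params_override='{params_override}'"
    )
    print(cmd)
    # python3 official/recommendation/ranking/train.py --params_override='{'runtime':
    # {'distribution_strategy': 'tpu'}, 'trainer': {'pipeline_sparse_and_dense_execution': 'true'}}'

One caveat the sketch makes visible: the dict's repr itself contains single quotes, which nest inside the single-quoted --params_override argument, so how cleanly this round-trips depends on how the run command string is ultimately executed.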