From 4a471f9216107bd2e421e91c2ad87aa8d3bd4ca2 Mon Sep 17 00:00:00 2001 From: Yijia Date: Tue, 7 Jan 2025 13:27:16 -0800 Subject: [PATCH 1/7] use existed instance --- .../configs/trt_llm_inference_config.py | 2 + .../configs/trt_llm_mlperf_v40_config.py | 2 + dags/inference/trt_llm_inference.py | 1 + .../inference/trt_llm_mlperf_v40_inference.py | 1 + xlml/apis/task.py | 74 ++++++++++- xlml/utils/gpu.py | 123 ++++++++++++++++++ 6 files changed, 202 insertions(+), 1 deletion(-) diff --git a/dags/inference/configs/trt_llm_inference_config.py b/dags/inference/configs/trt_llm_inference_config.py index c08a91327..81f008fee 100644 --- a/dags/inference/configs/trt_llm_inference_config.py +++ b/dags/inference/configs/trt_llm_inference_config.py @@ -36,6 +36,7 @@ def get_trt_llm_gpu_config( project: Project, network: str, subnetwork: str, + existed_instance_name: str = None, ) -> task.GpuCreateResourceTask: set_up_cmds = ( "pip install --upgrade pip", @@ -160,4 +161,5 @@ def get_trt_llm_gpu_config( job_test_config, job_gcp_config, job_metric_config, + existed_instance_name=existed_instance_name, ) diff --git a/dags/inference/configs/trt_llm_mlperf_v40_config.py b/dags/inference/configs/trt_llm_mlperf_v40_config.py index d50b000a6..47aceb544 100644 --- a/dags/inference/configs/trt_llm_mlperf_v40_config.py +++ b/dags/inference/configs/trt_llm_mlperf_v40_config.py @@ -37,6 +37,7 @@ def get_trt_llm_mlperf_v40_gpu_config( project: Project, network: str, subnetwork: str, + existed_instance_name: str = None, model_configs: Dict = {}, ) -> task.GpuCreateResourceTask: docker_container_name = "mlperf-inference" @@ -152,4 +153,5 @@ def get_trt_llm_mlperf_v40_gpu_config( job_test_config, job_gcp_config, job_metric_config, + existed_instance_name=existed_instance_name, ) diff --git a/dags/inference/trt_llm_inference.py b/dags/inference/trt_llm_inference.py index 342e10b65..be1ac88b4 100644 --- a/dags/inference/trt_llm_inference.py +++ b/dags/inference/trt_llm_inference.py @@ -46,4 +46,5 @@ project=Project.CLOUD_TPU_INFERENCE_TEST, network=INFERENCE_NETWORKS, subnetwork=H100_INFERENCE_SUBNETWORKS, + existed_instance_name="yijiaj-auto-test-a3", ).run() diff --git a/dags/inference/trt_llm_mlperf_v40_inference.py b/dags/inference/trt_llm_mlperf_v40_inference.py index 2c4d2152c..769d11021 100644 --- a/dags/inference/trt_llm_mlperf_v40_inference.py +++ b/dags/inference/trt_llm_mlperf_v40_inference.py @@ -61,4 +61,5 @@ network=INFERENCE_NETWORKS, subnetwork=H100_INFERENCE_SUBNETWORKS, model_configs=model_configs, + existed_instance_name="yijiaj-auto-test-a3", ).run() diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 5e721cee6..b603ce7c7 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -349,6 +349,7 @@ class GpuCreateResourceTask(BaseTask): task_metric_config: metric configuration (e.g., result gcs path). gpu_create_timeout: timeout when waiting for the GPU vm creation. install_nvidia_drivers: whether to install Nvidia drivers. + existed_instance_name: whether a exited GPU instance shall be used. """ image_project: str @@ -358,16 +359,21 @@ class GpuCreateResourceTask(BaseTask): task_metric_config: Optional[metric_config.MetricConfig] = None gpu_create_timeout: datetime.timedelta = datetime.timedelta(minutes=60) install_nvidia_drivers: bool = False + existed_instance_name: str = None def run(self) -> DAGNode: """Run a test job. Returns: A task group with the following tasks chained: provision, run_model, - post_process, clean_up. + post_process, clean_up if none existed instance is used, or a task + group with run_model and post_process only. """ # piz: We skip the queued resource for GPU for now since there is no queued # resource command for GPU. + if self.existed_instance_name is not None: + return self.run_with_existed_instance() + with TaskGroup( group_id=self.task_test_config.benchmark_id, prefix_group_id=True ) as group: @@ -399,6 +405,58 @@ def run(self) -> DAGNode: provision >> run_model >> post_process >> clean_up return group + def run_with_existed_instance(self) -> DAGNode: + """Run a test job. + + Returns: + A task group with the following tasks chained: run_model and post_process. + """ + with TaskGroup( + group_id=self.task_test_config.benchmark_id, prefix_group_id=True + ) as group: + ( + provision, + ip_address, + ssh_keys, + gcs_location, + ) = self.provision_via_existed_instance() + if ( + self.task_metric_config + and self.task_metric_config.use_runtime_generated_gcs_folder + ): + env_variable = { + f"{metric_config.SshEnvVars.GCS_OUTPUT.name}": gcs_location + } + else: + env_variable = None + post_process = self.post_process(gcs_location) + run_model = self.run_model(ip_address, ssh_keys, env_variable) + clean_up = self.clean_up_existed_instance(ssh_keys) + provision >> run_model >> post_process >> clean_up + return group + + def provision_via_existed_instance( + self, + ) -> Tuple[DAGNode, airflow.XComArg, airflow.XComArg, airflow.XComArg,]: + """Provision an existed GPU accelerator. + + Returns: + A DAG node that will provision a GPU, an XCome value of the ip address + for the host,an XCom value for the SSH keys. + """ + with TaskGroup(group_id="provision") as group: + ssh_keys = ssh.generate_ssh_keys() + ip_address = gpu.get_existed_resource( + instance_name=self.existed_instance_name, + ssh_keys=ssh_keys, + gcp=self.task_gcp_config, + ) + gcs_location = name_format.generate_gcs_folder_location( + self.task_test_config.gcs_subfolder, + self.task_test_config.benchmark_id, + ) + return group, ip_address, ssh_keys, gcs_location + def provision( self, ) -> Tuple[ @@ -513,6 +571,20 @@ def clean_up( resource, project_id, zone ) + def clean_up_existed_instance(self, ssh_keys: airflow.XComArg) -> DAGNode: + """Clean up existed GPU resources - remove new generated ssh_keys. + + Args: + ssh_keys: generated GPU's SSH keys to be removed. + Returns: + A DAG node that cleaned up the ssh_keys. + """ + return gpu.clean_up_ssh_keys( + instance_name=self.existed_instance_name, + ssh_keys=ssh_keys, + gcp=self.task_gcp_config, + ) + # TODO(ranran): This class is big. Let's move it to a new file. @dataclasses.dataclass diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index 877c6059e..743f710ee 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -129,6 +129,129 @@ def generate_gpu_name() -> str: return f"gpu-{str(uuid.uuid4())}" +@task +def get_existed_resource( + instance_name: str, + ssh_keys: ssh.SshKeys, + gcp: gcp_config.GCPConfig, +) -> airflow.XComArg: + """Reach a resource node that is already created. + + Args: + instance_name: name of the existed instance. + ssh_keys: airflow.XComArg, + gcp: GCP project/zone configuration. + + Returns: + The ip address of the GPU VM. + """ + instance_client = compute_v1.InstancesClient() + instance_request = compute_v1.GetInstanceRequest( + instance=instance_name, + project=gcp.project_name, + zone=gcp.zone, + ) + instance = instance_client.get(request=instance_request) + logging.info( + f"Resource get status: {instance.status}, {instance.status_message}" + ) + + ip_address = instance.network_interfaces[0].access_configs[0].nat_i_p + logging.info(f"Resouce access: {instance.network_interfaces[0].access_configs[0]}") + metadata = instance.metadata + items = metadata.items or [] + ssh_key_exist = False + for item in metadata.items: + if item.key == "ssh-keys": + ssh_key_exist = True + item.value = ( + item.value + "\n" + f"cloud-ml-auto-solutions:{ssh_keys.public}" + ) + break + if not ssh_key_exist: + items.append({ + "key": "ssh-keys", + "value": f"cloud-ml-auto-solutions:{ssh_keys.public}", + }) + metadata.items = items + metadata_request = compute_v1.SetMetadataInstanceRequest( + instance=instance_name, + project=gcp.project_name, + zone=gcp.zone, + metadata_resource=metadata, + ) + operation = instance_client.set_metadata(request=metadata_request) + if operation.error: + logging.error( + ( + "Error during instance set metadata: [Code:" + f" {operation.http_error_status_code}]:" + f" {operation.http_error_message}" + f" {operation.error}" + ), + ) + raise operation.exception() or RuntimeError(operation.http_error_message) + elif operation.warnings: + logging.warning("Warnings during instance set metadata:\n") + for warning in operation.warnings: + logging.warning(f" - {warning.code}: {warning.message}") + + return ip_address + + +@task +def clean_up_ssh_keys( + instance_name: str, + ssh_keys: ssh.SshKeys, + gcp: gcp_config.GCPConfig, +) -> airflow.XComArg: + """Remove the generated ssh_keys from existed instance. + + Args: + instance_name: name of the existed instance. + ssh_keys: airflow.XComArg, + gcp: GCP project/zone configuration. + """ + instance_client = compute_v1.InstancesClient() + instance_request = compute_v1.GetInstanceRequest( + instance=instance_name, + project=gcp.project_name, + zone=gcp.zone, + ) + instance = instance_client.get(request=instance_request) + logging.info( + f"Resource get status: {instance.status}, {instance.status_message}" + ) + metadata = instance.metadata + for item in metadata.items: + if item.key == "ssh-keys": + item.value = item.value.replace( + f"\ncloud-ml-auto-solutions:{ssh_keys.public}", "" + ) + break + metadata_request = compute_v1.SetMetadataInstanceRequest( + instance=instance_name, + project=gcp.project_name, + zone=gcp.zone, + metadata_resource=metadata, + ) + operation = instance_client.set_metadata(request=metadata_request) + if operation.error: + logging.error( + ( + "Error during instance set metadata: [Code:" + f" {operation.http_error_status_code}]:" + f" {operation.http_error_message}" + f" {operation.error}" + ), + ) + raise operation.exception() or RuntimeError(operation.http_error_message) + elif operation.warnings: + logging.warning("Warnings during instance set metadata:\n") + for warning in operation.warnings: + logging.warning(f" - {warning.code}: {warning.message}") + + @task_group def create_resource( gpu_name: airflow.XComArg, From fcdcd474a78d59f1e6301abf867f78e793a8cba3 Mon Sep 17 00:00:00 2001 From: Yijia Date: Fri, 10 Jan 2025 11:39:46 -0800 Subject: [PATCH 2/7] Use existed instance and test on A3 --- .../configs/trt_llm_mlperf_v40_config.py | 1 + .../configs/trt_llm_mlperf_v41_config.py | 2 ++ dags/inference/trt_llm_inference.py | 1 - .../inference/trt_llm_mlperf_v40_inference.py | 2 +- xlml/apis/task.py | 7 ++++-- xlml/utils/gpu.py | 5 ++-- xlml/utils/metric.py | 24 +++++++++++-------- 7 files changed, 25 insertions(+), 17 deletions(-) diff --git a/dags/inference/configs/trt_llm_mlperf_v40_config.py b/dags/inference/configs/trt_llm_mlperf_v40_config.py index 47aceb544..380f830f2 100644 --- a/dags/inference/configs/trt_llm_mlperf_v40_config.py +++ b/dags/inference/configs/trt_llm_mlperf_v40_config.py @@ -110,6 +110,7 @@ def get_trt_llm_mlperf_v40_gpu_config( docker_cmd = " && ".join(docker_cmds) run_model_cmds = ( "pip install jsonlines", + f"docker restart {docker_container_name}", f'docker exec -i {docker_container_name} /bin/bash -c "{docker_cmd}"', make_jsonl_converter_cmd, "cat jsonl_converter.py", diff --git a/dags/inference/configs/trt_llm_mlperf_v41_config.py b/dags/inference/configs/trt_llm_mlperf_v41_config.py index 7371739a2..7117adc60 100644 --- a/dags/inference/configs/trt_llm_mlperf_v41_config.py +++ b/dags/inference/configs/trt_llm_mlperf_v41_config.py @@ -37,6 +37,7 @@ def get_trt_llm_mlperf_gpu_config( project: Project, network: str, subnetwork: str, + existed_instance_name: str = None, benchmark_configs: Dict = {}, model_parameters: Dict = {}, parameter_positions: Dict = {}, @@ -215,4 +216,5 @@ def get_trt_llm_mlperf_gpu_config( job_test_config, job_gcp_config, job_metric_config, + existed_instance_name=existed_instance_name, ) diff --git a/dags/inference/trt_llm_inference.py b/dags/inference/trt_llm_inference.py index be1ac88b4..342e10b65 100644 --- a/dags/inference/trt_llm_inference.py +++ b/dags/inference/trt_llm_inference.py @@ -46,5 +46,4 @@ project=Project.CLOUD_TPU_INFERENCE_TEST, network=INFERENCE_NETWORKS, subnetwork=H100_INFERENCE_SUBNETWORKS, - existed_instance_name="yijiaj-auto-test-a3", ).run() diff --git a/dags/inference/trt_llm_mlperf_v40_inference.py b/dags/inference/trt_llm_mlperf_v40_inference.py index 769d11021..b98715585 100644 --- a/dags/inference/trt_llm_mlperf_v40_inference.py +++ b/dags/inference/trt_llm_mlperf_v40_inference.py @@ -61,5 +61,5 @@ network=INFERENCE_NETWORKS, subnetwork=H100_INFERENCE_SUBNETWORKS, model_configs=model_configs, - existed_instance_name="yijiaj-auto-test-a3", + existed_instance_name="yijiaj-test-h100x8", ).run() diff --git a/xlml/apis/task.py b/xlml/apis/task.py index b603ce7c7..55a4c60b9 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -429,7 +429,7 @@ def run_with_existed_instance(self) -> DAGNode: } else: env_variable = None - post_process = self.post_process(gcs_location) + post_process = self.post_process(gcs_location, use_existed_instance=True) run_model = self.run_model(ip_address, ssh_keys, env_variable) clean_up = self.clean_up_existed_instance(ssh_keys) provision >> run_model >> post_process >> clean_up @@ -534,7 +534,9 @@ def run_model( ) def post_process( - self, result_location: Optional[airflow.XComArg] = None + self, + result_location: Optional[airflow.XComArg] = None, + use_existed_instance=False, ) -> DAGNode: """Process metrics and metadata, and insert them into BigQuery tables. @@ -549,6 +551,7 @@ def post_process( self.task_metric_config, self.task_gcp_config, folder_location=result_location, + use_existed_instance=use_existed_instance, ) return group diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index 743f710ee..9e54d4594 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -153,11 +153,10 @@ def get_existed_resource( ) instance = instance_client.get(request=instance_request) logging.info( - f"Resource get status: {instance.status}, {instance.status_message}" + f"Resource retrieve status: {instance.status}, {instance.status_message}" ) - ip_address = instance.network_interfaces[0].access_configs[0].nat_i_p - logging.info(f"Resouce access: {instance.network_interfaces[0].access_configs[0]}") + ip_address = instance.network_interfaces[0].network_i_p metadata = instance.metadata items = metadata.items or [] ssh_key_exist = False diff --git a/xlml/utils/metric.py b/xlml/utils/metric.py index 06879511d..a6eab3446 100644 --- a/xlml/utils/metric.py +++ b/xlml/utils/metric.py @@ -591,6 +591,7 @@ def get_gke_job_status( def get_gce_job_status( task_test_config: test_config.TestConfig[test_config.Accelerator], use_startup_script: bool, + use_existed_instance: bool, ) -> bigquery.JobStatus: """Get job status for the GCE run. @@ -614,9 +615,10 @@ def get_gce_job_status( task_id=f"{benchmark_id}.provision.create_queued_resource.wait_for_ready_queued_resource" ) elif isinstance(task_test_config, test_config.GpuVmTest): - wait_task = current_dag.get_task( - task_id=f"{benchmark_id}.provision.create_resource.get_ip_address" - ) + if use_existed_instance: + wait_task = current_dag.get_task(task_id=f"{benchmark_id}.provision.get_existed_resource") + else: + wait_task = current_dag.get_task(task_id= f"{benchmark_id}.provision.create_resource.get_ip_address") else: raise NotImplementedError( f"Unable to get task for {type(task_test_config.accelerator)}." @@ -631,12 +633,13 @@ def get_gce_job_status( return bigquery.JobStatus.MISSED # check setup status to see if setup step is successful - setup_task = current_dag.get_task(task_id=f"{benchmark_id}.provision.setup") - setup_ti = TaskInstance(setup_task, execution_date) - setup_state = setup_ti.current_state() - if setup_state == TaskState.FAILED.value: - logging.info("The setup state is failed, and the job status is failed.") - return bigquery.JobStatus.FAILED + if not use_existed_instance: + setup_task = current_dag.get_task(task_id=f"{benchmark_id}.provision.setup") + setup_ti = TaskInstance(setup_task, execution_date) + setup_state = setup_ti.current_state() + if setup_state == TaskState.FAILED.value: + logging.info("The setup state is failed, and the job status is failed.") + return bigquery.JobStatus.FAILED # check run_model status to see if run_model step is successful run_model_task = current_dag.get_task(task_id=f"{benchmark_id}.run_model") @@ -692,6 +695,7 @@ def process_metrics( task_metric_config: Optional[metric_config.MetricConfig], task_gcp_config: gcp_config.GCPConfig, use_startup_script: bool = False, + use_existed_instance: bool = False, folder_location: Optional[str] = None, ) -> None: benchmark_id = task_test_config.benchmark_id @@ -771,7 +775,7 @@ def process_metrics( elif isinstance(task_test_config, test_config.GpuGkeTest): test_job_status = get_gke_job_status(task_test_config) else: - test_job_status = get_gce_job_status(task_test_config, use_startup_script) + test_job_status = get_gce_job_status(task_test_config, use_startup_script, use_existed_instance) for index in range(len(metadata_history_rows_list)): job_history_row = bigquery.JobHistoryRow( From da236edcbcfb8f0613fdc6925324c3e7ba83f112 Mon Sep 17 00:00:00 2001 From: Yijia Date: Fri, 10 Jan 2025 11:50:13 -0800 Subject: [PATCH 3/7] format --- xlml/utils/metric.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/xlml/utils/metric.py b/xlml/utils/metric.py index a6eab3446..14f970647 100644 --- a/xlml/utils/metric.py +++ b/xlml/utils/metric.py @@ -616,9 +616,13 @@ def get_gce_job_status( ) elif isinstance(task_test_config, test_config.GpuVmTest): if use_existed_instance: - wait_task = current_dag.get_task(task_id=f"{benchmark_id}.provision.get_existed_resource") + wait_task = current_dag.get_task( + task_id=f"{benchmark_id}.provision.get_existed_resource" + ) else: - wait_task = current_dag.get_task(task_id= f"{benchmark_id}.provision.create_resource.get_ip_address") + wait_task = current_dag.get_task( + task_id=f"{benchmark_id}.provision.create_resource.get_ip_address" + ) else: raise NotImplementedError( f"Unable to get task for {type(task_test_config.accelerator)}." @@ -634,7 +638,9 @@ def get_gce_job_status( # check setup status to see if setup step is successful if not use_existed_instance: - setup_task = current_dag.get_task(task_id=f"{benchmark_id}.provision.setup") + setup_task = current_dag.get_task( + task_id=f"{benchmark_id}.provision.setup" + ) setup_ti = TaskInstance(setup_task, execution_date) setup_state = setup_ti.current_state() if setup_state == TaskState.FAILED.value: @@ -775,7 +781,9 @@ def process_metrics( elif isinstance(task_test_config, test_config.GpuGkeTest): test_job_status = get_gke_job_status(task_test_config) else: - test_job_status = get_gce_job_status(task_test_config, use_startup_script, use_existed_instance) + test_job_status = get_gce_job_status( + task_test_config, use_startup_script, use_existed_instance + ) for index in range(len(metadata_history_rows_list)): job_history_row = bigquery.JobHistoryRow( From 3f31dc1dfa752479cc0f3e0f66a9da08fc651016 Mon Sep 17 00:00:00 2001 From: Yijia Date: Fri, 10 Jan 2025 13:22:06 -0800 Subject: [PATCH 4/7] nit --- xlml/utils/gpu.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index 9e54d4594..36ae94d2b 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -198,7 +198,7 @@ def get_existed_resource( return ip_address -@task +@task(trigger_rule="all_done") def clean_up_ssh_keys( instance_name: str, ssh_keys: ssh.SshKeys, From 8a5d724d8953ca7e15ac82980070d6eb2a8f0d80 Mon Sep 17 00:00:00 2001 From: Yijia Date: Mon, 13 Jan 2025 14:56:59 -0800 Subject: [PATCH 5/7] nit --- .../configs/trt_llm_inference_config.py | 4 +- .../configs/trt_llm_mlperf_v40_config.py | 4 +- .../configs/trt_llm_mlperf_v41_config.py | 4 +- .../inference/trt_llm_mlperf_v40_inference.py | 2 +- xlml/apis/task.py | 43 +++++++++---------- xlml/utils/gpu.py | 8 ++-- xlml/utils/metric.py | 21 ++++++--- 7 files changed, 47 insertions(+), 39 deletions(-) diff --git a/dags/inference/configs/trt_llm_inference_config.py b/dags/inference/configs/trt_llm_inference_config.py index 81f008fee..ab55f30f1 100644 --- a/dags/inference/configs/trt_llm_inference_config.py +++ b/dags/inference/configs/trt_llm_inference_config.py @@ -36,7 +36,7 @@ def get_trt_llm_gpu_config( project: Project, network: str, subnetwork: str, - existed_instance_name: str = None, + existing_instance_name: str = None, ) -> task.GpuCreateResourceTask: set_up_cmds = ( "pip install --upgrade pip", @@ -161,5 +161,5 @@ def get_trt_llm_gpu_config( job_test_config, job_gcp_config, job_metric_config, - existed_instance_name=existed_instance_name, + existing_instance_name=existing_instance_name, ) diff --git a/dags/inference/configs/trt_llm_mlperf_v40_config.py b/dags/inference/configs/trt_llm_mlperf_v40_config.py index 380f830f2..3ca612c6c 100644 --- a/dags/inference/configs/trt_llm_mlperf_v40_config.py +++ b/dags/inference/configs/trt_llm_mlperf_v40_config.py @@ -37,7 +37,7 @@ def get_trt_llm_mlperf_v40_gpu_config( project: Project, network: str, subnetwork: str, - existed_instance_name: str = None, + existing_instance_name: str = None, model_configs: Dict = {}, ) -> task.GpuCreateResourceTask: docker_container_name = "mlperf-inference" @@ -154,5 +154,5 @@ def get_trt_llm_mlperf_v40_gpu_config( job_test_config, job_gcp_config, job_metric_config, - existed_instance_name=existed_instance_name, + existing_instance_name=existing_instance_name, ) diff --git a/dags/inference/configs/trt_llm_mlperf_v41_config.py b/dags/inference/configs/trt_llm_mlperf_v41_config.py index 7117adc60..8d7b504ca 100644 --- a/dags/inference/configs/trt_llm_mlperf_v41_config.py +++ b/dags/inference/configs/trt_llm_mlperf_v41_config.py @@ -37,7 +37,7 @@ def get_trt_llm_mlperf_gpu_config( project: Project, network: str, subnetwork: str, - existed_instance_name: str = None, + existing_instance_name: str = None, benchmark_configs: Dict = {}, model_parameters: Dict = {}, parameter_positions: Dict = {}, @@ -216,5 +216,5 @@ def get_trt_llm_mlperf_gpu_config( job_test_config, job_gcp_config, job_metric_config, - existed_instance_name=existed_instance_name, + existing_instance_name=existing_instance_name, ) diff --git a/dags/inference/trt_llm_mlperf_v40_inference.py b/dags/inference/trt_llm_mlperf_v40_inference.py index b98715585..f3dc1bef2 100644 --- a/dags/inference/trt_llm_mlperf_v40_inference.py +++ b/dags/inference/trt_llm_mlperf_v40_inference.py @@ -61,5 +61,5 @@ network=INFERENCE_NETWORKS, subnetwork=H100_INFERENCE_SUBNETWORKS, model_configs=model_configs, - existed_instance_name="yijiaj-test-h100x8", + existing_instance_name="yijiaj-test-h100x8", ).run() diff --git a/xlml/apis/task.py b/xlml/apis/task.py index 55a4c60b9..fbd5c38fd 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -349,7 +349,7 @@ class GpuCreateResourceTask(BaseTask): task_metric_config: metric configuration (e.g., result gcs path). gpu_create_timeout: timeout when waiting for the GPU vm creation. install_nvidia_drivers: whether to install Nvidia drivers. - existed_instance_name: whether a exited GPU instance shall be used. + existing_instance_name: whether a existing GPU instance shall be used. """ image_project: str @@ -359,20 +359,19 @@ class GpuCreateResourceTask(BaseTask): task_metric_config: Optional[metric_config.MetricConfig] = None gpu_create_timeout: datetime.timedelta = datetime.timedelta(minutes=60) install_nvidia_drivers: bool = False - existed_instance_name: str = None + existing_instance_name: str = None def run(self) -> DAGNode: """Run a test job. Returns: A task group with the following tasks chained: provision, run_model, - post_process, clean_up if none existed instance is used, or a task - group with run_model and post_process only. + post_process, clean_up. """ # piz: We skip the queued resource for GPU for now since there is no queued # resource command for GPU. - if self.existed_instance_name is not None: - return self.run_with_existed_instance() + if self.existing_instance_name is not None: + return self.run_with_existing_instance() with TaskGroup( group_id=self.task_test_config.benchmark_id, prefix_group_id=True @@ -405,11 +404,11 @@ def run(self) -> DAGNode: provision >> run_model >> post_process >> clean_up return group - def run_with_existed_instance(self) -> DAGNode: - """Run a test job. + def run_with_existing_instance(self) -> DAGNode: + """Run a test job via existing instance. Returns: - A task group with the following tasks chained: run_model and post_process. + A task group with the following tasks chained: provision, run_model and post_process, clean_up. """ with TaskGroup( group_id=self.task_test_config.benchmark_id, prefix_group_id=True @@ -419,7 +418,7 @@ def run_with_existed_instance(self) -> DAGNode: ip_address, ssh_keys, gcs_location, - ) = self.provision_via_existed_instance() + ) = self.provision_via_existing_instance() if ( self.task_metric_config and self.task_metric_config.use_runtime_generated_gcs_folder @@ -429,16 +428,16 @@ def run_with_existed_instance(self) -> DAGNode: } else: env_variable = None - post_process = self.post_process(gcs_location, use_existed_instance=True) + post_process = self.post_process(gcs_location, use_existing_instance=True) run_model = self.run_model(ip_address, ssh_keys, env_variable) - clean_up = self.clean_up_existed_instance(ssh_keys) + clean_up = self.clean_up_existing_instance(ssh_keys) provision >> run_model >> post_process >> clean_up return group - def provision_via_existed_instance( + def provision_via_existing_instance( self, ) -> Tuple[DAGNode, airflow.XComArg, airflow.XComArg, airflow.XComArg,]: - """Provision an existed GPU accelerator. + """Provision an existing GPU accelerator. Returns: A DAG node that will provision a GPU, an XCome value of the ip address @@ -446,8 +445,8 @@ def provision_via_existed_instance( """ with TaskGroup(group_id="provision") as group: ssh_keys = ssh.generate_ssh_keys() - ip_address = gpu.get_existed_resource( - instance_name=self.existed_instance_name, + ip_address = gpu.get_existing_resource( + instance_name=self.existing_instance_name, ssh_keys=ssh_keys, gcp=self.task_gcp_config, ) @@ -536,7 +535,7 @@ def run_model( def post_process( self, result_location: Optional[airflow.XComArg] = None, - use_existed_instance=False, + use_existing_instance=False, ) -> DAGNode: """Process metrics and metadata, and insert them into BigQuery tables. @@ -551,7 +550,7 @@ def post_process( self.task_metric_config, self.task_gcp_config, folder_location=result_location, - use_existed_instance=use_existed_instance, + use_existing_instance=use_existing_instance, ) return group @@ -574,16 +573,16 @@ def clean_up( resource, project_id, zone ) - def clean_up_existed_instance(self, ssh_keys: airflow.XComArg) -> DAGNode: - """Clean up existed GPU resources - remove new generated ssh_keys. + def clean_up_existing_instance(self, ssh_keys: airflow.XComArg) -> DAGNode: + """Clean up existing GPU resources - remove the one-time use generated ssh_keys. Args: - ssh_keys: generated GPU's SSH keys to be removed. + ssh_keys: generated GPU's one-time use SSH keys to be removed. Returns: A DAG node that cleaned up the ssh_keys. """ return gpu.clean_up_ssh_keys( - instance_name=self.existed_instance_name, + instance_name=self.existing_instance_name, ssh_keys=ssh_keys, gcp=self.task_gcp_config, ) diff --git a/xlml/utils/gpu.py b/xlml/utils/gpu.py index 36ae94d2b..e19a82d7c 100644 --- a/xlml/utils/gpu.py +++ b/xlml/utils/gpu.py @@ -130,7 +130,7 @@ def generate_gpu_name() -> str: @task -def get_existed_resource( +def get_existing_resource( instance_name: str, ssh_keys: ssh.SshKeys, gcp: gcp_config.GCPConfig, @@ -138,7 +138,7 @@ def get_existed_resource( """Reach a resource node that is already created. Args: - instance_name: name of the existed instance. + instance_name: name of the existing instance. ssh_keys: airflow.XComArg, gcp: GCP project/zone configuration. @@ -204,10 +204,10 @@ def clean_up_ssh_keys( ssh_keys: ssh.SshKeys, gcp: gcp_config.GCPConfig, ) -> airflow.XComArg: - """Remove the generated ssh_keys from existed instance. + """Remove the generated one-time use ssh_keys from existing instance. Args: - instance_name: name of the existed instance. + instance_name: name of the existing instance. ssh_keys: airflow.XComArg, gcp: GCP project/zone configuration. """ diff --git a/xlml/utils/metric.py b/xlml/utils/metric.py index 14f970647..f0bad340b 100644 --- a/xlml/utils/metric.py +++ b/xlml/utils/metric.py @@ -591,7 +591,7 @@ def get_gke_job_status( def get_gce_job_status( task_test_config: test_config.TestConfig[test_config.Accelerator], use_startup_script: bool, - use_existed_instance: bool, + use_existing_instance: bool, ) -> bigquery.JobStatus: """Get job status for the GCE run. @@ -615,9 +615,9 @@ def get_gce_job_status( task_id=f"{benchmark_id}.provision.create_queued_resource.wait_for_ready_queued_resource" ) elif isinstance(task_test_config, test_config.GpuVmTest): - if use_existed_instance: + if use_existing_instance: wait_task = current_dag.get_task( - task_id=f"{benchmark_id}.provision.get_existed_resource" + task_id=f"{benchmark_id}.provision.get_existing_resource" ) else: wait_task = current_dag.get_task( @@ -637,7 +637,16 @@ def get_gce_job_status( return bigquery.JobStatus.MISSED # check setup status to see if setup step is successful - if not use_existed_instance: + if use_existing_instance: + get_instance_task = current_dag.get_task( + task_id=f"{benchmark_id}.provision.get_existing_resource" + ) + get_instance_ti = TaskInstance(get_instance_task, execution_date) + get_instance_state = get_instance_ti.current_state() + if get_instance_state == TaskState.FAILED.value: + logging.info("The getting existing instance state is failed, and the job status is failed.") + return bigquery.JobStatus.FAILED + else: setup_task = current_dag.get_task( task_id=f"{benchmark_id}.provision.setup" ) @@ -701,7 +710,7 @@ def process_metrics( task_metric_config: Optional[metric_config.MetricConfig], task_gcp_config: gcp_config.GCPConfig, use_startup_script: bool = False, - use_existed_instance: bool = False, + use_existing_instance: bool = False, folder_location: Optional[str] = None, ) -> None: benchmark_id = task_test_config.benchmark_id @@ -782,7 +791,7 @@ def process_metrics( test_job_status = get_gke_job_status(task_test_config) else: test_job_status = get_gce_job_status( - task_test_config, use_startup_script, use_existed_instance + task_test_config, use_startup_script, use_existing_instance ) for index in range(len(metadata_history_rows_list)): From fa4923e7ba70c6a793cc1ae76212d47c645a5344 Mon Sep 17 00:00:00 2001 From: Yijia Date: Mon, 13 Jan 2025 15:24:50 -0800 Subject: [PATCH 6/7] format --- xlml/utils/metric.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/xlml/utils/metric.py b/xlml/utils/metric.py index f0bad340b..fe73581bc 100644 --- a/xlml/utils/metric.py +++ b/xlml/utils/metric.py @@ -644,7 +644,9 @@ def get_gce_job_status( get_instance_ti = TaskInstance(get_instance_task, execution_date) get_instance_state = get_instance_ti.current_state() if get_instance_state == TaskState.FAILED.value: - logging.info("The getting existing instance state is failed, and the job status is failed.") + logging.info( + "The getting existing instance state is failed, and the job status is failed." + ) return bigquery.JobStatus.FAILED else: setup_task = current_dag.get_task( From ccb74405e4a5cc571dd7545d9194a5f31b5b014c Mon Sep 17 00:00:00 2001 From: Yijia Date: Wed, 15 Jan 2025 09:43:30 -0800 Subject: [PATCH 7/7] nit and wrap --- dags/inference/configs/trt_llm_inference_config.py | 1 + dags/inference/configs/trt_llm_mlperf_v40_config.py | 1 + dags/inference/configs/trt_llm_mlperf_v41_config.py | 1 + .../configs/pytorchxla_torchbench_config.py | 1 + .../configs/vllm/vllm_benchmark_config.py | 1 + xlml/apis/task.py | 6 ++---- xlml/apis/test_config.py | 2 ++ xlml/utils/metric.py | 13 ++++++------- 8 files changed, 15 insertions(+), 11 deletions(-) diff --git a/dags/inference/configs/trt_llm_inference_config.py b/dags/inference/configs/trt_llm_inference_config.py index ab55f30f1..8bdfeb670 100644 --- a/dags/inference/configs/trt_llm_inference_config.py +++ b/dags/inference/configs/trt_llm_inference_config.py @@ -142,6 +142,7 @@ def get_trt_llm_gpu_config( timeout=datetime.timedelta(minutes=time_out_in_min), task_owner=test_owner.YIJIA_J, gcs_subfolder=f"{GCS_SUBFOLDER_PREFIX}/trt_llm", + use_existing_instance=existing_instance_name is not None, ) job_gcp_config = gcp_config.GCPConfig( diff --git a/dags/inference/configs/trt_llm_mlperf_v40_config.py b/dags/inference/configs/trt_llm_mlperf_v40_config.py index 3ca612c6c..3844c8548 100644 --- a/dags/inference/configs/trt_llm_mlperf_v40_config.py +++ b/dags/inference/configs/trt_llm_mlperf_v40_config.py @@ -135,6 +135,7 @@ def get_trt_llm_mlperf_v40_gpu_config( timeout=datetime.timedelta(minutes=time_out_in_min), task_owner=test_owner.YIJIA_J, gcs_subfolder=f"{GCS_SUBFOLDER_PREFIX}/trt_llm_mlperf_v40", + use_existing_instance=existing_instance_name is not None, ) job_gcp_config = gcp_config.GCPConfig( diff --git a/dags/inference/configs/trt_llm_mlperf_v41_config.py b/dags/inference/configs/trt_llm_mlperf_v41_config.py index 8d7b504ca..e75dccfa5 100644 --- a/dags/inference/configs/trt_llm_mlperf_v41_config.py +++ b/dags/inference/configs/trt_llm_mlperf_v41_config.py @@ -197,6 +197,7 @@ def get_trt_llm_mlperf_gpu_config( timeout=datetime.timedelta(minutes=time_out_in_min), task_owner=test_owner.YIJIA_J, gcs_subfolder=f"{GCS_SUBFOLDER_PREFIX}/trt_llm_mlperf_v41", + use_existing_instance=existing_instance_name is not None, ) job_gcp_config = gcp_config.GCPConfig( diff --git a/dags/pytorch_xla/configs/pytorchxla_torchbench_config.py b/dags/pytorch_xla/configs/pytorchxla_torchbench_config.py index 2a632fbe0..235a6fe76 100644 --- a/dags/pytorch_xla/configs/pytorchxla_torchbench_config.py +++ b/dags/pytorch_xla/configs/pytorchxla_torchbench_config.py @@ -475,6 +475,7 @@ def get_torchbench_gpu_config( timeout=datetime.timedelta(minutes=time_out_in_min), task_owner=test_owner.PEI_Z, gcs_subfolder=f"{GCS_SUBFOLDER_PREFIX}/torchbench", + use_existing_instance=False, ) job_metric_config = metric_config.MetricConfig( diff --git a/dags/solutions_team/configs/vllm/vllm_benchmark_config.py b/dags/solutions_team/configs/vllm/vllm_benchmark_config.py index 5623428fd..4d1d46f9d 100644 --- a/dags/solutions_team/configs/vllm/vllm_benchmark_config.py +++ b/dags/solutions_team/configs/vllm/vllm_benchmark_config.py @@ -186,6 +186,7 @@ def get_gpu_vllm_gce_config( timeout=datetime.timedelta(minutes=time_out_in_min), task_owner=test_owner.RICHARD_L, gcs_subfolder=f"{GCS_SUBFOLDER_PREFIX}/vllm_benchmark", + use_existing_instance=False, ) job_gcp_config = gcp_config.GCPConfig( diff --git a/xlml/apis/task.py b/xlml/apis/task.py index fbd5c38fd..40f0443b5 100644 --- a/xlml/apis/task.py +++ b/xlml/apis/task.py @@ -349,7 +349,7 @@ class GpuCreateResourceTask(BaseTask): task_metric_config: metric configuration (e.g., result gcs path). gpu_create_timeout: timeout when waiting for the GPU vm creation. install_nvidia_drivers: whether to install Nvidia drivers. - existing_instance_name: whether a existing GPU instance shall be used. + existing_instance_name: whether an existing GPU instance shall be used. """ image_project: str @@ -428,7 +428,7 @@ def run_with_existing_instance(self) -> DAGNode: } else: env_variable = None - post_process = self.post_process(gcs_location, use_existing_instance=True) + post_process = self.post_process(gcs_location) run_model = self.run_model(ip_address, ssh_keys, env_variable) clean_up = self.clean_up_existing_instance(ssh_keys) provision >> run_model >> post_process >> clean_up @@ -535,7 +535,6 @@ def run_model( def post_process( self, result_location: Optional[airflow.XComArg] = None, - use_existing_instance=False, ) -> DAGNode: """Process metrics and metadata, and insert them into BigQuery tables. @@ -550,7 +549,6 @@ def post_process( self.task_metric_config, self.task_gcp_config, folder_location=result_location, - use_existing_instance=use_existing_instance, ) return group diff --git a/xlml/apis/test_config.py b/xlml/apis/test_config.py index e6d724cc5..1704b95a9 100644 --- a/xlml/apis/test_config.py +++ b/xlml/apis/test_config.py @@ -224,11 +224,13 @@ class GpuVmTest(TestConfig[Gpu]): test_name: Unique name for this test/model. set_up_cmds: List of commands to run once when GPU is created. run_model_cmds: List of commands to run the model under test. + use_existing_instance: Whether to use an existing GPU instance. """ test_name: str set_up_cmds: Iterable[str] run_model_cmds: Iterable[str] + use_existing_instance: bool @property def benchmark_id(self) -> str: diff --git a/xlml/utils/metric.py b/xlml/utils/metric.py index fe73581bc..26010953a 100644 --- a/xlml/utils/metric.py +++ b/xlml/utils/metric.py @@ -591,7 +591,6 @@ def get_gke_job_status( def get_gce_job_status( task_test_config: test_config.TestConfig[test_config.Accelerator], use_startup_script: bool, - use_existing_instance: bool, ) -> bigquery.JobStatus: """Get job status for the GCE run. @@ -615,7 +614,7 @@ def get_gce_job_status( task_id=f"{benchmark_id}.provision.create_queued_resource.wait_for_ready_queued_resource" ) elif isinstance(task_test_config, test_config.GpuVmTest): - if use_existing_instance: + if task_test_config.use_existing_instance: wait_task = current_dag.get_task( task_id=f"{benchmark_id}.provision.get_existing_resource" ) @@ -637,7 +636,10 @@ def get_gce_job_status( return bigquery.JobStatus.MISSED # check setup status to see if setup step is successful - if use_existing_instance: + if ( + hasattr(task_test_config, "use_existing_instance") + and task_test_config.use_existing_instance + ): get_instance_task = current_dag.get_task( task_id=f"{benchmark_id}.provision.get_existing_resource" ) @@ -712,7 +714,6 @@ def process_metrics( task_metric_config: Optional[metric_config.MetricConfig], task_gcp_config: gcp_config.GCPConfig, use_startup_script: bool = False, - use_existing_instance: bool = False, folder_location: Optional[str] = None, ) -> None: benchmark_id = task_test_config.benchmark_id @@ -792,9 +793,7 @@ def process_metrics( elif isinstance(task_test_config, test_config.GpuGkeTest): test_job_status = get_gke_job_status(task_test_config) else: - test_job_status = get_gce_job_status( - task_test_config, use_startup_script, use_existing_instance - ) + test_job_status = get_gce_job_status(task_test_config, use_startup_script) for index in range(len(metadata_history_rows_list)): job_history_row = bigquery.JobHistoryRow(