diff --git a/components/google-cloud/RELEASE.md b/components/google-cloud/RELEASE.md
index 20fc219125d8..416a0f5bab87 100644
--- a/components/google-cloud/RELEASE.md
+++ b/components/google-cloud/RELEASE.md
@@ -8,6 +8,7 @@
 * Add check that component in preview.custom_job.utils.create_custom_training_job_from_component doesn't have any parameters that share names with any custom job fields
 * Add dynamic machine spec support for `preview.custom_job.utils.create_custom_training_job_from_component`.
 * Add preflight validations for LLM text generation pipeline.
+* Add dynamic support for boot_disk_type, boot_disk_size_gb, nfs_mounts, env in `preview.custom_job.utils.create_custom_training_job_from_component`.
 
 ## Release 2.15.0
 * Add Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline`.
diff --git a/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py b/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py
index e56548c002fe..c189b5182ac4 100644
--- a/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py
+++ b/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py
@@ -32,6 +32,15 @@ def insert_system_labels_into_payload(payload):
   return json.dumps(job_spec)
 
 
+def is_json(test_string: str) -> bool:  # True iff test_string parses as JSON.
+  try:
+    json.loads(test_string)
+  except (TypeError, ValueError):  # TypeError: input is not str/bytes.
+    return False
+  return True
+
+
+# TODO: rename this function; it now also handles boot_disk_size_gb.
 def cast_accelerator_count_to_int(payload):
   """Casts accelerator_count from string to an int."""
 
@@ -39,6 +48,7 @@ def cast_accelerator_count_to_int(payload):
   # TODO(b/353577594): accelerator_count placeholder is not resolved to int.
   # Need to typecast to int to avoid type mismatch error. Can remove when fix
   # placeholder resolution.
+  # worker_pool_spec = job_spec['job_spec']['worker_pool_specs'][0]
   if (
       'accelerator_count'
       in job_spec['job_spec']['worker_pool_specs'][0]['machine_spec']
@@ -50,6 +60,37 @@ def cast_accelerator_count_to_int(payload):
             'accelerator_count'
         ]
     )
+  # print(
+  #     'after accelerator_count:'
+  #     f' {repr(worker_pool_spec["machine_spec"]["accelerator_count"])}'
+  # boot_disk_size_gb is substituted into the payload as a string. utils.py
+  # writes it under 'disk_spec', so cast it there; an empty value is skipped.
+  disk_spec = job_spec['job_spec']['worker_pool_specs'][0].get('disk_spec', {})
+  if disk_spec.get('boot_disk_size_gb', ''):
+    disk_spec['boot_disk_size_gb'] = int(disk_spec['boot_disk_size_gb'])
+
+  # this doesn't work on int fields for some reason
+  # if is_json(
+  #     worker_pool_spec.get('machine_spec', {}).get('accelerator_count', '')
+  # ):
+  #   worker_pool_spec['machine_spec']['accelerator_count'] = json.loads(
+  #       worker_pool_spec['machine_spec']['accelerator_count']
+  #   )
+  # if is_json(
+  #     worker_pool_spec.get('boot_disk', {}).get('boot_disk_size_gb', '')
+  # ):
+  #   worker_pool_spec['boot_disk']['boot_disk_size_gb'] = json.loads(
+  #       worker_pool_spec['boot_disk']['boot_disk_size_gb']
+  #   )
+  # if is_json(worker_pool_spec.get('container_spec', {}).get('env', '')):
+  #   worker_pool_spec['container_spec']['env'] = json.loads(
+  #       worker_pool_spec['container_spec']['env']
+  #   )
+  # print(f"before nfs_mounts: {repr(worker_pool_spec['nfs_mounts'])}")
+  # if is_json(worker_pool_spec.get('nfs_mounts', '')):
+  #   worker_pool_spec['nfs_mounts'] =
+  #   json.loads(worker_pool_spec['nfs_mounts'])
+  # print(f"after nfs_mounts: {repr(worker_pool_spec['nfs_mounts'])}")
   return json.dumps(job_spec)
 
 
diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py b/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py
index ed35a559f557..36e9039987cd 100644
--- a/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py
+++ b/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py
@@ -48,6 +48,9 @@ def _replace_executor_placeholder(
   ]
 
 
+# There is trouble parsing nfs_mounts; see:
+# https://pantheon.corp.google.com/vertex-ai/pipelines/locations/us-central1/runs/pipeline-20240731020844?project=managed-pipeline-test&e=13802955&mods=-autopush_coliseum&inv=1&invt=AbYIhQ
+# env probably hits the same error.
 # keep identical to CustomTrainingJobOp
 def create_custom_training_job_from_component(
     component_spec: Callable,
@@ -163,15 +166,32 @@ def create_custom_training_job_from_component(
               user_component_container.get('args', [])
           ),
           'env': env or [],
+          # 'env': [],
+          # 'env': "{{$.inputs.parameters['env']}}",
+          # 'env': "{{$.json_escape[1].inputs.parameters['env']}}",
+          # 'env': "{{$.inputs.parameters['env'].json_escape[1]}}",
+          # 'env': "{{$.inputs.parameters['env'].json_escape[0]}}",
+          # 'env': "{{$.inputs.parameters['env'].json_escape[2]}}",
       },
+      'disk_spec': {
+          'boot_disk_type': "{{$.inputs.parameters['boot_disk_type']}}",
+          'boot_disk_size_gb': "{{$.inputs.parameters['boot_disk_size_gb']}}",
+      },
+      # 'nfs_mounts': "{{$.inputs.parameters['nfs_mounts']}}",
   }
-  if boot_disk_type:
-    worker_pool_spec['disk_spec'] = {
-        'boot_disk_type': boot_disk_type,
-        'boot_disk_size_gb': boot_disk_size_gb,
-    }
+  # List-valued placeholders (env, nfs_mounts) break payload parsing, even in
+  # local unit tests (probably in remote_runner's json.loads), so nfs_mounts
+  # is still applied statically below instead of being dropped.
+
+  # worker_pool_spec['container_spec']['env'] = json.loads(
+
+  # if boot_disk_type:
+  #   worker_pool_spec['disk_spec'] = {
+  #       'boot_disk_type': boot_disk_type,
+  #       'boot_disk_size_gb': boot_disk_size_gb,
+  #   }
   if nfs_mounts:
     worker_pool_spec['nfs_mounts'] = nfs_mounts
 
   worker_pool_specs = [worker_pool_spec]
 
@@ -212,9 +232,6 @@ def create_custom_training_job_from_component(
     ] = default_value
 
   # add machine parameters into the customjob component
-  if accelerator_type == 'ACCELERATOR_TYPE_UNSPECIFIED':
-    accelerator_count = 0
-
   cj_component_spec['inputDefinitions']['parameters']['machine_type'] = {
       'parameterType': 'STRING',
       'defaultValue': machine_type,
@@ -227,7 +244,31 @@ def create_custom_training_job_from_component(
   }
   cj_component_spec['inputDefinitions']['parameters']['accelerator_count'] = {
       'parameterType': 'NUMBER_INTEGER',
-      'defaultValue': accelerator_count,
+      'defaultValue': (
+          accelerator_count
+          if accelerator_type != 'ACCELERATOR_TYPE_UNSPECIFIED'
+          else 0
+      ),
+      'isOptional': True,
+  }
+  cj_component_spec['inputDefinitions']['parameters']['boot_disk_type'] = {
+      'parameterType': 'STRING',
+      'defaultValue': boot_disk_type,
+      'isOptional': True,
+  }
+  cj_component_spec['inputDefinitions']['parameters']['boot_disk_size_gb'] = {
+      'parameterType': 'NUMBER_INTEGER',
+      'defaultValue': boot_disk_size_gb,
+      'isOptional': True,
+  }
+  cj_component_spec['inputDefinitions']['parameters']['nfs_mounts'] = {
+      'parameterType': 'LIST',
+      'defaultValue': nfs_mounts or [],
+      'isOptional': True,
+  }
+  cj_component_spec['inputDefinitions']['parameters']['env'] = {
+      'parameterType': 'LIST',
+      'defaultValue': env or [],
       'isOptional': True,
   }
 