Skip to content

Commit

Permalink
feat(components): Support dynamic values for boot_disk_type, boot_dis…
Browse files Browse the repository at this point in the history
…k_size, nfs_mounts, env in preview.custom_job.utils.create_custom_training_job_from_component

Signed-off-by: Googler <nobody@google.com>
PiperOrigin-RevId: 657752663
  • Loading branch information
Googler committed Aug 1, 2024
1 parent c420f8c commit 08518e2
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 18 deletions.
1 change: 1 addition & 0 deletions components/google-cloud/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Add check that component in preview.custom_job.utils.create_custom_training_job_from_component doesn't have any parameters that share names with any custom job fields
* Add dynamic machine spec support for `preview.custom_job.utils.create_custom_training_job_from_component`.
* Add preflight validations for LLM text generation pipeline.
* Add dynamic support for `boot_disk_type`, `boot_disk_size_gb`, `nfs_mounts`, and `env` in `preview.custom_job.utils.create_custom_training_job_from_component`.

## Release 2.15.0
* Add Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,66 @@ def insert_system_labels_into_payload(payload):
return json.dumps(job_spec)


def is_json(test_string: str) -> bool:
  """Returns True if ``test_string`` is parseable as JSON, False otherwise."""
  try:
    json.loads(test_string)
    return True
  except ValueError:
    # json.loads raises ValueError (JSONDecodeError) on malformed input.
    return False


def cast_accelerator_count_to_int(payload):
  """Casts the worker pool's accelerator_count from string to int.

  Args:
    payload: A JSON-encoded CustomJob payload whose
      ``job_spec.worker_pool_specs[0].machine_spec`` may contain an
      ``accelerator_count`` rendered as a string.

  Returns:
    The JSON-encoded payload with ``accelerator_count`` (when present)
    converted to an integer.
  """
  job_spec = json.loads(payload)
  # TODO(b/353577594): accelerator_count placeholder is not resolved to int.
  # Need to typecast to int to avoid type mismatch error. Can remove when fix
  # placeholder resolution.
  worker_pool_spec = job_spec['job_spec']['worker_pool_specs'][0]
  machine_spec = worker_pool_spec['machine_spec']
  if 'accelerator_count' in machine_spec:
    machine_spec['accelerator_count'] = int(machine_spec['accelerator_count'])
  return json.dumps(job_spec)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def _replace_executor_placeholder(
]


# there is trouble parsing nfs_mounts
# https://pantheon.corp.google.com/vertex-ai/pipelines/locations/us-central1/runs/pipeline-20240731020844?project=managed-pipeline-test&e=13802955&mods=-autopush_coliseum&inv=1&invt=AbYIhQ
# i think probably same error with env
# keep identical to CustomTrainingJobOp
def create_custom_training_job_from_component(
component_spec: Callable,
Expand Down Expand Up @@ -163,15 +166,32 @@ def create_custom_training_job_from_component(
user_component_container.get('args', [])
),
'env': env or [],
# 'env': [],
# 'env': "{{$.inputs.parameters['env']}}",
# 'env': "{{$.json_escape[1].inputs.parameters['env']}}",
# 'env': "{{$.inputs.parameters['env'].json_escape[1]}}",
# 'env': "{{$.inputs.parameters['env'].json_escape[0]}}",
# 'env': "{{$.inputs.parameters['env'].json_escape[2]}}",
},
'disk_spec': {
'boot_disk_type': "{{$.inputs.parameters['boot_disk_type']}}",
'boot_disk_size_gb': "{{$.inputs.parameters['boot_disk_size_gb']}}",
},
# 'nfs_mounts': "{{$.inputs.parameters['nfs_mounts']}}",
}
if boot_disk_type:
worker_pool_spec['disk_spec'] = {
'boot_disk_type': boot_disk_type,
'boot_disk_size_gb': boot_disk_size_gb,
}
if nfs_mounts:
worker_pool_spec['nfs_mounts'] = nfs_mounts
# list fields (env, nfs_mounts) cause issues with parsing payload.
# even with local unit tests
# probably remote_runner json.loads is doing something unexpected.

# worker_pool_spec['container_spec']['env'] = json.loads(

# if boot_disk_type:
# worker_pool_spec['disk_spec'] = {
# 'boot_disk_type': boot_disk_type,
# 'boot_disk_size_gb': boot_disk_size_gb,
# }
# if nfs_mounts:
# worker_pool_spec['nfs_mounts'] = nfs_mounts

worker_pool_specs = [worker_pool_spec]

Expand Down Expand Up @@ -212,9 +232,6 @@ def create_custom_training_job_from_component(
] = default_value

# add machine parameters into the customjob component
if accelerator_type == 'ACCELERATOR_TYPE_UNSPECIFIED':
accelerator_count = 0

cj_component_spec['inputDefinitions']['parameters']['machine_type'] = {
'parameterType': 'STRING',
'defaultValue': machine_type,
Expand All @@ -227,7 +244,31 @@ def create_custom_training_job_from_component(
}
cj_component_spec['inputDefinitions']['parameters']['accelerator_count'] = {
'parameterType': 'NUMBER_INTEGER',
'defaultValue': accelerator_count,
'defaultValue': (
accelerator_count
if accelerator_type != 'ACCELERATOR_TYPE_UNSPECIFIED'
else 0
),
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['boot_disk_type'] = {
'parameterType': 'STRING',
'defaultValue': boot_disk_type,
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['boot_disk_size_gb'] = {
'parameterType': 'NUMBER_INTEGER',
'defaultValue': boot_disk_size_gb,
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['nfs_mounts'] = {
'parameterType': 'LIST',
'defaultValue': nfs_mounts or [],
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['env'] = {
'parameterType': 'LIST',
'defaultValue': env or [],
'isOptional': True,
}

Expand Down

0 comments on commit 08518e2

Please sign in to comment.