Skip to content

Commit cf18aba

Browse files
author
Googler
committed
feat(components): Support dynamic values for boot_disk_type, boot_disk_size, nfs_mounts, env in preview.custom_job.utils.create_custom_training_job_from_component
Signed-off-by: Googler <nobody@google.com> PiperOrigin-RevId: 657752663
1 parent c420f8c commit cf18aba

File tree

3 files changed

+91
-14
lines changed

3 files changed

+91
-14
lines changed

components/google-cloud/RELEASE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* Add check that component in preview.custom_job.utils.create_custom_training_job_from_component doesn't have any parameters that share names with any custom job fields
99
* Add dynamic machine spec support for `preview.custom_job.utils.create_custom_training_job_from_component`.
1010
* Add preflight validations for LLM text generation pipeline.
11+
* Add dynamic support for boot_disk_type, boot_disk_size, nfs_mounts, env in `preview.custom_job.utils.create_custom_training_job_from_component`.
1112

1213
## Release 2.15.0
1314
* Add Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline`.

components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,66 @@ def insert_system_labels_into_payload(payload):
3232
return json.dumps(job_spec)
3333

3434

35+
def is_json(test_string: str) -> bool:
  """Returns whether `test_string` can be decoded as a JSON document.

  Args:
    test_string: Candidate value. Expected to be a string, but values that
      have already been resolved to another type (e.g. an int field) are
      tolerated and simply reported as "not JSON".

  Returns:
    True if `json.loads` accepts the value, False otherwise.
  """
  try:
    json.loads(test_string)
  except (TypeError, ValueError):
    # ValueError (incl. json.JSONDecodeError): a string that is not valid JSON.
    # TypeError: the value is not str/bytes/bytearray at all, which is what
    # happens when a field has already been resolved to an int.
    return False
  return True
41+
42+
43+
# TODO: Rename this function; it now casts more than just accelerator_count.
3544
def cast_accelerator_count_to_int(payload):
  """Casts integer-valued worker pool fields from strings to ints.

  Despite its name, this normalizes both `machine_spec.accelerator_count` and
  the boot disk's `boot_disk_size_gb` on the first worker pool spec.

  Args:
    payload: JSON string of the custom job request. Must contain
      `job_spec.worker_pool_specs[0].machine_spec`.

  Returns:
    The payload re-serialized as a JSON string with the fields above
    converted to ints where present.

  Raises:
    ValueError: If a present field cannot be converted with `int()`.
    KeyError: If the first worker pool spec has no `machine_spec`.
  """
  job_spec = json.loads(payload)
  # TODO(b/353577594): accelerator_count placeholder is not resolved to int.
  # Need to typecast to int to avoid type mismatch error. Can remove when fix
  # placeholder resolution.
  worker_pool_spec = job_spec['job_spec']['worker_pool_specs'][0]

  machine_spec = worker_pool_spec['machine_spec']
  if 'accelerator_count' in machine_spec:
    machine_spec['accelerator_count'] = int(machine_spec['accelerator_count'])

  # The worker pool spec carries the boot disk settings under `disk_spec`.
  # `boot_disk` is still checked so payloads using that key keep working.
  for disk_key in ('disk_spec', 'boot_disk'):
    disk_spec = worker_pool_spec.get(disk_key) or {}
    # Truthiness check: skip a missing, empty or zero size rather than
    # calling int('') on an unset value.
    if disk_spec.get('boot_disk_size_gb', ''):
      disk_spec['boot_disk_size_gb'] = int(disk_spec['boot_disk_size_gb'])

  return json.dumps(job_spec)
5496

5597

components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ def _replace_executor_placeholder(
4848
]
4949

5050

51+
# NOTE: nfs_mounts fails to parse when passed as a dynamic placeholder; see:
52+
# https://pantheon.corp.google.com/vertex-ai/pipelines/locations/us-central1/runs/pipeline-20240731020844?project=managed-pipeline-test&e=13802955&mods=-autopush_coliseum&inv=1&invt=AbYIhQ
53+
# env probably hits the same parsing error.
5154
# keep identical to CustomTrainingJobOp
5255
def create_custom_training_job_from_component(
5356
component_spec: Callable,
@@ -163,15 +166,26 @@ def create_custom_training_job_from_component(
163166
user_component_container.get('args', [])
164167
),
165168
'env': env or [],
169+
# 'env': "{{$.inputs.parameters['env']}}",
166170
},
171+
'disk_spec': {
172+
'boot_disk_type': "{{$.inputs.parameters['boot_disk_type']}}",
173+
'boot_disk_size_gb': "{{$.inputs.parameters['boot_disk_size_gb']}}",
174+
},
175+
# 'nfs_mounts': "{{$.inputs.parameters['nfs_mounts']}}",
167176
}
168-
if boot_disk_type:
169-
worker_pool_spec['disk_spec'] = {
170-
'boot_disk_type': boot_disk_type,
171-
'boot_disk_size_gb': boot_disk_size_gb,
172-
}
173-
if nfs_mounts:
174-
worker_pool_spec['nfs_mounts'] = nfs_mounts
177+
# todo: I think the list fields (env, nfs_mounts) cause issues with parsing payload.
178+
# probably remote_runner json.loads is doing something unexpected.
179+
180+
# worker_pool_spec['container_spec']['env'] = json.loads(
181+
182+
# if boot_disk_type:
183+
# worker_pool_spec['disk_spec'] = {
184+
# 'boot_disk_type': boot_disk_type,
185+
# 'boot_disk_size_gb': boot_disk_size_gb,
186+
# }
187+
# if nfs_mounts:
188+
# worker_pool_spec['nfs_mounts'] = nfs_mounts
175189

176190
worker_pool_specs = [worker_pool_spec]
177191

@@ -230,6 +244,26 @@ def create_custom_training_job_from_component(
230244
'defaultValue': accelerator_count,
231245
'isOptional': True,
232246
}
247+
cj_component_spec['inputDefinitions']['parameters']['boot_disk_type'] = {
248+
'parameterType': 'STRING',
249+
'defaultValue': boot_disk_type,
250+
'isOptional': True,
251+
}
252+
cj_component_spec['inputDefinitions']['parameters']['boot_disk_size_gb'] = {
253+
'parameterType': 'NUMBER_INTEGER',
254+
'defaultValue': boot_disk_size_gb,
255+
'isOptional': True,
256+
}
257+
cj_component_spec['inputDefinitions']['parameters']['nfs_mounts'] = {
258+
'parameterType': 'LIST',
259+
'defaultValue': nfs_mounts,
260+
'isOptional': True,
261+
}
262+
cj_component_spec['inputDefinitions']['parameters']['env'] = {
263+
'parameterType': 'LIST',
264+
'defaultValue': env,
265+
'isOptional': True,
266+
}
233267

234268
# check if user component has any input parameters that already exist in the
235269
# custom job component

0 commit comments

Comments
 (0)