Skip to content

Commit e44dfa7

Browse files
author
Googler
committed
feat(components): Support dynamic machine parameters in preview.custom_job.utils.create_custom_training_job_from_component
Signed-off-by: Googler <nobody@google.com> PiperOrigin-RevId: 657895789
1 parent eea7d48 commit e44dfa7

File tree

3 files changed

+51
-6
lines changed

3 files changed

+51
-6
lines changed

components/google-cloud/RELEASE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* Add support for running tasks on a `PersistentResource` (see [CustomJobSpec](https://cloud.google.com/vertex-ai/docs/reference/rest/v1beta1/CustomJobSpec)) via `persistent_resource_id` parameter on `v1.custom_job.CustomTrainingJobOp` and `v1.custom_job.create_custom_training_job_from_component`
77
* Bump image for Structured Data pipelines.
88
* Add a check that the component passed to `preview.custom_job.utils.create_custom_training_job_from_component` doesn't have any parameters that share names with custom job fields.
9+
* Add dynamic machine spec support for `preview.custom_job.utils.create_custom_training_job_from_component`.
910

1011
## Release 2.15.0
1112
* Add Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline`.

components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,27 @@ def insert_system_labels_into_payload(payload):
3232
return json.dumps(job_spec)
3333

3434

35+
def cast_accelerator_count_to_int(payload):
  """Casts accelerator_count from string to an int in every worker pool.

  The `accelerator_count` placeholder is substituted into the payload as a
  string, so it has to be converted back to an integer before the job spec is
  submitted. Every entry of `worker_pool_specs` is processed, not only the
  first one, so that additional worker pools carrying the same placeholder are
  converted as well.

  Args:
    payload: JSON-serialized custom job payload. The worker pools are read
      from `payload['job_spec']['worker_pool_specs']`.

  Returns:
    The JSON-serialized payload in which each
    `machine_spec['accelerator_count']` that is present has been converted
    with `int()`. Pools without a `machine_spec` or without an
    `accelerator_count` are left untouched.

  Raises:
    ValueError: If an `accelerator_count` value cannot be parsed as an
      integer.
  """
  job_spec = json.loads(payload)
  # TODO(b/353577594): accelerator_count placeholder is not resolved to int.
  # Need to typecast to int to avoid a type mismatch error. This can be removed
  # once placeholder resolution is fixed.
  worker_pool_specs = (job_spec.get('job_spec') or {}).get(
      'worker_pool_specs'
  ) or []
  for worker_pool_spec in worker_pool_specs:
    # A worker pool entry may be empty or lack a machine_spec; skip those
    # instead of failing on a missing key.
    machine_spec = (worker_pool_spec or {}).get('machine_spec') or {}
    if 'accelerator_count' in machine_spec:
      machine_spec['accelerator_count'] = int(machine_spec['accelerator_count'])
  return json.dumps(job_spec)
54+
55+
3556
def create_custom_job_with_client(job_client, parent, job_spec):
3657
create_custom_job_fn = None
3758
try:
@@ -86,6 +107,7 @@ def create_custom_job(
86107
# Create custom job if it does not exist
87108
job_name = remote_runner.check_if_job_exists()
88109
if job_name is None:
110+
payload = cast_accelerator_count_to_int(payload)
89111
job_name = remote_runner.create_job(
90112
create_custom_job_with_client,
91113
insert_system_labels_into_payload(payload),

components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def create_custom_training_job_from_component(
5454
display_name: str = '',
5555
replica_count: int = 1,
5656
machine_type: str = 'n1-standard-4',
57-
accelerator_type: str = '',
57+
accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED',
5858
accelerator_count: int = 1,
5959
boot_disk_type: str = 'pd-ssd',
6060
boot_disk_size_gb: int = 100,
@@ -83,7 +83,7 @@ def create_custom_training_job_from_component(
8383
replica_count: The count of instances in the cluster. One replica always counts towards the master in worker_pool_spec[0] and the remaining replicas will be allocated in worker_pool_spec[1]. See [more information.](https://cloud.google.com/vertex-ai/docs/training/distributed-training#configure_a_distributed_training_job)
8484
machine_type: The type of the machine to run the CustomJob. The default value is "n1-standard-4". See [more information](https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types).
8585
accelerator_type: The type of accelerator(s) that may be attached to the machine per `accelerator_count`. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype).
86-
accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set.
86+
accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set statically.
8787
boot_disk_type: Type of the boot disk (default is "pd-ssd"). Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard" (Persistent Disk Hard Disk Drive). boot_disk_type is set as a static value and cannot be changed as a pipeline parameter.
8888
boot_disk_size_gb: Size in GB of the boot disk (default is 100GB). `boot_disk_size_gb` is set as a static value and cannot be changed as a pipeline parameter.
8989
timeout: The maximum job running time. The default is 7 days. A duration in seconds with up to nine fractional digits, terminated by 's', for example: "3.5s".
@@ -148,7 +148,11 @@ def create_custom_training_job_from_component(
148148
)[0]['container']
149149

150150
worker_pool_spec = {
151-
'machine_spec': {'machine_type': machine_type},
151+
'machine_spec': {
152+
'machine_type': "{{$.inputs.parameters['machine_type']}}",
153+
'accelerator_type': "{{$.inputs.parameters['accelerator_type']}}",
154+
'accelerator_count': "{{$.inputs.parameters['accelerator_count']}}",
155+
},
152156
'replica_count': 1,
153157
'container_spec': {
154158
'image_uri': user_component_container['image'],
@@ -161,9 +165,6 @@ def create_custom_training_job_from_component(
161165
'env': env or [],
162166
},
163167
}
164-
if accelerator_type:
165-
worker_pool_spec['machine_spec']['accelerator_type'] = accelerator_type
166-
worker_pool_spec['machine_spec']['accelerator_count'] = accelerator_count
167168
if boot_disk_type:
168169
worker_pool_spec['disk_spec'] = {
169170
'boot_disk_type': boot_disk_type,
@@ -210,6 +211,26 @@ def create_custom_training_job_from_component(
210211
'defaultValue'
211212
] = default_value
212213

214+
# add machine parameters into the customjob component
215+
if accelerator_type == 'ACCELERATOR_TYPE_UNSPECIFIED':
216+
accelerator_count = 0
217+
218+
cj_component_spec['inputDefinitions']['parameters']['machine_type'] = {
219+
'parameterType': 'STRING',
220+
'defaultValue': machine_type,
221+
'isOptional': True,
222+
}
223+
cj_component_spec['inputDefinitions']['parameters']['accelerator_type'] = {
224+
'parameterType': 'STRING',
225+
'defaultValue': accelerator_type,
226+
'isOptional': True,
227+
}
228+
cj_component_spec['inputDefinitions']['parameters']['accelerator_count'] = {
229+
'parameterType': 'NUMBER_INTEGER',
230+
'defaultValue': accelerator_count,
231+
'isOptional': True,
232+
}
233+
213234
# check if user component has any input parameters that already exist in the
214235
# custom job component
215236
for param_name in user_component_spec.get('inputDefinitions', {}).get(
@@ -234,6 +255,7 @@ def create_custom_training_job_from_component(
234255
cj_component_spec['outputDefinitions']['parameters'].update(
235256
user_component_spec.get('outputDefinitions', {}).get('parameters', {})
236257
)
258+
237259
# use artifacts from user component
238260
## assign artifacts, not update, since customjob has no artifact outputs
239261
cj_component_spec['inputDefinitions']['artifacts'] = user_component_spec.get(

0 commit comments

Comments
 (0)