diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 2433f09bc6d1..2d013d3652fd 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -3435,31 +3435,31 @@ def simple_pipeline(): ['exec-return-1']['container']) self.assertEqual( - '5', dict_format['deploymentSpec']['executors']['exec-return-1-2'] - ['container']['resources']['resourceCpuLimit']) + 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-2'] + ['container']['resources']['cpuLimit']) self.assertNotIn( 'memoryLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-2']['container']['resources']) self.assertEqual( - '50G', dict_format['deploymentSpec']['executors']['exec-return-1-3'] - ['container']['resources']['resourceMemoryLimit']) + 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-3'] + ['container']['resources']['memoryLimit']) self.assertNotIn( 'cpuLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-3']['container']['resources']) self.assertEqual( - '2', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceCpuRequest']) + 2.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['cpuRequest']) self.assertEqual( - '5', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceCpuLimit']) + 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['cpuLimit']) self.assertEqual( - '4G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceMemoryRequest']) + 4.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['memoryRequest']) self.assertEqual( - '50G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceMemoryLimit']) + 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['memoryLimit']) class TestPlatformConfig(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 421bef1ad2c3..dc10665944f7 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -15,9 +15,11 @@ import collections import copy +import re from typing import DefaultDict, Dict, List, Mapping, Set, Tuple, Union from kfp import dsl +from kfp.dsl import constants from kfp.dsl import for_loop from kfp.dsl import pipeline_channel from kfp.dsl import pipeline_context @@ -803,3 +805,79 @@ def recursive_replace_placeholders(data: Union[Dict, List], old_value: str, if isinstance(data, pipeline_channel.PipelineChannel): data = str(data) return new_value if data == old_value else data + + +def validate_cpu_request_limit_to_float(cpu: str) -> float: + """Validates cpu request/limit string and converts to its numeric float + value. + + Args: + cpu: CPU requests or limits. This string should be a number or a + number followed by an "m" to indicate millicores (1/1000). For + more information, see `Specify a CPU Request and a CPU Limit + `_. + + Raises: + ValueError if the cpu request/limit string value is invalid. + + Returns: + The numeric value (float) of the cpu request/limit. + """ + if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: + raise ValueError( + 'Invalid cpu string. Should be float or integer, or integer' + ' followed by "m".') + + return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + + +def validate_memory_request_limit_to_float(memory: str) -> float: + """Validates memory request/limit string and converts to its numeric value. + + Args: + memory: Memory requests or limits. This string should be a number or + a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", + "Gi", "M", "Mi", "K", or "Ki". + + Raises: + ValueError if the memory request/limit string value is invalid. + + Returns: + The numeric value (float) of the memory request/limit. + """ + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory) is None: + raise ValueError( + 'Invalid memory string. Should be a number or a number ' + 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' + '"Gi", "M", "Mi", "K", "Ki".') + + if memory.endswith('E'): + memory = float(memory[:-1]) * constants._E / constants._G + elif memory.endswith('Ei'): + memory = float(memory[:-2]) * constants._EI / constants._G + elif memory.endswith('P'): + memory = float(memory[:-1]) * constants._P / constants._G + elif memory.endswith('Pi'): + memory = float(memory[:-2]) * constants._PI / constants._G + elif memory.endswith('T'): + memory = float(memory[:-1]) * constants._T / constants._G + elif memory.endswith('Ti'): + memory = float(memory[:-2]) * constants._TI / constants._G + elif memory.endswith('G'): + memory = float(memory[:-1]) + elif memory.endswith('Gi'): + memory = float(memory[:-2]) * constants._GI / constants._G + elif memory.endswith('M'): + memory = float(memory[:-1]) * constants._M / constants._G + elif memory.endswith('Mi'): + memory = float(memory[:-2]) * constants._MI / constants._G + elif memory.endswith('K'): + memory = float(memory[:-1]) * constants._K / constants._G + elif memory.endswith('Ki'): + memory = float(memory[:-2]) * constants._KI / constants._G + else: + # By default interpret as a plain integer, in the unit of Bytes. + memory = float(memory) / constants._G + + return memory diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index afc014530fa2..582a5c0e5901 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -648,29 +648,42 @@ def convert_to_placeholder(input_value: str) -> str: container_spec.resources.resource_cpu_request = ( convert_to_placeholder( task.container_spec.resources.cpu_request)) + container_spec.resources.cpu_request = compiler_utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( + task.container_spec.resources.cpu_request)) if task.container_spec.resources.cpu_limit is not None: container_spec.resources.resource_cpu_limit = ( - convert_to_placeholder(task.container_spec.resources.cpu_limit)) + convert_to_placeholder( + task.container_spec.resources.cpu_limit)) + container_spec.resources.cpu_limit = compiler_utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( + task.container_spec.resources.cpu_limit)) if task.container_spec.resources.memory_request is not None: container_spec.resources.resource_memory_request = ( convert_to_placeholder( task.container_spec.resources.memory_request)) + container_spec.resources.memory_request = compiler_utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( + task.container_spec.resources.memory_request)) if task.container_spec.resources.memory_limit is not None: container_spec.resources.resource_memory_limit = ( convert_to_placeholder( task.container_spec.resources.memory_limit)) + container_spec.resources.memory_limit = compiler_utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( + task.container_spec.resources.memory_limit)) if task.container_spec.resources.accelerator_count is not None: container_spec.resources.accelerator.CopyFrom( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec .ResourceSpec.AcceleratorConfig( - resource_type=convert_to_placeholder( - task.container_spec.resources.accelerator_type), - resource_count=convert_to_placeholder( - task.container_spec.resources.accelerator_count), type=convert_to_placeholder( task.container_spec.resources.accelerator_type), count=convert_to_placeholder( int(task.container_spec.resources.accelerator_count)), + resource_type=convert_to_placeholder( + task.container_spec.resources.accelerator_type), + resource_count=convert_to_placeholder( + task.container_spec.resources.accelerator_count), )) return container_spec diff --git a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml index 380151586e9e..81595db1803c 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml @@ -62,13 +62,17 @@ deploymentSpec: resources: accelerator: count: '1' - type: tpu-v3 + type: 'tpu-v3' resourceCount: '1' - resourceType: tpu-v3 + resourceType: 'tpu-v3' + cpuLimit: 4.0 + cpuRequest: 2.0 + memoryLimit: 15.032385536 + memoryRequest: 4.294967296 resourceCpuLimit: '4' resourceCpuRequest: '2' - resourceMemoryLimit: 14Gi - resourceMemoryRequest: 4Gi + resourceMemoryLimit: '14Gi' + resourceMemoryRequest: '4Gi' pipelineInfo: description: A linear two-step pipeline with resource specification. name: two-step-pipeline-with-resource-spec