From bc62bfd9ddb26b70e2e0e4a2f44ed5da2271b4ae Mon Sep 17 00:00:00 2001 From: "Ricardo M. Oliveira" Date: Tue, 19 Nov 2024 18:11:32 -0300 Subject: [PATCH] Backport fixes in kubeflow/pipelines#11075 Introduced back the functions to convert k8s size values to float, but moved to kfp.dsl.utils Signed-off-by: Ricardo M. Oliveira --- sdk/python/kfp/compiler/compiler_test.py | 24 +++--- .../kfp/compiler/pipeline_spec_builder.py | 19 +++-- sdk/python/kfp/dsl/utils.py | 78 +++++++++++++++++++ 3 files changed, 104 insertions(+), 17 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 2433f09bc6d1..2d013d3652fd 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -3435,31 +3435,31 @@ def simple_pipeline(): ['exec-return-1']['container']) self.assertEqual( - '5', dict_format['deploymentSpec']['executors']['exec-return-1-2'] - ['container']['resources']['resourceCpuLimit']) + 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-2'] + ['container']['resources']['cpuLimit']) self.assertNotIn( 'memoryLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-2']['container']['resources']) self.assertEqual( - '50G', dict_format['deploymentSpec']['executors']['exec-return-1-3'] - ['container']['resources']['resourceMemoryLimit']) + 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-3'] + ['container']['resources']['memoryLimit']) self.assertNotIn( 'cpuLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-3']['container']['resources']) self.assertEqual( - '2', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceCpuRequest']) + 2.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['cpuRequest']) self.assertEqual( - '5', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceCpuLimit']) + 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['cpuLimit']) self.assertEqual( - '4G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceMemoryRequest']) + 4.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['memoryRequest']) self.assertEqual( - '50G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] - ['container']['resources']['resourceMemoryLimit']) + 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['memoryLimit']) class TestPlatformConfig(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index afc014530fa2..3e89dbe6fef5 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -645,17 +645,30 @@ def convert_to_placeholder(input_value: str) -> str: if task.container_spec.resources is not None: if task.container_spec.resources.cpu_request is not None: + container_spec.resources.cpu_request = utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( + task.container_spec.resources.cpu_request)) container_spec.resources.resource_cpu_request = ( convert_to_placeholder( task.container_spec.resources.cpu_request)) if task.container_spec.resources.cpu_limit is not None: + container_spec.resources.cpu_limit = utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( + task.container_spec.resources.cpu_limit)) container_spec.resources.resource_cpu_limit = ( - convert_to_placeholder(task.container_spec.resources.cpu_limit)) + convert_to_placeholder( + task.container_spec.resources.cpu_limit)) if task.container_spec.resources.memory_request is not None: + container_spec.resources.memory_request = utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( + task.container_spec.resources.memory_request)) container_spec.resources.resource_memory_request = ( convert_to_placeholder( task.container_spec.resources.memory_request)) if task.container_spec.resources.memory_limit is not None: + container_spec.resources.memory_limit = utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( + task.container_spec.resources.memory_limit)) container_spec.resources.resource_memory_limit = ( convert_to_placeholder( task.container_spec.resources.memory_limit)) @@ -663,10 +676,6 @@ def convert_to_placeholder(input_value: str) -> str: container_spec.resources.accelerator.CopyFrom( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec .ResourceSpec.AcceleratorConfig( - resource_type=convert_to_placeholder( - task.container_spec.resources.accelerator_type), - resource_count=convert_to_placeholder( - task.container_spec.resources.accelerator_count), type=convert_to_placeholder( task.container_spec.resources.accelerator_type), count=convert_to_placeholder( diff --git a/sdk/python/kfp/dsl/utils.py b/sdk/python/kfp/dsl/utils.py index 4400bca89424..a91ce50d047a 100644 --- a/sdk/python/kfp/dsl/utils.py +++ b/sdk/python/kfp/dsl/utils.py @@ -20,6 +20,8 @@ import types from typing import List +from kfp.dsl import constants + COMPONENT_NAME_PREFIX = 'comp-' _EXECUTOR_LABEL_PREFIX = 'exec-' @@ -126,3 +128,79 @@ def validate_pipeline_name(name: str) -> None: 'Please specify a pipeline name that matches the regular ' 'expression "^[a-z0-9][a-z0-9-]{0,127}$" using ' '`dsl.pipeline(name=...)` decorator.' % name) + + +def validate_cpu_request_limit_to_float(cpu: str) -> float: + """Validates cpu request/limit string and converts to its numeric float + value. + + Args: + cpu: CPU requests or limits. This string should be a number or a + number followed by an "m" to indicate millicores (1/1000). For + more information, see `Specify a CPU Request and a CPU Limit + `_. + + Raises: + ValueError if the cpu request/limit string value is invalid. + + Returns: + The numeric value (float) of the cpu request/limit. + """ + if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: + raise ValueError( + 'Invalid cpu string. Should be float or integer, or integer' + ' followed by "m".') + + return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + + +def validate_memory_request_limit_to_float(memory: str) -> float: + """Validates memory request/limit string and converts to its numeric value. + + Args: + memory: Memory requests or limits. This string should be a number or + a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", + "Gi", "M", "Mi", "K", or "Ki". + + Raises: + ValueError if the memory request/limit string value is invalid. + + Returns: + The numeric value (float) of the memory request/limit. + """ + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory) is None: + raise ValueError( + 'Invalid memory string. Should be a number or a number ' + 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' + '"Gi", "M", "Mi", "K", "Ki".') + + if memory.endswith('E'): + memory = float(memory[:-1]) * constants._E / constants._G + elif memory.endswith('Ei'): + memory = float(memory[:-2]) * constants._EI / constants._G + elif memory.endswith('P'): + memory = float(memory[:-1]) * constants._P / constants._G + elif memory.endswith('Pi'): + memory = float(memory[:-2]) * constants._PI / constants._G + elif memory.endswith('T'): + memory = float(memory[:-1]) * constants._T / constants._G + elif memory.endswith('Ti'): + memory = float(memory[:-2]) * constants._TI / constants._G + elif memory.endswith('G'): + memory = float(memory[:-1]) + elif memory.endswith('Gi'): + memory = float(memory[:-2]) * constants._GI / constants._G + elif memory.endswith('M'): + memory = float(memory[:-1]) * constants._M / constants._G + elif memory.endswith('Mi'): + memory = float(memory[:-2]) * constants._MI / constants._G + elif memory.endswith('K'): + memory = float(memory[:-1]) * constants._K / constants._G + elif memory.endswith('Ki'): + memory = float(memory[:-2]) * constants._KI / constants._G + else: + # By default interpret as a plain integer, in the unit of Bytes. + memory = float(memory) / constants._G + + return memory